# Source code for ractogateway.pipelines.agent.pipeline

"""AgentPipeline - autonomous ReAct agent with pluggable tools.

Implements the Reason+Act (ReAct) loop:

  1. The LLM receives a system prompt listing all registered tools and a
     growing conversation transcript (goal + previous steps).
  2. The LLM responds with a JSON object — either a single tool call:
     ``{"thought": "...", "tool_name": "...", "tool_input": {...}}``
     or a parallel batch:
     ``{"thought": "...", "tool_calls": [{"tool_name": "...", "tool_input": {...}}, ...]}``
  3. The tool(s) are executed; results (observations) are appended to the
     transcript.
  4. The loop repeats until the LLM calls ``finish(answer=...)`` or
     ``max_steps`` is reached.

Usage::

    from ractogateway.openai_developer_kit import Chat
    from ractogateway.pipelines.agent import AgentPipeline

    def get_weather(city: str) -> str:
        \"\"\"Return the current weather for a city.\"\"\"
        return f"Sunny, 22 C in {city}"

    agent = AgentPipeline(
        kit=Chat(model="gpt-4o"),
        tools=[get_weather],
        max_steps=6,
        safe_mode=True,
        max_parallel_tools=4,
    )

    result = agent.run("What is the weather in Paris and London?")
    print(result.final_answer)
    print(result.to_markdown())

    # Async variant (FastAPI / async servers):
    result = await agent.arun("What is the weather in Tokyo?")
"""

from __future__ import annotations

import json
import re
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from ._executor import (
    FINISH_TOOL,
    ToolExecutor,
    make_finish_tool,
    make_http_tool,
    make_memory_tools,
    make_rag_tool,
    make_sql_tool,
)
from ._models import (
    AgentRateLimitExceededError,
    AgentResult,
    AgentStep,
    AgentUsage,
    StopReason,
)

# Name of the built-in step-extension tool. Calls to it are intercepted by the
# agent loop (AgentPipeline._filter_calls) and never reach the tool executor.
REQUEST_STEPS_TOOL = "request_more_steps"

# One parsed tool call: ``(tool_name, tool_input)`` as produced by
# _parse_response and consumed by the execution helpers.
_ToolCall = tuple[str, dict[str, Any]]

# Sentinel default meaning "not provided by the caller"; lets run()/arun()
# tell an omitted per-call override apart from an explicitly passed value.
_UNSET = object()

# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------

# Maximum number of characters of each prior observation echoed in the
# transcript's deduplication memo (see _build_transcript); longer observations
# are truncated and suffixed with "...".
_MEMO_PREVIEW_CHARS = 200

# System prompt rendered by _build_system_prompt via str.format().
# Literal JSON braces are therefore doubled ({{ / }}). Two placeholders:
#   {tool_descriptions} - the executor's describe_all() listing
#   {extra_rules}       - an optional caller rule rendered as "9. ..." (or "")
_SYSTEM_PROMPT_TEMPLATE = """\
You are an autonomous AI agent. You solve tasks step-by-step by calling the \
tools listed below. Think carefully before each action.

AVAILABLE TOOLS:
{tool_descriptions}

RESPONSE FORMAT
---------------
Single tool call (default):
{{"thought": "<your reasoning>", "tool_name": "<tool_name>", "tool_input": {{<key-value args>}}}}

Multiple independent tools in one step (parallel):
{{"thought": "<your reasoning>", "tool_calls": [{{"tool_name": "...", "tool_input": {{...}}}}, ...]}}

Output ONLY a single valid JSON object — no extra text, no markdown.

RULES:
1. Always include "thought" and either "tool_name"+"tool_input" OR "tool_calls".
2. To finish, call the finish tool:
   {{"thought": "I have the answer", "tool_name": "finish", "tool_input": {{"answer": "<full answer>"}}}}
3. If a tool returns an ERROR, adjust your approach and try a different strategy. Do NOT retry \
the exact same tool call with identical inputs.
4. Never invent data — only use facts from tool observations.
5. The "thought" field is your private reasoning — be concise.
6. Memory check: before calling any tool, scan previous observations. If you already called that \
tool with identical inputs and received a result, reuse that observation — do not call it again.
7. Early exit: if a tool error is clearly unrecoverable (service unavailable, data not found, \
missing dependency), call finish immediately with a partial answer that states what succeeded, \
what failed, and why — do not keep retrying.
8. Parallel execution: when multiple independent tool calls are needed, use the "tool_calls" \
array format so they run simultaneously — this is faster than sequential calls.
{extra_rules}"""


def _build_system_prompt(executor: ToolExecutor, extra_rules: str = "") -> str:
    """Render the agent system prompt for the tools held by *executor*.

    Parameters
    ----------
    executor:
        Tool registry whose ``describe_all()`` text fills the
        ``AVAILABLE TOOLS`` section of the template.
    extra_rules:
        Optional caller-supplied rule. When non-empty it is appended after the
        built-in rules as rule ``9.``; when empty nothing is appended.

    Returns
    -------
    str
        The fully formatted system prompt.
    """
    if extra_rules:
        rule_nine = f"9. {extra_rules}"
    else:
        rule_nine = ""

    substitutions = {
        "tool_descriptions": executor.describe_all(),
        "extra_rules": rule_nine,
    }
    return _SYSTEM_PROMPT_TEMPLATE.format(**substitutions)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _call_key(tool_name: str, tool_input: dict[str, Any]) -> str:
    """Return a canonical string identifying one tool invocation.

    The input mapping is serialised with sorted keys, so two calls whose
    arguments differ only in key order produce the same key. The result is
    used to deduplicate repeated calls in the transcript memo.
    """
    canonical_args = json.dumps(tool_input, sort_keys=True)
    return f"{tool_name}({canonical_args})"


def _build_transcript(
    goal: str,
    steps: list[AgentStep],
    seen_calls: dict[str, str] | None = None,
) -> str:
    """Assemble the rolling transcript sent to the LLM on every step.

    Layout, joined with newlines:

    1. ``TASK: <goal>``
    2. When *seen_calls* is non-empty, a memo of prior tool results (each
       observation truncated to ``_MEMO_PREVIEW_CHARS`` characters plus
       ``"..."``) so the LLM can reuse them instead of repeating calls.
    3. For each step, an ``Assistant:`` line with the JSON action and an
       ``Observation:`` line with the tool output.
    4. A closing prompt asking for the next JSON action.
    """
    parts: list[str] = [f"TASK: {goal}\n"]

    if seen_calls:
        memo_lines = ["PRIOR TOOL RESULTS (reuse these — do not repeat the same calls):"]
        for call_key, observation in seen_calls.items():
            shown = observation[:_MEMO_PREVIEW_CHARS]
            if len(observation) > _MEMO_PREVIEW_CHARS:
                shown += "..."
            memo_lines.append(f"  {call_key}  =>  {shown}")
        parts.append("\n".join(memo_lines) + "\n")

    for step in steps:
        payload = {
            "thought": step.thought,
            "tool_name": step.tool_name,
            "tool_input": step.tool_input,
        }
        parts += [
            f"Assistant: {json.dumps(payload)}",
            f"Observation: {step.observation}\n",
        ]

    parts.append("Your next action (JSON only):")
    return "\n".join(parts)


def _first_json_object(raw: str) -> dict[str, Any] | None:
    """Return the first well-formed JSON object embedded in *raw*, or ``None``.

    Each ``{`` is tried in turn as the start of an object using
    :meth:`json.JSONDecoder.raw_decode`, which stops at the object's matching
    closing brace. Surrounding prose, markdown code fences, or a second object
    after the first therefore do not break parsing. (A greedy ``\\{.*\\}``
    regex would span from the first ``{`` to the last ``}`` and fail.)
    """
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(raw, start)
        except json.JSONDecodeError:
            start = raw.find("{", start + 1)
            continue
        # Decoding that starts at "{" can only yield a dict.
        return obj
    return None


def _coerce_call(entry: dict[str, Any]) -> _ToolCall:
    """Normalise one ``{"tool_name": ..., "tool_input": ...}`` mapping.

    A missing name defaults to the finish tool. A non-dict ``tool_input`` is
    wrapped as ``{"value": str(input)}``.
    """
    name = str(entry.get("tool_name", FINISH_TOOL)).strip()
    args = entry.get("tool_input", {})
    if not isinstance(args, dict):
        args = {"value": str(args)}
    return name, args


def _parse_response(
    text: str,
) -> tuple[str | None, list[_ToolCall]]:
    """Parse the LLM text into ``(thought, list_of_tool_calls)``.

    Handles:

    - Single-call format: ``{"thought": "...", "tool_name": "...", "tool_input": {...}}``
    - Parallel-call format: ``{"thought": "...", "tool_calls": [...]}``. Non-dict
      entries are skipped; an empty or all-invalid list falls back to the
      single-call format.
    - JSON inside markdown code fences or surrounded by prose. The first
      well-formed object wins.
    - Malformed responses, including replies whose JSON is not an object
      (e.g. a bare number or string). These fall back to ``finish`` with the
      raw text as the answer.
    """
    data = _first_json_object(text.strip())
    if data is None:
        return None, [(FINISH_TOOL, {"answer": text})]

    raw_thought = data.get("thought")
    # Coerce to str so downstream consumers always get str | None.
    thought: str | None = str(raw_thought) if raw_thought else None

    # ── Parallel format ──────────────────────────────────────────────────
    tool_calls_raw = data.get("tool_calls")
    if isinstance(tool_calls_raw, list):
        calls = [_coerce_call(item) for item in tool_calls_raw if isinstance(item, dict)]
        if calls:
            return thought, calls

    # ── Single-call format ───────────────────────────────────────────────
    return thought, [_coerce_call(data)]


def _build_partial_answer(goal: str, steps: list[AgentStep], last_error: str) -> str:
    """Summarise what succeeded before the circuit breaker fired.

    A step counts as a success when it is a real tool call. The finish step,
    ``request_more_steps`` bookkeeping entries (their "Granted N extra steps"
    text is not tool output), and observations starting with ``"ERROR"`` are
    excluded. Each success shows at most the first 200 characters of its
    observation.

    Parameters
    ----------
    goal:
        The original task, echoed in the report.
    steps:
        All steps recorded so far.
    last_error:
        The observation that tripped the breaker.

    Returns
    -------
    str
        A markdown report of the goal, the successful tool results, and the
        last error.
    """
    successes = [
        f"• {s.tool_name}: {s.observation[:200]}"
        for s in steps
        if not (
            s.is_finish
            or s.tool_name == REQUEST_STEPS_TOOL
            or s.observation.startswith("ERROR")
        )
    ]
    body = "\n".join(successes) if successes else "No tools completed successfully."
    return (
        f"The agent was unable to complete the full task due to repeated tool failures.\n\n"
        f"**Goal:** {goal}\n\n"
        f"**What succeeded:**\n{body}\n\n"
        f"**Last error:** {last_error}"
    )


def _load_json_payload(text: str) -> Any:
    """Decode the JSON payload embedded in *text*.

    Returns the first well-formed JSON object found, trying each ``{`` in turn
    with :meth:`json.JSONDecoder.raw_decode`. Prose or a second object after
    it therefore does not break parsing (a greedy ``\\{.*\\}`` regex would span
    both). When no object decodes, the whole stripped text is decoded instead,
    so non-object payloads still reach the model's validator.

    Raises
    ------
    json.JSONDecodeError
        If no JSON can be decoded from *text*.
    """
    raw = text.strip()
    decoder = json.JSONDecoder()
    start = raw.find("{")
    while start != -1:
        try:
            return decoder.raw_decode(raw, start)[0]
        except json.JSONDecodeError:
            start = raw.find("{", start + 1)
    return json.loads(raw)


def _reformat_request(answer: str, model_class: Any) -> str:
    """Build the LLM instruction asking to reformat *answer* as *model_class* JSON.

    When *model_class* exposes ``model_json_schema()`` (Pydantic v2), the schema
    is included so the LLM knows the expected fields. Previously only the class
    name was given.
    """
    schema_hint = ""
    schema_fn = getattr(model_class, "model_json_schema", None)
    if callable(schema_fn):
        try:
            schema_hint = f"\n\nJSON schema:\n{json.dumps(schema_fn())}"
        except Exception:  # the schema is only a hint; never fail because of it
            schema_hint = ""
    return (
        f"Reformat the following text as a valid JSON object matching the "
        f"{model_class.__name__} schema. Return ONLY the JSON object."
        f"{schema_hint}\n\nText:\n{answer}"
    )


def _coerce_response_format(
    answer: str,
    model_class: Any,
    kit: Any,
    prompt: Any,
) -> Any:
    """Parse *answer* into an instance of *model_class*.

    Attempt 1: decode the first JSON object in *answer* and validate it with
    ``model_class.model_validate``.
    Attempt 2: make one ``kit.chat`` correction call asking the LLM to reformat
    *answer* (including the schema when available), then validate the reply.

    Both attempts are deliberately best-effort. Any exception (decode,
    validation, import or transport error) moves on to the next attempt.

    Returns ``None`` on total failure.

    NOTE(review): callers in this module pass the agent's prompt, whose
    ``output_format`` describes the tool-call JSON shape; that may compete
    with the reformat instruction. Consider a dedicated prompt.
    """
    # Attempt 1 — direct parse
    try:
        return model_class.model_validate(_load_json_payload(answer))
    except Exception:
        pass  # fall through to LLM-assisted reformatting

    # Attempt 2 — ask LLM to reformat
    try:
        from ractogateway._models.chat import ChatConfig

        resp = kit.chat(
            ChatConfig(user_message=_reformat_request(answer, model_class), prompt=prompt)
        )
        return model_class.model_validate(_load_json_payload(resp.content or ""))
    except Exception:
        return None


async def _acoerce_response_format(
    answer: str,
    model_class: Any,
    kit: Any,
    prompt: Any,
) -> Any:
    """Async variant of :func:`_coerce_response_format` (uses ``kit.achat``).

    Same two best-effort attempts. Returns ``None`` on total failure.
    """
    # Attempt 1 — direct parse
    try:
        return model_class.model_validate(_load_json_payload(answer))
    except Exception:
        pass  # fall through to LLM-assisted reformatting

    # Attempt 2 — ask LLM to reformat
    try:
        from ractogateway._models.chat import ChatConfig

        resp = await kit.achat(
            ChatConfig(user_message=_reformat_request(answer, model_class), prompt=prompt)
        )
        return model_class.model_validate(_load_json_payload(resp.content or ""))
    except Exception:
        return None


# ---------------------------------------------------------------------------
# AgentPipeline
# ---------------------------------------------------------------------------


class AgentPipeline:
    """Autonomous ReAct agent with sync ``run()`` and async ``arun()``.

    Parameters
    ----------
    kit:
        Any RactoGateway developer kit (must support ``chat()`` and ``achat()``
        methods).
    tools:
        List of plain callables or ``@tool``-decorated functions to register.
        Each is registered by its ``__name__`` (or ``__tool_name__`` if set by
        the ``@tool`` decorator). User tools are registered last, so they can
        shadow built-in tools with the same name.
    rag_pipeline:
        Optional ``RactoRAG`` instance — auto-registers a ``rag_search`` tool.
    sql_pipeline:
        Optional ``SQLAnalystPipeline`` — auto-registers a ``sql_query`` tool.
    enable_http:
        When ``True``, registers an ``http_get`` tool that fetches URLs via
        ``httpx`` (requires ``pip install ractogateway[pipelines-agent-http]``).
    agent_memory:
        Any dict-like or object with ``get``/``set`` methods. Auto-registers
        ``memory_read`` and ``memory_write`` tools.
    max_steps:
        Hard cap on reasoning steps before the loop stops with
        :attr:`StopReason.MAX_STEPS`.
    max_consecutive_errors:
        Number of consecutive tool errors that trigger the circuit breaker
        (:attr:`StopReason.CIRCUIT_BREAK`). Default ``3``.
    tool_retries:
        How many times to retry a failing tool before reporting the error to
        the LLM. Default ``0`` (no retry).
    max_step_extension:
        Maximum *total* number of additional steps the agent may be granted
        per run via ``request_more_steps``. ``0`` disables the feature, and
        the tool is then not advertised to the LLM.
    max_parallel_tools:
        Maximum number of tools to run simultaneously when the LLM requests a
        parallel batch. ``1`` forces sequential execution. Default ``4``.
    system_prompt:
        Fully override the auto-generated system prompt (advanced).
    extra_rules:
        Append an extra numbered rule to the default system prompt.
    safe_mode:
        Catch all exceptions raised by the loop and surface them in
        ``result.error`` instead of re-raising.
    tracer:
        Optional :class:`ractogateway.telemetry.RactoTracer`. It is stored on
        the instance but not referenced by the loop in this module.
    metrics:
        Optional :class:`ractogateway.telemetry.GatewayMetricsMiddleware`. It
        is stored on the instance but not referenced by the loop in this
        module.
    rate_limiter:
        Duck-typed rate limiter with ``check_and_consume(user_id, tokens)`` and
        ``get_remaining(user_id)`` methods. One unit is consumed per run.
    user_id:
        Default user identifier passed to the rate limiter.
    """

    def __init__(
        self,
        kit: Any,
        *,
        tools: list[Callable[..., Any]] | None = None,
        rag_pipeline: Any = None,
        sql_pipeline: Any = None,
        enable_http: bool = False,
        agent_memory: Any = None,
        max_steps: int = 10,
        max_consecutive_errors: int = 3,
        tool_retries: int = 0,
        max_step_extension: int = 0,
        max_parallel_tools: int = 4,
        system_prompt: str | None = None,
        extra_rules: str = "",
        safe_mode: bool = False,
        tracer: Any = None,
        metrics: Any = None,
        rate_limiter: Any = None,
        user_id: str = "default",
    ) -> None:
        self._kit = kit
        self._max_steps = max_steps
        self._max_consecutive_errors = max_consecutive_errors
        self._max_step_extension = max_step_extension
        self._max_parallel_tools = max(1, max_parallel_tools)
        self._safe_mode = safe_mode
        self._tracer = tracer
        self._metrics = metrics
        self._rate_limiter = rate_limiter
        self._user_id = user_id

        # ── Build tool registry ───────────────────────────────────────────
        tool_map: dict[str, Callable[..., Any]] = {}

        # 1. Finish — always present
        fname, ffn = make_finish_tool()
        tool_map[fname] = ffn

        # 2. RAG search
        if rag_pipeline is not None:
            tname, tfn = make_rag_tool(rag_pipeline)
            tool_map[tname] = tfn

        # 3. SQL query
        if sql_pipeline is not None:
            tname, tfn = make_sql_tool(sql_pipeline)
            tool_map[tname] = tfn

        # 4. HTTP fetch (opt-in)
        if enable_http:
            tname, tfn = make_http_tool()
            tool_map[tname] = tfn

        # 5. Memory read/write
        if agent_memory is not None:
            for tname, tfn in make_memory_tools(agent_memory):
                tool_map[tname] = tfn

        # 6. Dynamic step extension. Only advertised when enabled. Calls are
        #    intercepted in the loop (_filter_calls); the executor never runs it.
        if self._max_step_extension > 0:

            def request_more_steps(additional: int = 3, reason: str = "") -> str:
                """Request additional reasoning steps when the task needs more work."""
                return f"Requested {additional} more steps."

            tool_map[REQUEST_STEPS_TOOL] = request_more_steps

        # 7. User-supplied tools (last, so they can shadow built-ins)
        for fn in tools or []:
            name: str = getattr(fn, "__tool_name__", None) or fn.__name__
            tool_map[name] = fn

        self._executor = ToolExecutor(tool_map, max_retries=tool_retries)

        # ── System prompt ─────────────────────────────────────────────────
        self._system_prompt = (
            system_prompt
            if system_prompt is not None
            else _build_system_prompt(self._executor, extra_rules)
        )

        # ── RactoPrompt for ChatConfig ────────────────────────────────────
        from ractogateway.prompts.engine import RactoPrompt

        self._prompt = RactoPrompt(
            role="You are an autonomous AI agent.",
            aim=self._system_prompt,
            constraints=["Follow all instructions in the aim section exactly."],
            tone="Systematic, precise, and tool-focused.",
            output_format=(
                'Single: {"thought": "...", "tool_name": "...", "tool_input": {...}} '
                'OR parallel: {"thought": "...", "tool_calls": [{"tool_name": ..., "tool_input": ...}, ...]}'
            ),
            anti_hallucination=False,
        )

    # ── Rate limiter ──────────────────────────────────────────────────────

    def _check_rate_limit(self, user_id: str) -> None:
        """Consume one rate-limit unit for *user_id*.

        Raises
        ------
        AgentRateLimitExceededError
            If the configured limiter rejects the request.
        """
        if self._rate_limiter is None:
            return
        allowed = self._rate_limiter.check_and_consume(user_id, 1)
        if not allowed:
            raise AgentRateLimitExceededError(
                f"Rate limit exceeded for user '{user_id}'. "
                f"Remaining: {self._rate_limiter.get_remaining(user_id)}"
            )

    @staticmethod
    def _error_result(goal: str, exc: Exception) -> AgentResult:
        """Wrap *exc* in an ``ERROR`` result (used when ``safe_mode`` is on)."""
        return AgentResult(
            goal=goal,
            stop_reason=StopReason.ERROR,
            error=f"{type(exc).__name__}: {exc}",
        )

    # ── Public sync API ───────────────────────────────────────────────────

    def run(
        self,
        goal: str,
        *,
        max_steps: Any = _UNSET,
        user_id: Any = _UNSET,
        session_id: str | None = None,
        response_format: type | None = None,
    ) -> AgentResult:
        """Run the agent synchronously until it finishes or hits ``max_steps``.

        Parameters
        ----------
        goal:
            The task or question the agent should solve.
        max_steps:
            Per-call override for the constructor ``max_steps``.
        user_id:
            Per-call override for the rate-limiter user identifier.
        session_id:
            Optional session tag. It is accepted for API compatibility but not
            currently used by the loop.
        response_format:
            A Pydantic ``BaseModel`` subclass. When provided, the agent's final
            answer is parsed and validated into an instance stored in
            :attr:`AgentResult.parsed_output`.

        Returns
        -------
        AgentResult
            Contains the final answer, all steps, stop reason, token usage, and
            (when *response_format* is set) the parsed structured output.
        """
        uid = self._user_id if user_id is _UNSET else user_id
        steps_cap = self._max_steps if max_steps is _UNSET else int(max_steps)
        if not self._safe_mode:
            return self._run_loop(goal, steps_cap, uid, session_id, response_format)
        try:
            return self._run_loop(goal, steps_cap, uid, session_id, response_format)
        except Exception as exc:
            return self._error_result(goal, exc)

    # ── Public async API ──────────────────────────────────────────────────

    async def arun(
        self,
        goal: str,
        *,
        max_steps: Any = _UNSET,
        user_id: Any = _UNSET,
        session_id: str | None = None,
        response_format: type | None = None,
    ) -> AgentResult:
        """Async variant of :meth:`run`.

        Uses ``kit.achat()`` for LLM calls and the executor's async API for
        tools.

        Parameters
        ----------
        goal:
            The task or question the agent should solve.
        max_steps:
            Per-call override for the constructor ``max_steps``.
        user_id:
            Per-call override for the rate-limiter user identifier.
        session_id:
            Optional session tag. It is accepted for API compatibility but not
            currently used by the loop.
        response_format:
            A Pydantic ``BaseModel`` subclass for structured output parsing.
        """
        uid = self._user_id if user_id is _UNSET else user_id
        steps_cap = self._max_steps if max_steps is _UNSET else int(max_steps)
        if not self._safe_mode:
            return await self._arun_loop(goal, steps_cap, uid, session_id, response_format)
        try:
            return await self._arun_loop(goal, steps_cap, uid, session_id, response_format)
        except Exception as exc:
            return self._error_result(goal, exc)

    # ── Private helpers ───────────────────────────────────────────────────

    def _grant_extension(
        self,
        call_inp: dict[str, Any],
        step_cap: int,
        base_cap: int | None,
    ) -> int:
        """Return how many extra steps to grant for one ``request_more_steps`` call.

        The requested count (default ``3``; a non-numeric value falls back to
        the default) is clipped to the remaining per-run budget. The budget is
        ``max_step_extension`` minus the steps already granted this run
        (``step_cap - base_cap``). With ``base_cap=None`` only the per-request
        limit applies. The result is never negative.
        """
        try:
            requested = int(call_inp.get("additional", 3))
        except (TypeError, ValueError):
            requested = 3
        budget = self._max_step_extension
        if base_cap is not None:
            budget -= step_cap - base_cap
        return max(0, min(requested, budget))

    def _filter_calls(
        self,
        calls: list[_ToolCall],
        step_cap: int,
        steps: list[AgentStep],
        step_num: int,
        thought: str | None,
        base_cap: int | None = None,
    ) -> tuple[list[_ToolCall], int]:
        """Filter finish/request_more_steps from *calls*; apply step extension.

        ``finish`` entries inside a multi-call batch are dropped, so the other
        tools run and the LLM can finish on a later step. Every
        ``request_more_steps`` entry is recorded as a step so the LLM sees
        whether it was granted. *base_cap* is the run's initial step cap; it
        bounds the total extension per run.

        Returns ``(tool_calls, updated_step_cap)``.
        """
        tool_calls: list[_ToolCall] = []
        for call_name, call_inp in calls:
            if call_name == FINISH_TOOL:
                continue
            if call_name != REQUEST_STEPS_TOOL:
                tool_calls.append((call_name, call_inp))
                continue

            granted = self._grant_extension(call_inp, step_cap, base_cap)
            step_cap += granted
            observation = (
                f"Granted {granted} extra steps. New cap: {step_cap}."
                if granted > 0
                else (
                    "No extra steps granted (step-extension budget is exhausted "
                    f"or disabled). Cap remains {step_cap}."
                )
            )
            steps.append(
                AgentStep(
                    step_num=step_num,
                    thought=thought,
                    tool_name=REQUEST_STEPS_TOOL,
                    tool_input=call_inp,
                    observation=observation,
                    duration_ms=0.0,
                )
            )
        return tool_calls, step_cap

    def _exec_sync(self, tool_calls: list[_ToolCall]) -> list[tuple[str, float]]:
        """Execute tool calls synchronously, in a thread pool when batching.

        Results are returned in the same order as *tool_calls*.
        """
        if len(tool_calls) <= 1 or self._max_parallel_tools <= 1:
            return [self._executor.execute(name, inp) for name, inp in tool_calls]
        names, inputs = zip(*tool_calls)
        workers = min(len(tool_calls), self._max_parallel_tools)
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # map() preserves input order and re-raises worker exceptions.
            return list(pool.map(self._executor.execute, names, inputs))

    async def _exec_async(self, tool_calls: list[_ToolCall]) -> list[tuple[str, float]]:
        """Execute tool calls asynchronously.

        Calls run in parallel batches of at most ``max_parallel_tools``.
        Results are returned in the same order as *tool_calls*.
        """
        if len(tool_calls) <= 1 or self._max_parallel_tools <= 1:
            return [await self._executor.aexecute(name, inp) for name, inp in tool_calls]
        size = self._max_parallel_tools
        results: list[tuple[str, float]] = []
        for start in range(0, len(tool_calls), size):
            batch = tool_calls[start : start + size]
            results.extend(await self._executor.aexecute_parallel(batch))
        return results

    def _apply_results(
        self,
        tool_calls: list[_ToolCall],
        exec_results: list[tuple[str, float]],
        steps: list[AgentStep],
        seen_calls: dict[str, str],
        step_num: int,
        thought: str | None,
        consecutive_errors: int,
        goal: str,
        usage: AgentUsage,
    ) -> tuple[int, AgentResult | None]:
        """Record tool results into *steps*/*seen_calls*.

        Returns ``(consecutive_errors, circuit_break_result_or_None)``. An
        observation starting with ``"ERROR"`` increments the error streak; any
        other observation resets it. Once the streak reaches
        ``max_consecutive_errors``, a ``CIRCUIT_BREAK`` result is returned
        immediately.
        """
        step_thought: str | None = thought
        for (name, inp), (obs, dur) in zip(tool_calls, exec_results, strict=False):
            seen_calls[_call_key(name, inp)] = obs
            steps.append(
                AgentStep(
                    step_num=step_num,
                    thought=step_thought,
                    tool_name=name,
                    tool_input=inp,
                    observation=obs,
                    duration_ms=dur,
                )
            )
            step_thought = None  # a batch's thought is attached to its first call only
            if not obs.startswith("ERROR"):
                consecutive_errors = 0
                continue
            consecutive_errors += 1
            if consecutive_errors >= self._max_consecutive_errors:
                partial = _build_partial_answer(goal, steps, obs)
                return consecutive_errors, AgentResult(
                    goal=goal,
                    final_answer=partial,
                    steps=steps,
                    stop_reason=StopReason.CIRCUIT_BREAK,
                    error=f"Circuit breaker: {consecutive_errors} consecutive tool errors.",
                    usage=usage,
                )
        return consecutive_errors, None

    @staticmethod
    def _record_usage(response: Any, usage: AgentUsage, step_num: int) -> None:
        """Add the response's token counts (if reported as a dict) to *usage*.

        Accepts both ``input/output_tokens`` and ``prompt/completion_tokens``
        key styles. ``steps_taken`` is updated even when no usage is reported.
        """
        raw = getattr(response, "usage", None)
        if isinstance(raw, dict):
            usage.total_input_tokens += int(
                raw.get("input_tokens") or raw.get("prompt_tokens") or 0
            )
            usage.total_output_tokens += int(
                raw.get("output_tokens") or raw.get("completion_tokens") or 0
            )
        usage.steps_taken = step_num

    @staticmethod
    def _record_finish(
        steps: list[AgentStep],
        step_num: int,
        thought: str | None,
        tool_input: dict[str, Any],
        fallback: str,
    ) -> str:
        """Append the finish step to *steps* and return the final answer text.

        A missing or ``null`` ``answer`` falls back to *fallback* (the raw LLM
        reply) instead of becoming the string ``"None"``.
        """
        answer = tool_input.get("answer")
        if answer is None:
            answer = fallback
        steps.append(
            AgentStep(
                step_num=step_num,
                thought=thought,
                tool_name=FINISH_TOOL,
                tool_input=tool_input,
                observation="",
                is_finish=True,
            )
        )
        return str(answer)

    # ── Sync execution loop ───────────────────────────────────────────────

    def _run_loop(
        self,
        goal: str,
        max_steps: int,
        user_id: str,
        _session_id: str | None,
        response_format: type | None,
    ) -> AgentResult:
        """Run the ReAct loop with blocking ``kit.chat`` calls."""
        from ractogateway._models.chat import ChatConfig

        self._check_rate_limit(user_id)
        usage = AgentUsage()
        steps: list[AgentStep] = []
        consecutive_errors = 0
        seen_calls: dict[str, str] = {}
        step_cap = max_steps
        step_num = 0

        while step_num < step_cap:
            step_num += 1
            transcript = _build_transcript(goal, steps, seen_calls)
            response = self._kit.chat(ChatConfig(user_message=transcript, prompt=self._prompt))
            self._record_usage(response, usage, step_num)
            thought, calls = _parse_response(response.content or "")

            # ── Finish (single finish call) ──────────────────────────────
            if len(calls) == 1 and calls[0][0] == FINISH_TOOL:
                answer = self._record_finish(
                    steps, step_num, thought, calls[0][1], response.content or ""
                )
                parsed = (
                    _coerce_response_format(answer, response_format, self._kit, self._prompt)
                    if response_format is not None
                    else None
                )
                return AgentResult(
                    goal=goal,
                    final_answer=answer,
                    steps=steps,
                    stop_reason=StopReason.FINISHED,
                    usage=usage,
                    parsed_output=parsed,
                )

            # ── Filter calls; intercept request_more_steps ───────────────
            tool_calls, step_cap = self._filter_calls(
                calls, step_cap, steps, step_num, thought, base_cap=max_steps
            )
            if not tool_calls:
                continue

            # ── Execute tools (parallel if multiple) ─────────────────────
            exec_results = self._exec_sync(tool_calls)
            usage.tools_called += len(exec_results)

            # ── Record results; check circuit breaker ────────────────────
            consecutive_errors, cb_result = self._apply_results(
                tool_calls, exec_results, steps, seen_calls, step_num,
                thought, consecutive_errors, goal, usage,
            )
            if cb_result is not None:
                return cb_result

        # ── Max steps reached ────────────────────────────────────────────
        return AgentResult(
            goal=goal,
            final_answer=None,
            steps=steps,
            stop_reason=StopReason.MAX_STEPS,
            usage=usage,
        )

    # ── Async execution loop ──────────────────────────────────────────────

    async def _arun_loop(
        self,
        goal: str,
        max_steps: int,
        user_id: str,
        _session_id: str | None,
        response_format: type | None,
    ) -> AgentResult:
        """Run the ReAct loop with awaited ``kit.achat`` calls."""
        from ractogateway._models.chat import ChatConfig

        self._check_rate_limit(user_id)
        usage = AgentUsage()
        steps: list[AgentStep] = []
        consecutive_errors = 0
        seen_calls: dict[str, str] = {}
        step_cap = max_steps
        step_num = 0

        while step_num < step_cap:
            step_num += 1
            transcript = _build_transcript(goal, steps, seen_calls)
            response = await self._kit.achat(
                ChatConfig(user_message=transcript, prompt=self._prompt)
            )
            self._record_usage(response, usage, step_num)
            thought, calls = _parse_response(response.content or "")

            # ── Finish ───────────────────────────────────────────────────
            if len(calls) == 1 and calls[0][0] == FINISH_TOOL:
                answer = self._record_finish(
                    steps, step_num, thought, calls[0][1], response.content or ""
                )
                parsed = (
                    await _acoerce_response_format(
                        answer, response_format, self._kit, self._prompt
                    )
                    if response_format is not None
                    else None
                )
                return AgentResult(
                    goal=goal,
                    final_answer=answer,
                    steps=steps,
                    stop_reason=StopReason.FINISHED,
                    usage=usage,
                    parsed_output=parsed,
                )

            # ── Filter calls; intercept request_more_steps ───────────────
            tool_calls, step_cap = self._filter_calls(
                calls, step_cap, steps, step_num, thought, base_cap=max_steps
            )
            if not tool_calls:
                continue

            # ── Execute tools (parallel batches) ─────────────────────────
            exec_results = await self._exec_async(tool_calls)
            usage.tools_called += len(exec_results)

            # ── Record results; check circuit breaker ────────────────────
            consecutive_errors, cb_result = self._apply_results(
                tool_calls, exec_results, steps, seen_calls, step_num,
                thought, consecutive_errors, goal, usage,
            )
            if cb_result is not None:
                return cb_result

        # ── Max steps reached ────────────────────────────────────────────
        return AgentResult(
            goal=goal,
            final_answer=None,
            steps=steps,
            stop_reason=StopReason.MAX_STEPS,
            usage=usage,
        )

    # ── Utilities ─────────────────────────────────────────────────────────

    @property
    def registered_tools(self) -> list[str]:
        """Names of all registered tools, as reported by the executor's ``names``."""
        return self._executor.names

    @property
    def system_prompt(self) -> str:
        """The system prompt sent to the LLM each step (read-only)."""
        return self._system_prompt
# ---------------------------------------------------------------------------
# Async-only variant (for FastAPI / async servers)
# ---------------------------------------------------------------------------
class AsyncAgentPipeline:
    """Async-only variant of :class:`AgentPipeline`.

    Exposes a single ``async run()`` method — suitable for FastAPI endpoints
    where a sync ``run()`` should not be in the public API. All constructor
    arguments are forwarded unchanged to :class:`AgentPipeline`.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self._inner = AgentPipeline(*args, **kwargs)

    async def run(
        self,
        goal: str,
        **kwargs: Any,
    ) -> AgentResult:
        """Async-only agent entrypoint; delegates to :meth:`AgentPipeline.arun`.

        Keyword arguments (``max_steps``, ``user_id``, ``session_id``,
        ``response_format``) are forwarded as-is.
        """
        return await self._inner.arun(goal, **kwargs)

    @property
    def registered_tools(self) -> list[str]:
        """Names of all registered tools, taken from the inner pipeline."""
        return self._inner.registered_tools

    @property
    def system_prompt(self) -> str:
        """The system prompt used by the inner pipeline."""
        return self._inner.system_prompt