ractogateway.pipelines.agent.pipeline

AgentPipeline - autonomous ReAct agent with pluggable tools.

Implements the Reason+Act (ReAct) loop:

  1. The LLM receives a system prompt listing all registered tools and a growing conversation transcript (goal + previous steps).

  2. The LLM responds with a JSON object, either a single tool call:

       {"thought": "...", "tool_name": "...", "tool_input": {...}}

     or a parallel batch:

       {"thought": "...", "tool_calls": [{"tool_name": "...", "tool_input": {...}}, ...]}

  3. The tool(s) are executed; results (observations) are appended to the transcript.

  4. The loop repeats until the LLM calls finish(answer=...) or max_steps is reached.
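
The four steps above can be sketched without the library. The following is a minimal, hypothetical illustration, not the actual AgentPipeline implementation: `react_loop`, `scripted_llm`, the transcript format, and the returned stop-reason strings are all assumptions made for the example, and `finish` is assumed to arrive as an ordinary tool call whose `tool_input` carries `answer`.

```python
import json
from typing import Any, Callable, Optional


def react_loop(
    llm: Callable[[list], str],
    tools: dict,
    goal: str,
    max_steps: int = 6,
) -> tuple:
    """Hypothetical, simplified ReAct loop (not the library's code)."""
    transcript = [f"Goal: {goal}"]
    for _ in range(max_steps):
        reply = json.loads(llm(transcript))
        transcript.append(f"Thought: {reply.get('thought', '')}")
        # Normalise the single-call shape into a one-element batch.
        calls = reply.get("tool_calls") or [
            {"tool_name": reply["tool_name"],
             "tool_input": reply.get("tool_input", {})}
        ]
        for call in calls:
            name, args = call["tool_name"], call.get("tool_input", {})
            if name == "finish":
                return args.get("answer"), "finished"
            try:
                observation: Any = tools[name](**args)
            except Exception as exc:
                # Errors (including unknown tools) go back to the model.
                observation = f"ERROR: {exc!r}"
            transcript.append(f"Observation[{name}]: {observation}")
    return None, "max_steps"


def get_weather(city: str) -> str:
    return f"Sunny, 22 C in {city}"


def scripted_llm(transcript: list) -> str:
    # Stand-in for a real model: request a parallel batch, then finish.
    if not any(line.startswith("Observation") for line in transcript):
        return json.dumps({
            "thought": "Look up both cities.",
            "tool_calls": [
                {"tool_name": "get_weather", "tool_input": {"city": "Paris"}},
                {"tool_name": "get_weather", "tool_input": {"city": "London"}},
            ],
        })
    return json.dumps({
        "thought": "Both observations are in.",
        "tool_name": "finish",
        "tool_input": {"answer": "Sunny in both cities."},
    })


answer, reason = react_loop(
    scripted_llm, {"get_weather": get_weather}, "Weather in Paris and London?"
)
```

In this sketch a batch is executed sequentially; the real pipeline is documented as running up to max_parallel_tools tools at once.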

Usage:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.agent import AgentPipeline

def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"

agent = AgentPipeline(
    kit=Chat(model="gpt-4o"),
    tools=[get_weather],
    max_steps=6,
    safe_mode=True,
    max_parallel_tools=4,
)

result = agent.run("What is the weather in Paris and London?")
print(result.final_answer)
print(result.to_markdown())

# Async variant (FastAPI / async servers); await it from inside a coroutine:
async def main():
    result = await agent.arun("What is the weather in Tokyo?")
    print(result.final_answer)
class ractogateway.pipelines.agent.pipeline.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')

Bases: object

Autonomous ReAct agent with sync run() and async arun().

Parameters:
  • kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).

  • tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).

  • rag_pipeline (Any) – Optional RactoRAG instance — auto-registers a rag_search tool.

  • sql_pipeline (Any) – Optional SQLAnalystPipeline — auto-registers a sql_query tool.

  • enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).

  • agent_memory (Any) – Any dict-like object, or any object with get/set methods. Auto-registers memory_read and memory_write tools.

  • max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.

  • max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.

  • tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).

  • max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.

  • max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.

  • system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).

  • extra_rules (str) – Text appended as an extra numbered rule to the default system prompt.

  • safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.
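
Because rate_limiter and agent_memory are duck-typed, small hand-written objects should satisfy them. The sketch below is an illustration under assumptions: the class names `FixedBudgetLimiter` and `SimpleMemory` are hypothetical, and the return types (a bool from `check_and_consume`, an int from `get_remaining`) are guesses, since the parameter descriptions name only the methods and their arguments.

```python
class FixedBudgetLimiter:
    """Hypothetical limiter exposing the two method names listed above."""

    def __init__(self, budget: int) -> None:
        self._budget = budget
        self._used = {}  # user_id -> tokens consumed so far

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Assumed contract: return True and record usage if within budget.
        used = self._used.get(user_id, 0)
        if used + tokens > self._budget:
            return False
        self._used[user_id] = used + tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used.get(user_id, 0)


class SimpleMemory:
    """Hypothetical memory object with get/set methods."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value) -> None:
        self._data[key] = value


limiter = FixedBudgetLimiter(budget=100)
first = limiter.check_and_consume("alice", 60)   # within budget
second = limiter.check_and_consume("alice", 50)  # would exceed budget
remaining = limiter.get_remaining("alice")

memory = SimpleMemory()
memory.set("city", "Paris")
```

Objects like these would then be passed as `rate_limiter=limiter` and `agent_memory=memory` to the constructor; whether the pipeline expects exactly these return values should be checked against the library source.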

run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)

Run the agent synchronously until it finishes or hits max_steps.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps.

  • user_id (Any) – Per-call override for the rate-limiter user identifier.

  • session_id (str | None) – Optional session tag passed to tracer spans (informational).

  • response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.

Return type:

AgentResult

Returns:

AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
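
The `<object object>` defaults shown for max_steps and user_id are consistent with a sentinel default, which lets a method tell "argument omitted" apart from any explicitly passed value. Whether the library implements it this way is an assumption; the names `_UNSET` and `SketchPipeline` below are hypothetical.

```python
_UNSET = object()  # unique marker; rendered in signatures as <object object ...>


class SketchPipeline:
    """Hypothetical stand-in showing per-call overrides of constructor values."""

    def __init__(self, max_steps: int = 10, user_id: str = "default") -> None:
        self.max_steps = max_steps
        self.user_id = user_id

    def run(self, goal: str, *, max_steps=_UNSET, user_id=_UNSET):
        # Fall back to the constructor value only when the argument was omitted.
        steps = self.max_steps if max_steps is _UNSET else max_steps
        uid = self.user_id if user_id is _UNSET else user_id
        return steps, uid


pipeline = SketchPipeline(max_steps=6)
defaults = pipeline.run("goal")
overridden = pipeline.run("goal", max_steps=2, user_id="alice")
```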

async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)

Async variant of run().

Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps.

  • user_id (Any) – Per-call override for the rate-limiter user identifier.

  • session_id (str | None) – Optional session tag for tracing.

  • response_format (type | None) – A Pydantic BaseModel subclass for structured output parsing.

Return type:

AgentResult
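
One common way to keep the event loop unblocked while running sync tools is `asyncio.to_thread`, with a semaphore capping concurrency in the spirit of max_parallel_tools. The sketch below illustrates that general technique; the helper `run_batch` is hypothetical and the library's own mechanism may differ.

```python
import asyncio
import time
from typing import Any, Callable


async def run_batch(
    calls: list,  # list of (callable, kwargs) pairs
    max_parallel_tools: int = 4,
) -> list:
    """Run tool calls concurrently, at most max_parallel_tools at a time."""
    semaphore = asyncio.Semaphore(max_parallel_tools)

    async def run_one(fn: Callable[..., Any], kwargs: dict) -> Any:
        async with semaphore:
            if asyncio.iscoroutinefunction(fn):
                return await fn(**kwargs)
            # Sync tools run in a worker thread so the loop stays responsive.
            return await asyncio.to_thread(fn, **kwargs)

    # gather() returns results in the same order as the submitted calls.
    return await asyncio.gather(*(run_one(fn, kw) for fn, kw in calls))


def slow_weather(city: str) -> str:
    time.sleep(0.05)  # simulate blocking I/O
    return f"Sunny, 22 C in {city}"


results = asyncio.run(
    run_batch(
        [(slow_weather, {"city": "Paris"}), (slow_weather, {"city": "London"})],
        max_parallel_tools=2,
    )
)
```

Setting `max_parallel_tools=1` in this sketch makes the semaphore serialize the calls, mirroring the documented "1 forces sequential execution" behaviour.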

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt sent to the LLM each step (read-only).

class ractogateway.pipelines.agent.pipeline.AsyncAgentPipeline(*args, **kwargs)

Bases: object

Async-only variant of AgentPipeline.

Exposes a single async run() method - suitable for FastAPI endpoints where a sync run() should not be in the public API.

All constructor parameters are identical to AgentPipeline.

async run(goal, **kwargs)

Async-only agent entrypoint. See AgentPipeline.arun().

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt used by the inner pipeline.
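
The phrase "inner pipeline" suggests that AsyncAgentPipeline wraps an AgentPipeline and delegates to it. A minimal sketch of that composition pattern follows, assuming this reading is correct; `_StubPipeline` and `AsyncOnlyWrapper` are hypothetical stand-ins so the example runs without the library installed.

```python
import asyncio


class _StubPipeline:
    """Stand-in for AgentPipeline; only what the wrapper needs."""

    def __init__(self, *args, **kwargs) -> None:
        self.registered_tools = ["finish"]
        self.system_prompt = "You are an agent."

    async def arun(self, goal: str, **kwargs) -> str:
        return f"answer to: {goal}"


class AsyncOnlyWrapper:
    """Exposes a single async run(); no sync entrypoint is public."""

    def __init__(self, *args, **kwargs) -> None:
        self._inner = _StubPipeline(*args, **kwargs)

    async def run(self, goal: str, **kwargs) -> str:
        # Delegate to the inner pipeline's async method.
        return await self._inner.arun(goal, **kwargs)

    @property
    def registered_tools(self) -> list:
        return self._inner.registered_tools

    @property
    def system_prompt(self) -> str:
        return self._inner.system_prompt


wrapper = AsyncOnlyWrapper()
reply = asyncio.run(wrapper.run("What is the weather in Tokyo?"))
```

In a FastAPI handler the call would simply be `await wrapper.run(goal)` inside an `async def` endpoint.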