ractogateway.pipelines.agent.pipeline
AgentPipeline - autonomous ReAct agent with pluggable tools.
Implements the Reason+Act (ReAct) loop:
1. The LLM receives a system prompt listing all registered tools and a growing conversation transcript (goal + previous steps).
2. The LLM responds with a JSON object, either a single tool call:
       {"thought": "...", "tool_name": "...", "tool_input": {...}}
   or a parallel batch:
       {"thought": "...", "tool_calls": [{"tool_name": "...", "tool_input": {...}}, ...]}
3. The tool(s) are executed; results (observations) are appended to the transcript.
4. The loop repeats until the LLM calls finish(answer=...) or max_steps is reached.
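The loop described above can be sketched in a few lines of plain Python. This is a minimal illustration, not the library's implementation: react_loop, scripted_llm, and the stop-reason strings "finished" and "max_steps" are hypothetical names, and the LLM is replaced by a scripted stub so the sketch runs without RactoGateway or network access.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional


def react_loop(
    llm: Callable[[list], str],
    tools: dict,
    goal: str,
    max_steps: int = 6,
    max_parallel_tools: int = 4,
) -> tuple:
    """Toy ReAct loop (hypothetical helper). Returns (answer, stop_reason)."""
    transcript = [f"Goal: {goal}"]
    for _ in range(max_steps):
        reply = json.loads(llm(transcript))
        # Normalise a single tool call and a parallel batch into one list.
        calls = reply.get("tool_calls") or [
            {"tool_name": reply["tool_name"], "tool_input": reply.get("tool_input", {})}
        ]
        # A call to finish(answer=...) ends the loop.
        for call in calls:
            if call["tool_name"] == "finish":
                return call["tool_input"]["answer"], "finished"
        # Run the requested tools, at most max_parallel_tools at a time.
        with ThreadPoolExecutor(max_workers=max_parallel_tools) as pool:
            futures = [pool.submit(tools[c["tool_name"]], **c["tool_input"]) for c in calls]
            observations = [f.result() for f in futures]
        # Append the thought and the observations to the growing transcript.
        transcript.append(f"Thought: {reply.get('thought', '')}")
        for call, obs in zip(calls, observations):
            transcript.append(f"Observation[{call['tool_name']}]: {obs}")
    return None, "max_steps"


def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"


def scripted_llm(transcript: list) -> str:
    """Stub LLM: first requests a parallel batch, then finishes."""
    if len(transcript) == 1:
        return json.dumps({
            "thought": "Look up both cities at once.",
            "tool_calls": [
                {"tool_name": "get_weather", "tool_input": {"city": "Paris"}},
                {"tool_name": "get_weather", "tool_input": {"city": "London"}},
            ],
        })
    return json.dumps({
        "thought": "Both observations are in the transcript.",
        "tool_name": "finish",
        "tool_input": {"answer": "Sunny in both cities."},
    })


answer, reason = react_loop(
    scripted_llm, {"get_weather": get_weather}, "Weather in Paris and London?"
)
print(answer, reason)
```

The real pipeline adds error handling, retries, and tracing on top of this shape; the sketch only shows how the two JSON forms map onto tool execution and the two ways the loop can stop.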
Usage:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.agent import AgentPipeline

def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"

agent = AgentPipeline(
    kit=Chat(model="gpt-4o"),
    tools=[get_weather],
    max_steps=6,
    safe_mode=True,
    max_parallel_tools=4,
)
result = agent.run("What is the weather in Paris and London?")
print(result.final_answer)
print(result.to_markdown())

# Async variant (FastAPI / async servers):
result = await agent.arun("What is the weather in Tokyo?")
- class ractogateway.pipelines.agent.pipeline.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')
Bases: object
Autonomous ReAct agent with sync run() and async arun().
- Parameters:
  kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).
  tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).
  rag_pipeline (Any) – Optional RactoRAG instance; auto-registers a rag_search tool.
  sql_pipeline (Any) – Optional SQLAnalystPipeline; auto-registers a sql_query tool.
  enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).
  agent_memory (Any) – Any dict-like or object with get/set methods. Auto-registers memory_read and memory_write tools.
  max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.
  max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.
  tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).
  max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.
  max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.
  system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).
  extra_rules (str) – Append an extra numbered rule to the default system prompt.
  safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.
  tracer (Any) – Optional ractogateway.telemetry.RactoTracer.
  metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
  rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
  user_id (str) – Default user identifier passed to the rate limiter.
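Because rate_limiter is duck-typed, any object with the two named methods should be accepted. The following is a minimal sketch of such an object. SimpleTokenBudget is a hypothetical class, and the return types (a bool from check_and_consume, an int from get_remaining) are assumptions: the reference above names the methods but does not state what they return.

```python
class SimpleTokenBudget:
    """Hypothetical per-user token budget; not part of RactoGateway."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used = {}  # user_id -> tokens consumed so far

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        """Consume tokens if the budget allows it (assumed to return a bool)."""
        used = self._used.get(user_id, 0)
        if used + tokens > self._budget:
            return False
        self._used[user_id] = used + tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        """Tokens still available to this user (assumed to return an int)."""
        return self._budget - self._used.get(user_id, 0)


limiter = SimpleTokenBudget(budget_per_user=1000)
first = limiter.check_and_consume("alice", 600)    # fits within the budget
second = limiter.check_and_consume("alice", 600)   # would exceed it, so nothing is consumed
remaining = limiter.get_remaining("alice")
```

An instance like this would be passed as rate_limiter=limiter, with user_id selecting whose budget is charged. A production limiter would normally also reset budgets over time, which this sketch leaves out.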
- run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Run the agent synchronously until it finishes or hits max_steps.
- Parameters:
  goal (str) – The task or question the agent should solve.
  max_steps (Any) – Per-call override for the constructor max_steps.
  user_id (Any) – Per-call override for the rate-limiter user identifier.
  session_id (str | None) – Optional session tag passed to tracer spans (informational).
  response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent's final answer is parsed and validated into an instance stored in AgentResult.parsed_output.
- Return type: AgentResult
- Returns: AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
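The <object object> defaults shown for max_steps and user_id appear to be a sentinel value, which lets the method tell "argument not passed" apart from an explicit None or 0. A minimal sketch of that pattern follows, assuming this is how the per-call override works; ToyAgent and _UNSET are hypothetical names, not library identifiers.

```python
from typing import Any

# Module-level sentinel: distinguishes "not passed" from None, 0, or "".
_UNSET: Any = object()


class ToyAgent:
    """Hypothetical stand-in; only the override logic is of interest."""

    def __init__(self, max_steps: int = 10, user_id: str = "default") -> None:
        self.max_steps = max_steps
        self.user_id = user_id

    def run(self, goal: str, *, max_steps: Any = _UNSET, user_id: Any = _UNSET) -> dict:
        # Fall back to the constructor values unless the caller overrides them.
        steps = self.max_steps if max_steps is _UNSET else max_steps
        uid = self.user_id if user_id is _UNSET else user_id
        return {"goal": goal, "max_steps": steps, "user_id": uid}


agent = ToyAgent(max_steps=6)
default_call = agent.run("plan a trip")
override_call = agent.run("plan a trip", max_steps=2, user_id="alice")
```

With this pattern the constructor settings stay untouched, so an override applies to one call only.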
- async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Async variant of run(). Uses kit.achat() for LLM calls and runs sync tools in a thread pool so that they do not block the event loop.
- Parameters: the same as for run().
- Return type: AgentResult
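Running sync tools off the event loop can be sketched with the standard library alone. This is an illustration of the general technique, not the library's code: run_tools_in_threads and slow_weather are hypothetical names, and the use of asyncio.to_thread with a semaphore is an assumption about how a cap such as max_parallel_tools might be enforced.

```python
import asyncio
import time
from typing import Any


async def run_tools_in_threads(calls: list, max_parallel_tools: int = 4) -> list:
    """Run blocking tools in worker threads, at most max_parallel_tools at once.

    calls is a list of (function, kwargs) pairs; results keep the same order.
    """
    semaphore = asyncio.Semaphore(max_parallel_tools)

    async def run_one(func: Any, kwargs: dict) -> Any:
        async with semaphore:
            # asyncio.to_thread hands the blocking call to a worker thread,
            # so the event loop stays free while the tool runs.
            return await asyncio.to_thread(func, **kwargs)

    return await asyncio.gather(*(run_one(f, kw) for f, kw in calls))


def slow_weather(city: str) -> str:
    """Blocking tool: time.sleep stands in for a slow network call."""
    time.sleep(0.1)
    return f"Sunny, 22 C in {city}"


results = asyncio.run(
    run_tools_in_threads(
        [(slow_weather, {"city": "Paris"}), (slow_weather, {"city": "Tokyo"})]
    )
)
print(results)
```

Setting max_parallel_tools=1 in this sketch makes the semaphore admit one tool at a time, which matches the sequential behaviour described for the constructor parameter of the same name.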
- property system_prompt: str
The system prompt sent to the LLM each step (read-only).
- class ractogateway.pipelines.agent.pipeline.AsyncAgentPipeline(*args, **kwargs)
Bases: object
Async-only variant of AgentPipeline. Exposes a single async run() method, suitable for FastAPI endpoints where a sync run() should not be in the public API. All constructor parameters are identical to those of AgentPipeline.
- async run(goal, **kwargs)
Async-only agent entrypoint. See AgentPipeline.arun().
- Return type: AgentResult
- property system_prompt: str
The system prompt used by the inner pipeline.
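The async-only wrapper described above can be pictured as thin delegation to an inner pipeline. The sketch below shows that pattern under stated assumptions: StubPipeline is a hypothetical stand-in for AgentPipeline so the example runs without the library, and AsyncOnlyPipeline is an illustrative name, not the actual class body.

```python
import asyncio
from typing import Any


class StubPipeline:
    """Hypothetical stand-in for AgentPipeline (no LLM calls are made)."""

    def __init__(self, *, max_steps: int = 10) -> None:
        self.max_steps = max_steps
        self.system_prompt = "You are a ReAct agent."

    async def arun(self, goal: str, **kwargs: Any) -> str:
        return f"answer to: {goal}"


class AsyncOnlyPipeline:
    """Exposes only an async run(), delegating to the inner pipeline."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Constructor arguments are forwarded unchanged to the inner pipeline.
        self._inner = StubPipeline(*args, **kwargs)

    async def run(self, goal: str, **kwargs: Any) -> str:
        return await self._inner.arun(goal, **kwargs)

    @property
    def system_prompt(self) -> str:
        # Read-only view of the inner pipeline's prompt.
        return self._inner.system_prompt


agent = AsyncOnlyPipeline(max_steps=6)
answer = asyncio.run(agent.run("What is the weather in Tokyo?"))
print(answer)
```

Keeping the sync method off the public surface means a web handler cannot accidentally call a blocking entrypoint from inside the event loop.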