API Reference — AgentPipeline
AgentPipeline — autonomous ReAct agent with pluggable tools.
Exports
- AgentPipeline
Sync run() + async arun() ReAct loop.
- AsyncAgentPipeline
Async-only wrapper suitable for FastAPI / async servers.
- AgentResult, AgentStep, AgentUsage, StopReason
Result and step models.
- AgentRateLimitExceededError
Raised when the rate limiter blocks a run.
Tool factories (for advanced use)
make_finish_tool, make_rag_tool, make_rag_tool_async, make_sql_tool, make_http_tool, make_memory_tools
Pre-built tool factories used internally; exposed for custom wiring.
- ToolExecutor
Low-level sync / async tool runner.
Install
No extra dependencies required for the core agent.
HTTP tool requires httpx:
pip install ractogateway[pipelines-agent-http]
- class ractogateway.pipelines.agent.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')
Bases: object
Autonomous ReAct agent with sync run() and async arun().
- Parameters:
  - kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).
  - tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).
  - rag_pipeline (Any) – Optional RactoRAG instance; auto-registers a rag_search tool.
  - sql_pipeline (Any) – Optional SQLAnalystPipeline; auto-registers a sql_query tool.
  - enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).
  - agent_memory (Any) – Any dict-like object, or any object with get/set methods. Auto-registers memory_read and memory_write tools.
  - max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.
  - max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.
  - tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).
  - max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.
  - max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.
  - system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).
  - extra_rules (str) – Append an extra numbered rule to the default system prompt.
  - safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.
  - tracer (Any) – Optional ractogateway.telemetry.RactoTracer.
  - metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
  - rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
  - user_id (str) – Default user identifier passed to the rate limiter.
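The rate_limiter argument is duck-typed, so any object with the two methods named above should be accepted. The following is a minimal in-memory sketch, not the library's own limiter. The class name InMemoryTokenBudget, the fixed per-user budget, and the convention that check_and_consume() returns True when the request is allowed are all assumptions; this reference does not state the return contract.

```python
from collections import defaultdict


class InMemoryTokenBudget:
    """Hypothetical duck-typed limiter: a fixed token budget per user."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        # Tokens already spent, keyed by user_id.
        self._used = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # ASSUMPTION: True means "allowed"; the real contract may differ.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


limiter = InMemoryTokenBudget(budget_per_user=1000)
allowed = limiter.check_and_consume("alice", 400)
remaining = limiter.get_remaining("alice")
```

An instance like this would be passed as rate_limiter=limiter, with user_id selecting whose budget is charged.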
- run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Run the agent synchronously until it finishes or hits max_steps.
- Parameters:
  - goal (str) – The task or question the agent should solve.
  - max_steps (Any) – Per-call override for the constructor max_steps.
  - user_id (Any) – Per-call override for the rate-limiter user identifier.
  - session_id (str | None) – Optional session tag passed to tracer spans (informational).
  - response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent's final answer is parsed and validated into an instance stored in AgentResult.parsed_output.
- Return type: AgentResult
- Returns: AgentResult containing the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
- async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Async variant of run().
Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.
- Parameters: same as run().
- Return type: AgentResult
- class ractogateway.pipelines.agent.AsyncAgentPipeline(*args, **kwargs)
Bases: object
Async-only variant of AgentPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where a sync run() should not be in the public API.
All constructor parameters are identical to AgentPipeline.
- async run(goal, **kwargs)
Async-only agent entrypoint. See AgentPipeline.arun().
- Return type: AgentResult
- class ractogateway.pipelines.agent.AgentResult(**data)
Bases: BaseModel
Full output of an AgentPipeline run.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- stop_reason: StopReason
Why the loop terminated.
- usage: AgentUsage
Token and step counts.
- parsed_output: Any
Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.
- class ractogateway.pipelines.agent.AgentStep(**data)
Bases: BaseModel
One reasoning + action step in the ReAct loop.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent.AgentUsage(**data)
Bases: BaseModel
Token and step accounting across the full agent run.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent.StopReason(*values)
Why the agent loop terminated.
- FINISHED = 'finished'
- MAX_STEPS = 'max_steps'
- ERROR = 'error'
- CIRCUIT_BREAK = 'circuit_break'
- exception ractogateway.pipelines.agent.AgentRateLimitExceededError
Bases: RuntimeError
Raised when the rate limiter blocks an agent run.
- class ractogateway.pipelines.agent.ToolExecutor(tools, max_retries=0)
Bases: object
Runs registered tools by name with sync and async support.
- execute(tool_name, tool_input)
Execute tool_name synchronously, retrying up to max_retries times on exception.
- async aexecute(tool_name, tool_input)
Execute tool_name asynchronously, retrying up to max_retries times on exception.
Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.
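The dispatch rule described for aexecute() can be illustrated with a short standalone sketch. This is not ToolExecutor's source: the function name aexecute_sketch, the choice to pass tool_input as keyword arguments, and re-raising the last exception once retries are exhausted are assumptions made for illustration.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


async def aexecute_sketch(
    tools: Dict[str, Callable[..., Any]],
    tool_name: str,
    tool_input: Dict[str, Any],
    max_retries: int = 0,
) -> Any:
    """Run one tool by name, retrying on exception (illustrative only)."""
    fn = tools[tool_name]
    last_error = None
    for _attempt in range(max_retries + 1):
        try:
            if inspect.iscoroutinefunction(fn):
                # Async callables are awaited directly.
                return await fn(**tool_input)
            # Sync callables run in the loop's default thread-pool executor.
            return await asyncio.to_thread(fn, **tool_input)
        except Exception as exc:
            last_error = exc
    # ASSUMPTION: once retries are exhausted, surface the last failure.
    raise last_error


calls = {"n": 0}


def flaky_add(a: int, b: int) -> int:
    # Fails on the first call only, to exercise the retry path.
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return a + b


async def shout(text: str) -> str:
    return text.upper()


demo_tools = {"flaky_add": flaky_add, "shout": shout}
sum_result = asyncio.run(
    aexecute_sketch(demo_tools, "flaky_add", {"a": 2, "b": 3}, max_retries=1)
)
shout_result = asyncio.run(aexecute_sketch(demo_tools, "shout", {"text": "ok"}))
```

With max_retries=1 the flaky tool is attempted twice, so the second attempt supplies the result.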
- ractogateway.pipelines.agent.make_finish_tool()
Return the always-present finish tool.
When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.
- ractogateway.pipelines.agent.make_http_tool()
Return an http_get tool that fetches URL content via httpx.
Requires httpx: pip install ractogateway[pipelines-agent-http]
- ractogateway.pipelines.agent.make_memory_tools(agent_memory)
Return memory_read and memory_write tools backed by agent_memory.
agent_memory can be any object supporting:
    memory.get(key) -> Any
    memory.set(key, value) -> None
or a plain dict.
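To make the two accepted shapes of agent_memory concrete, here is a small sketch of a get/set store next to a plain dict. NamespacedMemory, write_value and read_value are hypothetical names, and the hasattr-based dispatch is only one way the two shapes could be handled; how make_memory_tools() actually does it is not documented here.

```python
from typing import Any


class NamespacedMemory:
    """Hypothetical store exposing the get/set interface described above."""

    def __init__(self, namespace: str) -> None:
        self._namespace = namespace
        self._data = {}

    def get(self, key: str) -> Any:
        return self._data.get(f"{self._namespace}:{key}")

    def set(self, key: str, value: Any) -> None:
        self._data[f"{self._namespace}:{key}"] = value


def write_value(memory: Any, key: str, value: Any) -> None:
    """Write to either a get/set object or a plain dict (assumed dispatch)."""
    if hasattr(memory, "set"):
        memory.set(key, value)
    else:
        memory[key] = value


def read_value(memory: Any, key: str) -> Any:
    # dict also provides .get(), so one call covers both shapes.
    return memory.get(key)


store = NamespacedMemory("session-1")
write_value(store, "city", "Paris")

plain = {}
write_value(plain, "city", "London")
```

Either store or plain could then be supplied as agent_memory.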
- ractogateway.pipelines.agent.make_rag_tool(rag_pipeline)
Return a rag_search tool backed by a RactoRAG pipeline.
- ractogateway.pipelines.agent.make_rag_tool_async(rag_pipeline)
Return an async rag_search tool backed by an async RactoRAG.
- ractogateway.pipelines.agent.make_sql_tool(sql_pipeline)
Return a sql_query tool backed by a SQLAnalystPipeline.
Models for AgentPipeline.
- class ractogateway.pipelines.agent._models.StopReason(*values)
Why the agent loop terminated.
- FINISHED = 'finished'
- MAX_STEPS = 'max_steps'
- ERROR = 'error'
- CIRCUIT_BREAK = 'circuit_break'
- class ractogateway.pipelines.agent._models.AgentUsage(**data)
Bases: BaseModel
Token and step accounting across the full agent run.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent._models.AgentStep(**data)
Bases: BaseModel
One reasoning + action step in the ReAct loop.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent._models.AgentResult(**data)
Bases: BaseModel
Full output of an AgentPipeline run.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- stop_reason: StopReason
Why the loop terminated.
- usage: AgentUsage
Token and step counts.
- parsed_output: Any
Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.
- exception ractogateway.pipelines.agent._models.AgentRateLimitExceededError
Bases: RuntimeError
Raised when the rate limiter blocks an agent run.
Tool registration and execution engine for AgentPipeline.
- ToolExecutor wraps a dict of callables and provides:
  - Synchronous execution with timing
  - Async execution (runs sync callables in a thread pool)
  - Parallel async execution via asyncio.gather
  - Human-readable tool descriptions for the system prompt
- Built-in tool factories:
  - make_finish_tool() - Always registered; signals task completion
  - make_rag_tool(rag) - Auto-registered when rag_pipeline is provided
  - make_sql_tool(sql) - Auto-registered when sql_pipeline is provided
  - make_http_tool() - Opt-in; fetches URLs via httpx
  - make_memory_tools(mem) - Auto-registered when agent_memory is provided
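Parallel execution via asyncio.gather, combined with a cap such as max_parallel_tools, might look roughly like the sketch below. Using an asyncio.Semaphore to enforce the cap is an assumption about one possible design, not a description of the library's internals; run_batch and fake_tool are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

ToolCall = Tuple[Callable[..., Awaitable[Any]], Dict[str, Any]]


async def run_batch(calls: List[ToolCall], max_parallel: int) -> List[Any]:
    """Run async tool calls concurrently, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(fn: Callable[..., Awaitable[Any]], kwargs: Dict[str, Any]) -> Any:
        async with semaphore:
            return await fn(**kwargs)

    # gather() returns results in input order; return_exceptions=True turns a
    # failing call into an exception object instead of aborting the batch.
    return await asyncio.gather(
        *(guarded(fn, kwargs) for fn, kwargs in calls),
        return_exceptions=True,
    )


stats = {"active": 0, "peak": 0}


async def fake_tool(city: str) -> str:
    # Track how many calls overlap so the cap can be observed.
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)
    stats["active"] -= 1
    return f"weather for {city}"


batch = [(fake_tool, {"city": c}) for c in ("Paris", "London", "Tokyo", "Oslo")]
results = asyncio.run(run_batch(batch, max_parallel=2))
```

Setting max_parallel=1 in this sketch degrades to sequential execution, which matches the documented meaning of max_parallel_tools=1.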
- class ractogateway.pipelines.agent._executor.ToolExecutor(tools, max_retries=0)
Bases: object
Runs registered tools by name with sync and async support.
- execute(tool_name, tool_input)
Execute tool_name synchronously, retrying up to max_retries times on exception.
- async aexecute(tool_name, tool_input)
Execute tool_name asynchronously, retrying up to max_retries times on exception.
Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.
- ractogateway.pipelines.agent._executor.make_finish_tool()
Return the always-present finish tool.
When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.
- ractogateway.pipelines.agent._executor.make_rag_tool(rag_pipeline)
Return a rag_search tool backed by a RactoRAG pipeline.
- ractogateway.pipelines.agent._executor.make_rag_tool_async(rag_pipeline)
Return an async rag_search tool backed by an async RactoRAG.
- ractogateway.pipelines.agent._executor.make_sql_tool(sql_pipeline)
Return a sql_query tool backed by a SQLAnalystPipeline.
- ractogateway.pipelines.agent._executor.make_http_tool()
Return an http_get tool that fetches URL content via httpx.
Requires httpx: pip install ractogateway[pipelines-agent-http]
- ractogateway.pipelines.agent._executor.make_memory_tools(agent_memory)
Return memory_read and memory_write tools backed by agent_memory.
agent_memory can be any object supporting:
    memory.get(key) -> Any
    memory.set(key, value) -> None
or a plain dict.
AgentPipeline - autonomous ReAct agent with pluggable tools.
Implements the Reason+Act (ReAct) loop:
1. The LLM receives a system prompt listing all registered tools and a growing conversation transcript (goal + previous steps).
2. The LLM responds with a JSON object, either a single tool call:
       {"thought": "...", "tool_name": "...", "tool_input": {...}}
   or a parallel batch:
       {"thought": "...", "tool_calls": [{"tool_name": "...", "tool_input": {...}}, ...]}
3. The tool(s) are executed; results (observations) are appended to the transcript.
4. The loop repeats until the LLM calls finish(answer=...) or max_steps is reached.
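The loop described above can be reduced to a toy version that runs without any LLM. This sketch only illustrates the control flow: react_loop, fake_llm, the transcript line format, and the returned (stop_reason, answer) tuple are assumptions, and the real pipeline additionally handles errors, retries, tracing and rate limiting.

```python
import json
from typing import Any, Callable, Dict, List, Tuple


def react_loop(
    llm: Callable[[List[str]], str],
    tools: Dict[str, Callable[..., Any]],
    goal: str,
    max_steps: int = 10,
) -> Tuple[str, str]:
    """Return (stop_reason, final_answer) for a toy ReAct loop."""
    transcript = [f"GOAL: {goal}"]
    for _ in range(max_steps):
        reply = json.loads(llm(transcript))
        # Normalise the single-call shape into a one-element batch.
        calls = reply.get("tool_calls") or [
            {"tool_name": reply["tool_name"], "tool_input": reply["tool_input"]}
        ]
        for call in calls:
            if call["tool_name"] == "finish":
                return "finished", call["tool_input"]["answer"]
            observation = tools[call["tool_name"]](**call["tool_input"])
            transcript.append(f"OBSERVATION[{call['tool_name']}]: {observation}")
    return "max_steps", ""


def get_weather(city: str) -> str:
    return f"Sunny, 22 C in {city}"


# A scripted stand-in for the LLM: one parallel batch, then finish.
scripted = iter([
    json.dumps({"thought": "look up both", "tool_calls": [
        {"tool_name": "get_weather", "tool_input": {"city": "Paris"}},
        {"tool_name": "get_weather", "tool_input": {"city": "London"}},
    ]}),
    json.dumps({"thought": "done", "tool_name": "finish",
                "tool_input": {"answer": "Both cities are sunny."}}),
])


def fake_llm(transcript: List[str]) -> str:
    return next(scripted)


stop_reason, answer = react_loop(
    fake_llm, {"get_weather": get_weather}, "Weather in Paris and London?"
)
```

If the scripted model never called finish, the same function would return the "max_steps" reason once the step budget ran out.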
Usage:
    import asyncio

    from ractogateway.openai_developer_kit import Chat
    from ractogateway.pipelines.agent import AgentPipeline

    def get_weather(city: str) -> str:
        """Return the current weather for a city."""
        return f"Sunny, 22 C in {city}"

    agent = AgentPipeline(
        kit=Chat(model="gpt-4o"),
        tools=[get_weather],
        max_steps=6,
        safe_mode=True,
        max_parallel_tools=4,
    )

    result = agent.run("What is the weather in Paris and London?")
    print(result.final_answer)
    print(result.to_markdown())

    # Async variant (FastAPI / async servers); await must run inside a coroutine:
    async def main() -> None:
        result = await agent.arun("What is the weather in Tokyo?")
        print(result.final_answer)

    asyncio.run(main())
- class ractogateway.pipelines.agent.pipeline.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')
Bases: object
Autonomous ReAct agent with sync run() and async arun().
- Parameters:
  - kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).
  - tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).
  - rag_pipeline (Any) – Optional RactoRAG instance; auto-registers a rag_search tool.
  - sql_pipeline (Any) – Optional SQLAnalystPipeline; auto-registers a sql_query tool.
  - enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).
  - agent_memory (Any) – Any dict-like object, or any object with get/set methods. Auto-registers memory_read and memory_write tools.
  - max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.
  - max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.
  - tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).
  - max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.
  - max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.
  - system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).
  - extra_rules (str) – Append an extra numbered rule to the default system prompt.
  - safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.
  - tracer (Any) – Optional ractogateway.telemetry.RactoTracer.
  - metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
  - rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
  - user_id (str) – Default user identifier passed to the rate limiter.
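The naming rule for the tools argument (use __tool_name__ when present, otherwise __name__) can be sketched as follows. resolve_tool_name and build_registry are hypothetical helpers, the manual attribute assignment stands in for whatever the @tool decorator sets, and rejecting duplicate names is an assumption rather than documented behaviour.

```python
from typing import Any, Callable, Dict, List


def resolve_tool_name(fn: Callable[..., Any]) -> str:
    # Prefer the decorator-supplied name, fall back to the function name.
    return getattr(fn, "__tool_name__", fn.__name__)


def build_registry(tools: List[Callable[..., Any]]) -> Dict[str, Callable[..., Any]]:
    registry = {}
    for fn in tools:
        name = resolve_tool_name(fn)
        if name in registry:
            # ASSUMPTION: the real pipeline may overwrite or warn instead.
            raise ValueError(f"duplicate tool name: {name}")
        registry[name] = fn
    return registry


def get_weather(city: str) -> str:
    return f"Sunny in {city}"


def lookup(order_id: str) -> str:
    return f"order {order_id}"


# Stand-in for what a decorator might attach to the function.
lookup.__tool_name__ = "order_lookup"

registry = build_registry([get_weather, lookup])
```

The keys of registry are the names the LLM would use in tool_name when requesting a call.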
- run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Run the agent synchronously until it finishes or hits max_steps.
- Parameters:
  - goal (str) – The task or question the agent should solve.
  - max_steps (Any) – Per-call override for the constructor max_steps.
  - user_id (Any) – Per-call override for the rate-limiter user identifier.
  - session_id (str | None) – Optional session tag passed to tracer spans (informational).
  - response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent's final answer is parsed and validated into an instance stored in AgentResult.parsed_output.
- Return type: AgentResult
- Returns: AgentResult containing the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
- async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Async variant of run().
Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.
- Parameters: same as run().
- Return type: AgentResult
- class ractogateway.pipelines.agent.pipeline.AsyncAgentPipeline(*args, **kwargs)
Bases: object
Async-only variant of AgentPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where a sync run() should not be in the public API.
All constructor parameters are identical to AgentPipeline.
- async run(goal, **kwargs)
Async-only agent entrypoint. See AgentPipeline.arun().
- Return type: AgentResult