ractogateway.pipelines.agent
AgentPipeline — autonomous ReAct agent with pluggable tools.
Exports
- AgentPipeline
Sync run() + async arun() ReAct loop.
- AsyncAgentPipeline
Async-only wrapper suitable for FastAPI / async servers.
- AgentResult, AgentStep, AgentUsage, StopReason
Result and step models.
- AgentRateLimitExceededError
Raised when the rate limiter blocks a run.
Tool factories (for advanced use)
- make_finish_tool, make_rag_tool, make_rag_tool_async, make_sql_tool, make_http_tool, make_memory_tools
Pre-built tool factories used internally; exposed for custom wiring.
- ToolExecutor
Low-level sync / async tool runner.
Install
No extra dependencies required for the core agent.
HTTP tool requires httpx:
pip install ractogateway[pipelines-agent-http]
- class ractogateway.pipelines.agent.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')
Bases: object
Autonomous ReAct agent with sync run() and async arun().
Parameters:
  - kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).
  - tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).
  - rag_pipeline (Any) – Optional RactoRAG instance; auto-registers a rag_search tool.
  - sql_pipeline (Any) – Optional SQLAnalystPipeline; auto-registers a sql_query tool.
  - enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).
  - agent_memory (Any) – Any dict-like or object with get/set methods. Auto-registers memory_read and memory_write tools.
  - max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.
  - max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.
  - tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).
  - max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.
  - max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.
  - system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).
  - extra_rules (str) – Append an extra numbered rule to the default system prompt.
  - safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.
  - tracer (Any) – Optional ractogateway.telemetry.RactoTracer.
  - metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
  - rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
  - user_id (str) – Default user identifier passed to the rate limiter.
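The naming rule for the tools parameter (registered by __tool_name__ when present, otherwise by __name__) can be illustrated with a small standalone sketch. It does not import ractogateway: tool_name_of, build_registry, word_count and lookup are hypothetical names, the __tool_name__ attribute is set by hand as a stand-in for whatever the @tool decorator does, and the library's own registration code may differ.

```python
from typing import Any, Callable


def tool_name_of(fn: Callable[..., Any]) -> str:
    """Resolve a tool's name: __tool_name__ if set, otherwise __name__."""
    return getattr(fn, "__tool_name__", fn.__name__)


def build_registry(tools: list[Callable[..., Any]]) -> dict[str, Callable[..., Any]]:
    """Map each resolved tool name to its callable, rejecting duplicates."""
    registry: dict[str, Callable[..., Any]] = {}
    for fn in tools:
        name = tool_name_of(fn)
        if name in registry:
            raise ValueError(f"duplicate tool name: {name}")
        registry[name] = fn
    return registry


def word_count(text: str) -> int:
    """Plain callable: resolved under its __name__."""
    return len(text.split())


def lookup(city: str) -> str:
    """Callable that carries an explicit tool name (set by hand below)."""
    return f"no forecast available for {city}"


# Assumption for illustration: mimics an attribute a decorator could set.
lookup.__tool_name__ = "weather_lookup"

registry = build_registry([word_count, lookup])
```

Keeping type hints and a docstring on each tool is a sensible habit, since an agent typically needs some description of a tool's arguments to call it correctly.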
- run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Run the agent synchronously until it finishes or hits max_steps.
Parameters:
  - goal (str) – The task or question the agent should solve.
  - max_steps (Any) – Per-call override for the constructor max_steps.
  - user_id (Any) – Per-call override for the rate-limiter user identifier.
  - session_id (str | None) – Optional session tag passed to tracer spans (informational).
  - response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.
Return type: AgentResult
Returns: AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
- async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)
Async variant of run(). Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.
Parameters: same as run().
Return type: AgentResult
- property system_prompt: str
The system prompt sent to the LLM each step (read-only).
- class ractogateway.pipelines.agent.AsyncAgentPipeline(*args, **kwargs)
Bases: object
Async-only variant of AgentPipeline. Exposes a single async run() method, suitable for FastAPI endpoints where a sync run() should not be in the public API.
All constructor parameters are identical to AgentPipeline.
- async run(goal, **kwargs)
Async-only agent entrypoint. See AgentPipeline.arun().
Return type: AgentResult
- property system_prompt: str
The system prompt used by the inner pipeline.
- class ractogateway.pipelines.agent.AgentResult(**data)
Bases: BaseModel
Full output of an AgentPipeline run.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- goal: str
The original user goal / task string.
- stop_reason: StopReason
Why the loop terminated.
- usage: AgentUsage
Token and step counts.
- parsed_output: Any
Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.
- get_tool_calls()
Return (tool_name, tool_input) for every non-finish step.
- to_json(path=None, *, indent=2)
Serialise to JSON. Returns a string when path is None.
- class ractogateway.pipelines.agent.AgentStep(**data)
Bases: BaseModel
One reasoning + action step in the ReAct loop.
- step_num: int
1-based position in the execution sequence.
- tool_name: str
Name of the tool that was invoked.
- observation: str
The tool’s return value (truncated to 4000 chars when long).
- duration_ms: float
Wall-clock time for tool execution in milliseconds.
- is_finish: bool
True when this step invoked the finish() termination tool.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent.AgentUsage(**data)
Bases: BaseModel
Token and step accounting across the full agent run.
- total_input_tokens: int
Cumulative input tokens across all LLM calls.
- total_output_tokens: int
Cumulative output tokens across all LLM calls.
- steps_taken: int
Number of reasoning+action steps executed.
- tools_called: int
Number of non-finish tool invocations.
- property total_tokens: int
Sum of all input and output tokens.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.agent.StopReason(*values)
Why the agent loop terminated.
- FINISHED = 'finished'
- MAX_STEPS = 'max_steps'
- ERROR = 'error'
- CIRCUIT_BREAK = 'circuit_break'
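To make the relationship between max_steps, max_consecutive_errors and these stop reasons concrete, here is a toy control loop. It is a sketch under stated assumptions, not the library's loop: StopReason is re-declared locally so the code runs without ractogateway, run_loop and its step callback are hypothetical, failed steps are counted toward max_steps (the real behaviour is not stated here), and ERROR is not modelled.

```python
from enum import Enum
from typing import Callable


class StopReason(Enum):
    # Local mirror of the documented values; not imported from the library.
    FINISHED = "finished"
    MAX_STEPS = "max_steps"
    ERROR = "error"
    CIRCUIT_BREAK = "circuit_break"


def run_loop(
    step: Callable[[int], bool],
    max_steps: int = 10,
    max_consecutive_errors: int = 3,
) -> StopReason:
    """Call step(step_num) until it returns True, errors pile up, or steps run out."""
    consecutive_errors = 0
    for step_num in range(1, max_steps + 1):
        try:
            finished = step(step_num)
        except Exception:
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                return StopReason.CIRCUIT_BREAK
            continue
        consecutive_errors = 0  # any successful step resets the error streak
        if finished:
            return StopReason.FINISHED
    return StopReason.MAX_STEPS


def always_fails(step_num: int) -> bool:
    raise RuntimeError(f"tool failed at step {step_num}")


finished_reason = run_loop(lambda n: n == 3)
capped_reason = run_loop(lambda n: False, max_steps=5)
broken_reason = run_loop(always_fails)
```

Callers would typically branch on the stop reason before trusting the final answer, since a capped or circuit-broken run may not have produced one.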
- exception ractogateway.pipelines.agent.AgentRateLimitExceededError
Bases: RuntimeError
Raised when the rate limiter blocks an agent run.
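The rate_limiter argument is duck-typed, so any object with check_and_consume(user_id, tokens) and get_remaining(user_id) should fit. The following fixed-window limiter is a minimal sketch of such an object. FixedWindowLimiter is a hypothetical name, and the boolean return of check_and_consume (True meaning "allowed and charged") is an assumption; how the pipeline interprets the return value is not documented here.

```python
import time
from collections import defaultdict
from typing import Callable


class FixedWindowLimiter:
    """Per-user token budget that resets every `window_s` seconds."""

    def __init__(
        self,
        budget: int,
        window_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.budget = budget
        self.window_s = window_s
        self._clock = clock
        self._used: dict[str, int] = defaultdict(int)
        self._window_start: dict[str, float] = {}

    def _roll_window(self, user_id: str) -> None:
        """Start a fresh window for the user if none exists or the old one expired."""
        now = self._clock()
        start = self._window_start.get(user_id)
        if start is None or now - start >= self.window_s:
            self._window_start[user_id] = now
            self._used[user_id] = 0

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Assumed contract: True means the request is allowed and was charged.
        self._roll_window(user_id)
        if self._used[user_id] + tokens > self.budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        self._roll_window(user_id)
        return self.budget - self._used[user_id]


limiter = FixedWindowLimiter(budget=100)
first = limiter.check_and_consume("alice", 60)   # fits within the budget
second = limiter.check_and_consume("alice", 60)  # would exceed the budget
remaining = limiter.get_remaining("alice")
```

This version keeps its state in process memory and is not thread-safe; a shared store would be needed if several workers serve the same users.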
- class ractogateway.pipelines.agent.ToolExecutor(tools, max_retries=0)
Bases: object
Runs registered tools by name with sync and async support.
- Parameters:
- execute(tool_name, tool_input)
Execute tool_name synchronously, retrying up to max_retries times on exception.
- async aexecute(tool_name, tool_input)
Execute tool_name asynchronously, retrying up to max_retries times on exception.
Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.
- async aexecute_parallel(calls)
Execute multiple tool calls concurrently via asyncio.gather.
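The dispatch behaviour described for aexecute and aexecute_parallel can be sketched with the standard library alone. This is an illustration of the technique, not ToolExecutor's source: call_tool and call_parallel are hypothetical helpers, and the shape of calls (a list of (tool_name, tool_input) pairs with tool_input as a keyword dict) is an assumption.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_tool(
    fn: Callable[..., Any], tool_input: dict[str, Any], max_retries: int = 0
) -> Any:
    """Await async tools directly; run sync tools in the default executor."""
    attempts = max_retries + 1
    for attempt in range(attempts):
        try:
            if inspect.iscoroutinefunction(fn):
                return await fn(**tool_input)
            loop = asyncio.get_running_loop()
            # None selects the loop's default thread-pool executor.
            return await loop.run_in_executor(None, lambda: fn(**tool_input))
        except Exception:
            if attempt == attempts - 1:
                raise  # retries exhausted: surface the last error


async def call_parallel(
    tools: dict[str, Callable[..., Any]],
    calls: list[tuple[str, dict[str, Any]]],
) -> list[Any]:
    """Run all calls concurrently; results come back in call order."""
    return await asyncio.gather(
        *(call_tool(tools[name], args) for name, args in calls)
    )


def add(a: int, b: int) -> int:
    return a + b


async def shout(text: str) -> str:
    await asyncio.sleep(0)  # yield to the event loop once
    return text.upper()


TOOLS = {"add": add, "shout": shout}
results = asyncio.run(
    call_parallel(TOOLS, [("add", {"a": 2, "b": 3}), ("shout", {"text": "done"})])
)
```

A cap such as max_parallel_tools could be layered on top with an asyncio.Semaphore; that is left out here to keep the sketch short.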
- ractogateway.pipelines.agent.make_finish_tool()
Return the always-present finish tool.
When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.
- ractogateway.pipelines.agent.make_http_tool()
Return an http_get tool that fetches URL content via httpx.
Requires httpx: pip install ractogateway[pipelines-agent-http]
- ractogateway.pipelines.agent.make_memory_tools(agent_memory)
Return memory_read and memory_write tools backed by agent_memory.
agent_memory can be any object supporting:
memory.get(key) -> Any
memory.set(key, value) -> None
or a plain dict.
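A minimal sketch of the two accepted memory shapes, written without importing ractogateway. DictMemory, memory_read and memory_write below are hypothetical stand-ins that show one way a reader and writer could support both an object with get/set and a plain dict; the tools returned by make_memory_tools may behave differently, and returning None for a missing key is an assumption.

```python
from typing import Any


class DictMemory:
    """Minimal object exposing the get/set pair described above."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        # Assumption: unknown keys yield None rather than raising.
        return self._store.get(key)

    def set(self, key: str, value: Any) -> None:
        self._store[key] = value


def memory_read(memory: Any, key: str) -> Any:
    """.get() exists on both plain dicts and DictMemory."""
    return memory.get(key)


def memory_write(memory: Any, key: str, value: Any) -> None:
    """Use .set() when available, else fall back to item assignment (plain dict)."""
    if hasattr(memory, "set"):
        memory.set(key, value)
    else:
        memory[key] = value


custom = DictMemory()
plain: dict[str, Any] = {}
for store in (custom, plain):
    memory_write(store, "user_name", "Ada")
```

Wrapping a persistent store (a database row, a cache client) in a class with these two methods keeps the agent-facing interface identical to the in-memory case.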
- ractogateway.pipelines.agent.make_rag_tool(rag_pipeline)
Return a rag_search tool backed by a RactoRAG pipeline.
- ractogateway.pipelines.agent.make_rag_tool_async(rag_pipeline)
Return an async rag_search tool backed by an async RactoRAG.