API Reference — AgentPipeline

AgentPipeline — autonomous ReAct agent with pluggable tools.

Exports

AgentPipeline

Sync run() + async arun() ReAct loop.

AsyncAgentPipeline

Async-only wrapper suitable for FastAPI / async servers.

AgentResult, AgentStep, AgentUsage, StopReason

Result and step models.

AgentRateLimitExceededError

Raised when the rate limiter blocks a run.

Tool factories (for advanced use)

make_finish_tool, make_rag_tool, make_rag_tool_async, make_sql_tool, make_http_tool, make_memory_tools

Pre-built tool factories used internally; exposed for custom wiring.

ToolExecutor

Low-level sync / async tool runner.

Install

The core agent requires no extra dependencies. The HTTP tool requires httpx:

pip install ractogateway[pipelines-agent-http]

class ractogateway.pipelines.agent.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]

Bases: object

Autonomous ReAct agent with sync run() and async arun().

Parameters:
  • kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).

  • tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).

  • rag_pipeline (Any) – Optional RactoRAG instance — auto-registers a rag_search tool.

  • sql_pipeline (Any) – Optional SQLAnalystPipeline — auto-registers a sql_query tool.

  • enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).

  • agent_memory (Any) – Any dict-like or object with get/set methods. Auto-registers memory_read and memory_write tools.

  • max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.

  • max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.

  • tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).

  • max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.

  • max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.

  • system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).

  • extra_rules (str) – Append an extra numbered rule to the default system prompt.

  • safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.
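Because rate_limiter is duck-typed, any object exposing the two methods named above can be passed. The following is a minimal in-memory sketch. The class name FixedBudgetLimiter, the boolean return value of check_and_consume, and the integer return value of get_remaining are assumptions made for illustration; the reference documents only the method names and their arguments.

```python
from collections import defaultdict


class FixedBudgetLimiter:
    """Hypothetical per-user token budget (not part of ractogateway)."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used: dict[str, int] = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Assumed contract: return True and record usage when the request fits,
        # return False (consuming nothing) when it would exceed the budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        # Tokens still available to this user.
        return self._budget - self._used[user_id]


limiter = FixedBudgetLimiter(budget_per_user=1000)
allowed = limiter.check_and_consume("alice", 400)
remaining = limiter.get_remaining("alice")
```

An instance like this would presumably be passed as rate_limiter=limiter, with user_id selecting whose budget is charged.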

run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]

Run the agent synchronously until it finishes or hits max_steps.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps. When omitted, the constructor value applies.

  • user_id (Any) – Per-call override for the rate-limiter user identifier. When omitted, the constructor value applies.

  • session_id (str | None) – Optional session tag passed to tracer spans (informational).

  • response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.

Return type:

AgentResult

Returns:

AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
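The <object object> defaults in the signature are how a sentinel object renders in generated documentation. A likely reason for a sentinel (an assumption, not confirmed by this reference) is to tell "argument omitted" apart from any real value, so that the constructor setting applies unless the call overrides it. The sketch below shows the pattern with hypothetical names (_UNSET, MiniPipeline); it is not the library's code.

```python
from typing import Any

_UNSET: Any = object()  # hypothetical sentinel meaning "argument not supplied"


class MiniPipeline:
    """Stand-in class illustrating per-call overrides; not the real AgentPipeline."""

    def __init__(self, max_steps: int = 10, user_id: str = "default") -> None:
        self.max_steps = max_steps
        self.user_id = user_id

    def run(self, goal: str, *, max_steps: Any = _UNSET, user_id: Any = _UNSET) -> dict:
        # Fall back to the constructor value only when the caller passed nothing.
        steps = self.max_steps if max_steps is _UNSET else max_steps
        user = self.user_id if user_id is _UNSET else user_id
        return {"goal": goal, "max_steps": steps, "user_id": user}


pipeline = MiniPipeline(max_steps=6)
default_call = pipeline.run("summarise the report")
override_call = pipeline.run("summarise the report", max_steps=2, user_id="alice")
```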

async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]

Async variant of run().

Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps. When omitted, the constructor value applies.

  • user_id (Any) – Per-call override for the rate-limiter user identifier. When omitted, the constructor value applies.

  • session_id (str | None) – Optional session tag for tracing.

  • response_format (type | None) – A Pydantic BaseModel subclass for structured output parsing.

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt sent to the LLM each step (read-only).

class ractogateway.pipelines.agent.AsyncAgentPipeline(*args, **kwargs)[source]

Bases: object

Async-only variant of AgentPipeline.

Exposes a single async run() method - suitable for FastAPI endpoints where a sync run() should not be in the public API.

All constructor parameters are identical to AgentPipeline.

async run(goal, **kwargs)[source]

Async-only agent entrypoint. See AgentPipeline.arun().

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt used by the inner pipeline.

class ractogateway.pipelines.agent.AgentResult(**data)[source]

Bases: BaseModel

Full output of an AgentPipeline run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

goal: str

The original user goal / task string.

final_answer: str | None

The agent’s answer when finish() was called (None on max_steps/error).

steps: list[AgentStep]

All reasoning+action steps in execution order.

stop_reason: StopReason

Why the loop terminated.

usage: AgentUsage

Token and step counts.

error: str | None

Exception message when stop_reason is ERROR.

parsed_output: Any

Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.

get_tool_calls()[source]

Return (tool_name, tool_input) for every non-finish step.

Return type:

list[tuple[str, dict[str, Any]]]
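As a rough illustration of what get_tool_calls() returns, the sketch below filters a list of steps down to (tool_name, tool_input) pairs. Step is a hypothetical stand-in dataclass carrying only the AgentStep fields that matter here, and the real method may be implemented differently.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Step:
    """Stand-in for AgentStep with only the fields used below."""
    tool_name: str
    tool_input: dict[str, Any] = field(default_factory=dict)
    is_finish: bool = False


def tool_calls(steps: list[Step]) -> list[tuple[str, dict[str, Any]]]:
    # Keep every step except the terminating finish() call, in execution order.
    return [(s.tool_name, s.tool_input) for s in steps if not s.is_finish]


steps = [
    Step("get_weather", {"city": "Paris"}),
    Step("get_weather", {"city": "London"}),
    Step("finish", {"answer": "Sunny in both"}, is_finish=True),
]
calls = tool_calls(steps)
```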

get_observations()[source]

Return all tool observations in execution order.

Return type:

list[str]

succeeded()[source]

True when the agent reached a final answer.

Return type:

bool

to_json(path=None, *, indent=2)[source]

Serialise to JSON. Returns string when path is None.

Return type:

str | None

to_markdown(path=None)[source]

Build a step-by-step Markdown trace. Returns string when path is None.

Return type:

str | None

class ractogateway.pipelines.agent.AgentStep(**data)[source]

Bases: BaseModel

One reasoning + action step in the ReAct loop.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

step_num: int

1-based position in the execution sequence.

thought: str | None

The LLM’s reasoning text before selecting a tool.

tool_name: str

Name of the tool that was invoked.

tool_input: dict[str, Any]

Arguments passed to the tool as a flat dict.

observation: str

The tool’s return value (truncated to 4000 chars when long).

duration_ms: float

Wall-clock time for tool execution in milliseconds.

is_finish: bool

True when this step invoked the finish() termination tool.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.agent.AgentUsage(**data)[source]

Bases: BaseModel

Token and step accounting across the full agent run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

total_input_tokens: int

Cumulative input tokens across all LLM calls.

total_output_tokens: int

Cumulative output tokens across all LLM calls.

steps_taken: int

Number of reasoning+action steps executed.

tools_called: int

Number of non-finish tool invocations.

property total_tokens: int

Sum of all input and output tokens.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.agent.StopReason(*values)[source]

Bases: str, Enum

Why the agent loop terminated.

FINISHED = 'finished'
MAX_STEPS = 'max_steps'
ERROR = 'error'
CIRCUIT_BREAK = 'circuit_break'

exception ractogateway.pipelines.agent.AgentRateLimitExceededError[source]

Bases: RuntimeError

Raised when the rate limiter blocks an agent run.
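A caller typically has to handle both outcomes described above: a returned stop reason and a raised rate-limit error. The sketch below uses local stand-ins for StopReason and AgentRateLimitExceededError so that it runs without the library installed; the helper names (summarise, guarded, blocked_run) and the message strings are hypothetical. Whether safe_mode also captures the rate-limit exception is not stated in this reference, so the sketch treats it as always raised.

```python
from enum import Enum
from typing import Callable, Optional


class StopReason(str, Enum):
    """Local mirror of the documented enum values, so the sketch runs standalone."""
    FINISHED = "finished"
    MAX_STEPS = "max_steps"
    ERROR = "error"
    CIRCUIT_BREAK = "circuit_break"


class AgentRateLimitExceededError(RuntimeError):
    """Local stand-in for the documented exception."""


def summarise(stop_reason: StopReason, final_answer: Optional[str], error: Optional[str]) -> str:
    # Map each documented stop reason to a caller-facing message.
    if stop_reason is StopReason.FINISHED:
        return f"answer: {final_answer}"
    if stop_reason is StopReason.MAX_STEPS:
        return "stopped: step budget exhausted"
    if stop_reason is StopReason.CIRCUIT_BREAK:
        return "stopped: too many consecutive tool errors"
    return f"failed: {error}"


def guarded(run: Callable[[], str]) -> str:
    # The rate-limit error is raised rather than returned, so catch it separately.
    try:
        return run()
    except AgentRateLimitExceededError:
        return "rejected: rate limit exceeded"


def blocked_run() -> str:
    raise AgentRateLimitExceededError("quota used up")


finished = summarise(StopReason.FINISHED, "Sunny", None)
from_string = StopReason("circuit_break")  # look up a member by its string value
rejected = guarded(blocked_run)
```

Because the enum derives from str, its members compare equal to their plain string values, which is convenient when a stop reason has been read back from JSON.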

class ractogateway.pipelines.agent.ToolExecutor(tools, max_retries=0)[source]

Bases: object

Runs registered tools by name with sync and async support.

Parameters:
  • tools (dict[str, Callable]) – Mapping of tool name to callable.

  • max_retries (int) – How many times to retry a tool that raises an exception before reporting an error observation to the LLM. Default 0 = no retry.
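The retry behaviour described for max_retries can be pictured with a small standalone function. This is a sketch under assumptions, not the library's implementation: the function name execute_with_retries, the "Error: ..." observation format, and the choice to time all attempts together are illustrative.

```python
import time
from typing import Any, Callable


def execute_with_retries(
    fn: Callable[..., Any], tool_input: dict[str, Any], max_retries: int = 0
) -> tuple[str, float]:
    """Call fn(**tool_input), retrying on exceptions; return (observation, duration_ms)."""
    start = time.perf_counter()
    observation = ""
    # One initial attempt plus up to max_retries further attempts.
    for _ in range(max_retries + 1):
        try:
            observation = str(fn(**tool_input))
            break
        except Exception as exc:
            # Keep the last failure as text instead of propagating the exception.
            observation = f"Error: {exc}"
    duration_ms = (time.perf_counter() - start) * 1000.0
    return observation, duration_ms


attempts = {"count": 0}


def flaky(city: str) -> str:
    # Fails on the first call, succeeds on the second.
    attempts["count"] += 1
    if attempts["count"] < 2:
        raise RuntimeError("temporary failure")
    return f"Sunny in {city}"


observation, duration_ms = execute_with_retries(flaky, {"city": "Paris"}, max_retries=1)
```

With max_retries=0 the same function makes a single attempt and returns the error text as the observation.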

property names: list[str]

Sorted list of registered tool names.

describe_all()[source]

Build the tools section for the agent system prompt.

Return type:

str
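The exact text that describe_all() produces is not shown in this reference. As a hedged illustration of the general idea, the sketch below builds one line per tool from its name, signature, and first docstring line, and resolves names the way the tools parameter describes (__tool_name__ when present, otherwise __name__). The helper names tool_name and describe_tools and the output layout are assumptions.

```python
import inspect
from typing import Any, Callable


def tool_name(fn: Callable[..., Any]) -> str:
    # Prefer the decorator-provided name, then fall back to the function name.
    return getattr(fn, "__tool_name__", fn.__name__)


def describe_tools(tools: dict[str, Callable[..., Any]]) -> str:
    """Render one line per tool: name, signature, and first docstring line."""
    lines = []
    for name in sorted(tools):
        fn = tools[name]
        doc = (inspect.getdoc(fn) or "No description.").splitlines()[0]
        lines.append(f"- {name}{inspect.signature(fn)}: {doc}")
    return "\n".join(lines)


def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"


registry = {tool_name(get_weather): get_weather}
description = describe_tools(registry)
```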

execute(tool_name, tool_input)[source]

Execute tool_name synchronously, retrying up to max_retries times on exception.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute(tool_name, tool_input)[source]

Execute tool_name asynchronously, retrying up to max_retries times on exception.

Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute_parallel(calls)[source]

Execute multiple tool calls concurrently via asyncio.gather.

Parameters:

calls (list[tuple[str, dict[str, Any]]]) – List of (tool_name, tool_input) pairs to run in parallel.

Return type:

list[tuple[str, float]]

Returns:

list[tuple[str, float]] – Results in the same order as calls.
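The combination of direct awaiting, thread offloading for sync callables, and ordered results from asyncio.gather can be sketched as follows. This is a standalone approximation, not the library's code: the function names call_tool and run_parallel are hypothetical, and using a semaphore to cap concurrency (in the spirit of max_parallel_tools) is an assumption about how such a limit might be enforced.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_tool(fn: Callable[..., Any], tool_input: dict[str, Any]) -> str:
    # Await coroutine functions directly; push sync callables to a worker thread.
    if inspect.iscoroutinefunction(fn):
        return str(await fn(**tool_input))
    return str(await asyncio.to_thread(fn, **tool_input))


async def run_parallel(
    tools: dict[str, Callable[..., Any]],
    calls: list[tuple[str, dict[str, Any]]],
    max_parallel: int = 4,
) -> list[str]:
    # A semaphore caps how many calls are in flight at once.
    gate = asyncio.Semaphore(max_parallel)

    async def bounded(name: str, tool_input: dict[str, Any]) -> str:
        async with gate:
            return await call_tool(tools[name], tool_input)

    # asyncio.gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*(bounded(n, i) for n, i in calls))


def get_weather(city: str) -> str:
    return f"Sunny in {city}"


async def get_time(city: str) -> str:
    await asyncio.sleep(0.01)
    return f"Noon in {city}"


results = asyncio.run(
    run_parallel(
        {"get_weather": get_weather, "get_time": get_time},
        [("get_time", {"city": "Tokyo"}), ("get_weather", {"city": "Paris"})],
        max_parallel=2,
    )
)
```

Even though the sync tool finishes first, the results keep the order of the calls list, which is the property the return description relies on.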

ractogateway.pipelines.agent.make_finish_tool()[source]

Return the always-present finish tool.

When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent.make_http_tool()[source]

Return an http_get tool that fetches URL content via httpx.

Requires httpx: pip install ractogateway[pipelines-agent-http]

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent.make_memory_tools(agent_memory)[source]

Return memory_read and memory_write tools backed by agent_memory.

agent_memory can be any object supporting:

memory.get(key) -> Any
memory.set(key, value) -> None

or a plain dict.

Return type:

list[tuple[str, Callable]]
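To make the two accepted memory shapes concrete, the sketch below builds a pair of read/write callables that work with either a plain dict or an object offering get and set. It is an illustration only: KeyValueMemory and memory_tools are hypothetical names, and the string return values are assumptions rather than the library's actual observations.

```python
from typing import Any, Callable


class KeyValueMemory:
    """Hypothetical memory object exposing the documented get/set interface."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


def memory_tools(memory: Any) -> list[tuple[str, Callable[..., str]]]:
    """Build (name, callable) pairs that work for a dict or a get/set object."""

    def memory_read(key: str) -> str:
        # dict and get/set objects both offer .get(key).
        return str(memory.get(key))

    def memory_write(key: str, value: str) -> str:
        # Plain dicts have no .set(), so fall back to item assignment.
        if isinstance(memory, dict):
            memory[key] = value
        else:
            memory.set(key, value)
        return f"stored {key}"

    return [("memory_read", memory_read), ("memory_write", memory_write)]


readbacks = []
for backend in ({}, KeyValueMemory()):
    tools = dict(memory_tools(backend))
    tools["memory_write"](key="city", value="Paris")
    readbacks.append(tools["memory_read"](key="city"))
```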

ractogateway.pipelines.agent.make_rag_tool(rag_pipeline)[source]

Return a rag_search tool backed by a RactoRAG pipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent.make_rag_tool_async(rag_pipeline)[source]

Return an async rag_search tool backed by an async RactoRAG.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent.make_sql_tool(sql_pipeline)[source]

Return a sql_query tool backed by a SQLAnalystPipeline.

Return type:

tuple[str, Callable]

Models for AgentPipeline.

class ractogateway.pipelines.agent._models.StopReason(*values)[source]

Bases: str, Enum

Why the agent loop terminated.

FINISHED = 'finished'
MAX_STEPS = 'max_steps'
ERROR = 'error'
CIRCUIT_BREAK = 'circuit_break'

class ractogateway.pipelines.agent._models.AgentUsage(**data)[source]

Bases: BaseModel

Token and step accounting across the full agent run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

total_input_tokens: int

Cumulative input tokens across all LLM calls.

total_output_tokens: int

Cumulative output tokens across all LLM calls.

steps_taken: int

Number of reasoning+action steps executed.

tools_called: int

Number of non-finish tool invocations.

property total_tokens: int

Sum of all input and output tokens.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.agent._models.AgentStep(**data)[source]

Bases: BaseModel

One reasoning + action step in the ReAct loop.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

step_num: int

1-based position in the execution sequence.

thought: str | None

The LLM’s reasoning text before selecting a tool.

tool_name: str

Name of the tool that was invoked.

tool_input: dict[str, Any]

Arguments passed to the tool as a flat dict.

observation: str

The tool’s return value (truncated to 4000 chars when long).

duration_ms: float

Wall-clock time for tool execution in milliseconds.

is_finish: bool

True when this step invoked the finish() termination tool.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.agent._models.AgentResult(**data)[source]

Bases: BaseModel

Full output of an AgentPipeline run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

goal: str

The original user goal / task string.

final_answer: str | None

The agent’s answer when finish() was called (None on max_steps/error).

steps: list[AgentStep]

All reasoning+action steps in execution order.

stop_reason: StopReason

Why the loop terminated.

usage: AgentUsage

Token and step counts.

error: str | None

Exception message when stop_reason is ERROR.

parsed_output: Any

Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.

get_tool_calls()[source]

Return (tool_name, tool_input) for every non-finish step.

Return type:

list[tuple[str, dict[str, Any]]]

get_observations()[source]

Return all tool observations in execution order.

Return type:

list[str]

succeeded()[source]

True when the agent reached a final answer.

Return type:

bool

to_json(path=None, *, indent=2)[source]

Serialise to JSON. Returns string when path is None.

Return type:

str | None

to_markdown(path=None)[source]

Build a step-by-step Markdown trace. Returns string when path is None.

Return type:

str | None

exception ractogateway.pipelines.agent._models.AgentRateLimitExceededError[source]

Bases: RuntimeError

Raised when the rate limiter blocks an agent run.

Tool registration and execution engine for AgentPipeline.

ToolExecutor wraps a dict of callables and provides:
  • Synchronous execution with timing

  • Async execution (runs sync callables in a thread pool)

  • Parallel async execution via asyncio.gather

  • Human-readable tool descriptions for the system prompt

Built-in tool factories:

  • make_finish_tool() - Always registered; signals task completion

  • make_rag_tool(rag) - Auto-registered when rag_pipeline is provided

  • make_sql_tool(sql) - Auto-registered when sql_pipeline is provided

  • make_http_tool() - Opt-in; fetches URLs via httpx

  • make_memory_tools(mem) - Auto-registered when agent_memory is provided

class ractogateway.pipelines.agent._executor.ToolExecutor(tools, max_retries=0)[source]

Bases: object

Runs registered tools by name with sync and async support.

Parameters:
  • tools (dict[str, Callable]) – Mapping of tool name to callable.

  • max_retries (int) – How many times to retry a tool that raises an exception before reporting an error observation to the LLM. Default 0 = no retry.

property names: list[str]

Sorted list of registered tool names.

describe_all()[source]

Build the tools section for the agent system prompt.

Return type:

str

execute(tool_name, tool_input)[source]

Execute tool_name synchronously, retrying up to max_retries times on exception.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute(tool_name, tool_input)[source]

Execute tool_name asynchronously, retrying up to max_retries times on exception.

Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute_parallel(calls)[source]

Execute multiple tool calls concurrently via asyncio.gather.

Parameters:

calls (list[tuple[str, dict[str, Any]]]) – List of (tool_name, tool_input) pairs to run in parallel.

Return type:

list[tuple[str, float]]

Returns:

list[tuple[str, float]] – Results in the same order as calls.

ractogateway.pipelines.agent._executor.make_finish_tool()[source]

Return the always-present finish tool.

When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_rag_tool(rag_pipeline)[source]

Return a rag_search tool backed by a RactoRAG pipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_rag_tool_async(rag_pipeline)[source]

Return an async rag_search tool backed by an async RactoRAG.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_sql_tool(sql_pipeline)[source]

Return a sql_query tool backed by a SQLAnalystPipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_http_tool()[source]

Return an http_get tool that fetches URL content via httpx.

Requires httpx: pip install ractogateway[pipelines-agent-http]

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_memory_tools(agent_memory)[source]

Return memory_read and memory_write tools backed by agent_memory.

agent_memory can be any object supporting:

memory.get(key) -> Any
memory.set(key, value) -> None

or a plain dict.

Return type:

list[tuple[str, Callable]]

AgentPipeline - autonomous ReAct agent with pluggable tools.

Implements the Reason+Act (ReAct) loop:

  1. The LLM receives a system prompt listing all registered tools and a growing conversation transcript (goal + previous steps).

  2. The LLM responds with a JSON object, either a single tool call:

     {"thought": "...", "tool_name": "...", "tool_input": {...}}

     or a parallel batch:

     {"thought": "...", "tool_calls": [{"tool_name": "...", "tool_input": {...}}, ...]}

  3. The tool(s) are executed; results (observations) are appended to the transcript.

  4. The loop repeats until the LLM calls finish(answer=...) or max_steps is reached.
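The two response shapes in step 2 can be normalised into a single list of calls before execution. The sketch below shows one way to do that with the standard json module; the function name parse_action and the decision to default a missing tool_input to an empty dict are assumptions, not the library's documented behaviour.

```python
import json
from typing import Any


def parse_action(raw: str) -> tuple[str, list[tuple[str, dict[str, Any]]]]:
    """Return (thought, calls) from either documented response shape."""
    payload = json.loads(raw)
    thought = payload.get("thought", "")
    if "tool_calls" in payload:
        # Parallel batch: a list of {"tool_name": ..., "tool_input": ...} objects.
        calls = [(c["tool_name"], c.get("tool_input", {})) for c in payload["tool_calls"]]
    else:
        # Single call: tool_name and tool_input sit at the top level.
        calls = [(payload["tool_name"], payload.get("tool_input", {}))]
    return thought, calls


single = parse_action(
    '{"thought": "look it up", "tool_name": "get_weather", "tool_input": {"city": "Paris"}}'
)
batch = parse_action(
    '{"thought": "both at once", "tool_calls": ['
    '{"tool_name": "get_weather", "tool_input": {"city": "Paris"}}, '
    '{"tool_name": "get_weather", "tool_input": {"city": "London"}}]}'
)
# The loop would stop once the selected tool is finish.
is_done = single[1][0][0] == "finish"
```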

Usage:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.agent import AgentPipeline

def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"

agent = AgentPipeline(
    kit=Chat(model="gpt-4o"),
    tools=[get_weather],
    max_steps=6,
    safe_mode=True,
    max_parallel_tools=4,
)

result = agent.run("What is the weather in Paris and London?")
print(result.final_answer)
print(result.to_markdown())

# Async variant (FastAPI / async servers). Outside an already-running event
# loop, await must sit inside a coroutine started with asyncio.run():
import asyncio

async def main() -> None:
    result = await agent.arun("What is the weather in Tokyo?")
    print(result.final_answer)

asyncio.run(main())

class ractogateway.pipelines.agent.pipeline.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]

Bases: object

Autonomous ReAct agent with sync run() and async arun().

Parameters:
  • kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).

  • tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).

  • rag_pipeline (Any) – Optional RactoRAG instance — auto-registers a rag_search tool.

  • sql_pipeline (Any) – Optional SQLAnalystPipeline — auto-registers a sql_query tool.

  • enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).

  • agent_memory (Any) – Any dict-like or object with get/set methods. Auto-registers memory_read and memory_write tools.

  • max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.

  • max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.

  • tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).

  • max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.

  • max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.

  • system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).

  • extra_rules (str) – Append an extra numbered rule to the default system prompt.

  • safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.

run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]

Run the agent synchronously until it finishes or hits max_steps.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps. When omitted, the constructor value applies.

  • user_id (Any) – Per-call override for the rate-limiter user identifier. When omitted, the constructor value applies.

  • session_id (str | None) – Optional session tag passed to tracer spans (informational).

  • response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.

Return type:

AgentResult

Returns:

AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.

async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]

Async variant of run().

Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps. When omitted, the constructor value applies.

  • user_id (Any) – Per-call override for the rate-limiter user identifier. When omitted, the constructor value applies.

  • session_id (str | None) – Optional session tag for tracing.

  • response_format (type | None) – A Pydantic BaseModel subclass for structured output parsing.

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt sent to the LLM each step (read-only).

class ractogateway.pipelines.agent.pipeline.AsyncAgentPipeline(*args, **kwargs)[source]

Bases: object

Async-only variant of AgentPipeline.

Exposes a single async run() method - suitable for FastAPI endpoints where a sync run() should not be in the public API.

All constructor parameters are identical to AgentPipeline.

async run(goal, **kwargs)[source]

Async-only agent entrypoint. See AgentPipeline.arun().

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt used by the inner pipeline.