# AgentPipeline
AgentPipeline is an autonomous ReAct (Reason + Act) agent that solves
multi-step tasks by iteratively reasoning, calling tools, and observing
results. It works with any RactoGateway developer kit (OpenAI, Google,
Anthropic, Ollama, HuggingFace) and requires no extra dependencies for the
core loop.
## How it works

```text
GOAL
  │
  ▼
[1] LLM receives:
      system prompt (tool list + rules)
      transcript (goal + all prior steps)
    LLM outputs:
      {"thought": "…", "tool_name": "…", "tool_input": {…}}
  │
  ├── tool_name == "finish"?  Yes ──► RESULT
  │   No
  ▼
[2] ToolExecutor.execute(tool_name, tool_input)
      → observation (string, max 4 000 chars)
  │
  ├── steps < max_steps?  No ──► MAX_STEPS
  │   Yes
  └──► repeat from [1]
```
The LLM never sees raw function signatures — every tool is described in plain English inside the system prompt, making the pattern provider-agnostic.
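To make the loop concrete, here is a minimal, self-contained sketch of a ReAct loop in plain Python. It is not RactoGateway's implementation. `make_scripted_llm` is a hypothetical stand-in that replays canned JSON replies instead of calling a model, the system-prompt wording is invented, and `run_react` returns a `(stop_reason, text)` tuple rather than a result object. It does follow the diagram's control flow: stop on `finish`, otherwise run the tool, cap the observation at 4,000 characters, append it to the transcript, and stop after `max_steps` tool calls.

```python
import json
from typing import Callable, Dict, List, Tuple

LLM = Callable[[str, str], str]  # (system_prompt, transcript) -> JSON reply


def make_scripted_llm(replies: List[str]) -> LLM:
    """Hypothetical stand-in for a model: replays canned JSON replies in order."""
    queue = iter(replies)

    def llm(system_prompt: str, transcript: str) -> str:
        return next(queue)

    return llm


def run_react(goal: str, llm: LLM, tools: Dict[str, Callable[..., object]],
              max_steps: int = 8) -> Tuple[str, str]:
    """Return ("finished", answer) or ("max_steps", last_observation)."""
    # Tools are described in plain English, not as native function schemas.
    tool_lines = [f"- {name}: {(fn.__doc__ or '').strip()}" for name, fn in tools.items()]
    system_prompt = (
        "Reply with JSON: thought, tool_name, tool_input.\nTools:\n"
        + "\n".join(tool_lines)
        + '\n- finish: end the task with {"answer": "..."}'
    )
    transcript = f"GOAL: {goal}"
    observation = ""
    for step in range(1, max_steps + 1):
        reply = json.loads(llm(system_prompt, transcript))
        name, args = reply["tool_name"], reply["tool_input"]
        if name == "finish":
            return "finished", args["answer"]
        observation = str(tools[name](**args))[:4000]  # cap observation length
        transcript += (f"\nSTEP {step} thought={reply['thought']!r} tool={name} "
                       f"input={json.dumps(args)}\nOBSERVATION: {observation}")
    return "max_steps", observation


def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 °C in {city}"


llm = make_scripted_llm([
    '{"thought": "Need the weather", "tool_name": "get_weather", "tool_input": {"city": "Paris"}}',
    '{"thought": "Done", "tool_name": "finish", "tool_input": {"answer": "Sunny in Paris"}}',
])
print(run_react("Weather in Paris?", llm, {"get_weather": get_weather}))
# ('finished', 'Sunny in Paris')
```

A real model would read `system_prompt` and `transcript` to pick its next action. The scripted one ignores them, which keeps the run deterministic.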
## Installation

```bash
# Core agent (no extra deps)
pip install ractogateway

# Optional: http_get tool (fetches URLs); quote the extra so shells like zsh don't expand the brackets
pip install "ractogateway[pipelines-agent-http]"
```
## Quickstart

```python
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.agent import AgentPipeline

# 1. Define your tools as plain Python functions
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    # In real code: call a weather API here
    return f"Sunny, 22 °C in {city}"


def convert_currency(amount: float, from_cur: str, to_cur: str) -> str:
    """Convert an amount between two currency codes (e.g. USD, EUR)."""
    rates = {"USD_EUR": 0.92, "EUR_USD": 1.09}  # demo rates only
    rate = rates.get(f"{from_cur}_{to_cur}")
    if rate is None:
        # Tell the LLM instead of silently returning an unconverted amount
        return f"No rate available for {from_cur} -> {to_cur}"
    return f"{amount * rate:.2f} {to_cur}"


# 2. Build the agent
agent = AgentPipeline(
    kit=Chat(model="gpt-4o-mini"),
    tools=[get_weather, convert_currency],
    max_steps=8,
    safe_mode=True,
)

# 3. Run!
result = agent.run("What is the weather in Paris, and convert 100 USD to EUR?")
print(result.final_answer)
print(f"Steps taken: {result.usage.steps_taken}")
print(f"Tokens used: {result.usage.total_tokens}")
print(result.to_markdown())  # Full step trace
```
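## Async usage (FastAPI, async servers)

```python
# AgentPipeline (from the Quickstart) offers both run() and arun().
# `await` is only valid inside an async function:
async def main() -> None:
    result = await agent.arun("What is 6 * 7?")
    print(result.final_answer)


# Or use AsyncAgentPipeline for async-only servers:
from fastapi import FastAPI
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.agent import AsyncAgentPipeline

app = FastAPI()
async_agent = AsyncAgentPipeline(
    kit=Chat(model="gpt-4o-mini"),
    tools=[get_weather],
)


# FastAPI endpoint:
@app.post("/ask")
async def ask(question: str) -> dict:
    result = await async_agent.run(question)
    return {"answer": result.final_answer, "steps": len(result.steps)}
```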
## Constructor reference

| Parameter | Type | Default | Description |
|---|---|---|---|
| `kit` | Any kit | required | RactoGateway developer kit |
| `tools` | … | … | Plain Python callables to register |
| `rag_pipeline` | … | … | Auto-registers a … |
| `sql_pipeline` | … | … | Auto-registers a … |
| … | … | … | Register the `http_get` tool |
| `agent_memory` | dict-like | … | Auto-registers `memory_read` / `memory_write` |
| `max_steps` | … | … | Hard cap on tool calls |
| … | … | … | Fully override the generated prompt |
| … | … | … | Append a custom rule line |
| `safe_mode` | … | … | Return error instead of raising |
| `tracer` | … | … | OpenTelemetry tracing |
| … | middleware | … | Prometheus metrics |
| `rate_limiter` | duck-typed | … | … |
| `user_id` | … | … | Default rate-limiter user |
run() and arun() accept per-call overrides: max_steps, user_id, session_id.
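## Built-in tools

| Tool | Registered when | Description |
|---|---|---|
| `finish` | Always | Signals task completion; sets … |
| … | `rag_pipeline` is set | Searches a … |
| … | `sql_pipeline` is set | Natural-language SQL via … |
| `http_get` | … | Fetches URL content (requires the `pipelines-agent-http` extra) |
| `memory_read` | `agent_memory` is set | Reads a key from agent memory |
| `memory_write` | `agent_memory` is set | Writes a key to agent memory |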
## Writing custom tools
Any Python callable becomes a tool. The docstring’s first line is shown to the LLM as the tool’s description, so make it descriptive:
```python
def search_products(query: str, max_results: int = 5) -> str:
    """Search the product catalogue and return matching item names and prices."""
    # … your implementation: fill `results` with dicts that have "name" and "price" keys …
    results: list[dict] = []
    return "\n".join(f"{p['name']}: ${p['price']}" for p in results[:max_results])
```
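The exact text RactoGateway generates for each tool is not shown on this page, but turning a callable into a plain-English tool line can be sketched with the standard `inspect` module. `describe_tool` and its output format are hypothetical. The only behavior taken from this page is that the docstring's first line becomes the description.

```python
import inspect
from typing import Callable


def describe_tool(fn: Callable) -> str:
    """Render a callable as one plain-English line: name(params): first docstring line."""
    params = []
    for p in inspect.signature(fn).parameters.values():
        if p.annotation is inspect.Parameter.empty:
            ann = "any"
        elif isinstance(p.annotation, type):
            ann = p.annotation.__name__        # e.g. str, int
        else:
            ann = str(p.annotation)            # string or typing annotations
        default = "" if p.default is inspect.Parameter.empty else f" = {p.default!r}"
        params.append(f"{p.name}: {ann}{default}")
    doc = inspect.getdoc(fn)
    first_line = doc.splitlines()[0] if doc else "(no description)"
    return f"{fn.__name__}({', '.join(params)}): {first_line}"


def search_products(query: str, max_results: int = 5) -> str:
    """Search the product catalogue and return matching item names and prices."""
    return ""


print(describe_tool(search_products))
# search_products(query: str, max_results: int = 5): Search the product catalogue and return matching item names and prices.
```

A function without a docstring gives the model nothing to go on ("(no description)" here), which is why the first docstring line is worth writing carefully.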
Async tools work too — ToolExecutor awaits them when arun() is used:
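```python
import httpx


async def get_stock_price(ticker: str) -> str:
    """Fetch the latest stock price for a ticker symbol (e.g. AAPL)."""
    async with httpx.AsyncClient() as client:
        r = await client.get(f"https://api.example.com/price/{ticker}")
        r.raise_for_status()  # surface HTTP errors instead of parsing an error body
        return str(r.json()["price"])  # observations are strings
```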
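Mixing sync and async tools under `arun()` needs a dispatcher that awaits coroutine functions and calls plain ones. The sketch below is one way to do it with the standard library. `call_tool` is a hypothetical helper, and running blocking tools in a worker thread via `asyncio.to_thread` is an assumption, not documented `ToolExecutor` behavior. `fake_price` stands in for `get_stock_price` so the example runs offline.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_tool(fn: Callable[..., Any], **kwargs: Any) -> str:
    """Hypothetical dispatcher: await async tools, run sync tools in a worker thread."""
    if inspect.iscoroutinefunction(fn):
        result = await fn(**kwargs)
    else:
        # asyncio.to_thread (Python 3.9+) keeps a blocking tool off the event loop
        result = await asyncio.to_thread(fn, **kwargs)
    return str(result)  # observations are strings


async def fake_price(ticker: str) -> float:
    """Offline stand-in for get_stock_price."""
    await asyncio.sleep(0)
    return 187.5


def word_count(text: str) -> int:
    """Count whitespace-separated words (a plain, synchronous tool)."""
    return len(text.split())


async def main() -> None:
    print(await call_tool(fake_price, ticker="AAPL"))    # 187.5
    print(await call_tool(word_count, text="a b c"))     # 3


asyncio.run(main())
```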
The @tool decorator from ractogateway.tools sets __tool_name__ to
control the name exposed to the LLM:
```python
from ractogateway.tools import tool


@tool(name="lookup_customer")
def get_customer_by_id(customer_id: str) -> str:
    """Look up a customer record by their unique ID."""
    ...
```
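For intuition, a name-overriding decorator and the matching lookup can look roughly like this. The `tool` function below is a hypothetical stand-in, not the source of `ractogateway.tools.tool`. Falling back to `fn.__name__` for undecorated functions is also an assumption.

```python
from typing import Callable, Optional, TypeVar

F = TypeVar("F", bound=Callable)


def tool(name: Optional[str] = None) -> Callable[[F], F]:
    """Hypothetical stand-in for ractogateway.tools.tool: records the exposed name."""
    def decorate(fn: F) -> F:
        fn.__tool_name__ = name or fn.__name__  # the override exposed_name() reads back
        return fn
    return decorate


def exposed_name(fn: Callable) -> str:
    """Name shown to the LLM: the decorator's override if set, else the function name."""
    return getattr(fn, "__tool_name__", fn.__name__)


@tool(name="lookup_customer")
def get_customer_by_id(customer_id: str) -> str:
    """Look up a customer record by their unique ID."""
    return f"customer {customer_id}"


def ping() -> str:
    """Health check (undecorated)."""
    return "pong"


print(exposed_name(get_customer_by_id))  # lookup_customer
print(exposed_name(ping))                # ping
```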
## Plugging in RAG and SQL

```python
from ractogateway.rag import RactoRAG
from ractogateway.pipelines import SQLAnalystPipeline, AgentPipeline
from ractogateway.openai_developer_kit import Chat

kit = Chat(model="gpt-4o")

# RAG pipeline (documents already indexed)
rag = RactoRAG(...)

# SQL pipeline (connected to a database)
sql = SQLAnalystPipeline(
    kit=kit,
    connection_string="postgresql://user:pass@localhost/sales",
)

# Agent that can search docs AND query the database
agent = AgentPipeline(
    kit=kit,
    rag_pipeline=rag,
    sql_pipeline=sql,
    max_steps=12,
)

result = agent.run(
    "What were the top 3 products in Q1, and summarise what the docs say about them?"
)
print(result.final_answer)
```
## Memory persistence between turns
Use agent_memory for short-term key-value storage within a single session,
or plug in a Redis-backed store for multi-session persistence:
```python
from ractogateway.redis import RedisChatMemory  # for multi-session
from ractogateway.pipelines.agent import AgentPipeline

# Simple dict memory (in-process)
memory = {}
agent = AgentPipeline(
    kit=kit,
    tools=[get_weather],
    agent_memory=memory,
    max_steps=10,
)

# The agent can now call memory_write / memory_read between steps
result = agent.run("Remember that Alice prefers Celsius, then tell me her preferred unit.")
```
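For in-process memory that survives a restart without Redis, any object that behaves like a dict may be enough, since the constructor table lists `agent_memory` as dict-like. The class below assumes the agent only needs the standard `MutableMapping` operations (get, set, delete, iterate, len). That is an assumption about `AgentPipeline`, and `JsonFileMemory` is a hypothetical name. It rewrites the whole file on every change and is not safe for concurrent writers.

```python
import json
import tempfile
from collections.abc import MutableMapping
from pathlib import Path
from typing import Any, Dict, Iterator


class JsonFileMemory(MutableMapping):
    """Hypothetical dict-like store that rewrites a JSON file on every change."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        # Load previous contents if the file exists (e.g. after a restart).
        self._data: Dict[str, Any] = (
            json.loads(self._path.read_text(encoding="utf-8")) if self._path.exists() else {}
        )

    def _flush(self) -> None:
        self._path.write_text(json.dumps(self._data), encoding="utf-8")

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._flush()

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self._flush()

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


# Demo: write with one instance, read back with a fresh one (as a new process would).
demo_path = Path(tempfile.mkdtemp()) / "agent_memory.json"
memory = JsonFileMemory(str(demo_path))
memory["alice_unit"] = "Celsius"
reloaded = JsonFileMemory(str(demo_path))
print(reloaded["alice_unit"])  # Celsius
```

If the dict-like assumption holds, it would be passed as `agent_memory=JsonFileMemory("agent_memory.json")`.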
## Rate limiting

```python
from ractogateway.redis import RedisRateLimiter, RateLimitConfig

limiter = RedisRateLimiter(
    config=RateLimitConfig(max_tokens_per_minute=100),
    url="redis://localhost:6379",
)

agent = AgentPipeline(
    kit=kit,
    tools=[get_weather],
    rate_limiter=limiter,
    user_id="tenant_42",
)

# Per-call override:
result = agent.run("Task", user_id="tenant_99")
```
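`RateLimitConfig(max_tokens_per_minute=100)` caps token consumption, presumably per `user_id` given the per-call override. The in-process sketch below shows one way such a per-user budget can work, using a rolling 60-second window. It is not `RedisRateLimiter`'s algorithm, and `try_consume` is a hypothetical method name, not the interface `AgentPipeline` expects from a duck-typed `rate_limiter`. The clock is injectable so the behavior can be shown without sleeping.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple


class SlidingWindowTokenLimiter:
    """Hypothetical per-user tokens-per-minute limiter over a rolling 60 s window."""

    WINDOW_SECONDS = 60.0

    def __init__(self, max_tokens_per_minute: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_tokens = max_tokens_per_minute
        self.clock = clock  # injectable so demos and tests need not sleep
        self._events: Dict[str, Deque[Tuple[float, int]]] = defaultdict(deque)

    def _used(self, user_id: str, now: float) -> int:
        events = self._events[user_id]
        while events and now - events[0][0] >= self.WINDOW_SECONDS:
            events.popleft()  # usage older than the window no longer counts
        return sum(tokens for _, tokens in events)

    def try_consume(self, user_id: str, tokens: int) -> bool:
        """Record usage and return True, or return False if it would exceed the cap."""
        now = self.clock()
        if self._used(user_id, now) + tokens > self.max_tokens:
            return False
        self._events[user_id].append((now, tokens))
        return True


fake_now = [0.0]
limiter = SlidingWindowTokenLimiter(100, clock=lambda: fake_now[0])
print(limiter.try_consume("tenant_42", 80))  # True
print(limiter.try_consume("tenant_42", 30))  # False: 80 + 30 > 100
print(limiter.try_consume("tenant_99", 30))  # True: separate budget
fake_now[0] = 61.0
print(limiter.try_consume("tenant_42", 30))  # True: the 80 tokens have aged out
```

A shared store such as Redis is what lets a limit like this hold across several server processes. This in-memory version only limits a single process.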
## Reading the result

```python
result = agent.run("What is the GDP of Germany?")

# Basic
print(result.final_answer)        # "The GDP of Germany is ~4.1 trillion USD."
print(result.succeeded())         # True
print(result.stop_reason)         # StopReason.FINISHED
print(result.usage.total_tokens)  # 842

# Steps
for step in result.steps:
    print(f"[{step.step_num}] {step.tool_name}: {step.thought}")

# Aggregated helpers
tool_calls = result.get_tool_calls()        # [(name, input), ...]
observations = result.get_observations()    # ["obs1", "obs2", ...]

# Export
json_str = result.to_json()           # JSON string
result.to_json("run_result.json")     # write to file
md_str = result.to_markdown()         # Markdown trace string
result.to_markdown("run_trace.md")    # write to file
```
Sample Markdown output:

````markdown
# Agent Run

**Goal:** What is the weather in Paris?
**Status:** `finished` | **Steps:** 2 | **Tokens:** 312

---

## Step 1: `get_weather`

**Thought:** I need to call get_weather for Paris.

**Input:**
```json
{
  "city": "Paris"
}
```

**Observation:** (12 ms)

Sunny, 22 °C in Paris

## FINISH: `finish`

**Thought:** I have the weather data.

**Input:**
```json
{
  "answer": "It is sunny and 22 °C in Paris."
}
```

## Final Answer

It is sunny and 22 °C in Paris.
````

## Telemetry
```python
from ractogateway.telemetry import RactoTracer

tracer = RactoTracer(otlp_endpoint="http://localhost:4317")
agent = AgentPipeline(kit=kit, tools=[get_weather], tracer=tracer)
result = agent.run("Weather in Tokyo?")
```
Each step emits a span named agent_step with attributes:
step_num, tool_name, duration_ms, input_tokens, output_tokens.
## Error handling

| Scenario | … | … |
|---|---|---|
| LLM raises | exception propagates | … |
| Unknown tool | observation contains … | same |
| Tool raises | observation contains … | same |
| Rate limited | … | same (not caught) |
| Max steps hit | … | same |
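The two "observation contains …" rows reflect a common ReAct design choice: tool failures become observations the LLM can read and recover from, instead of exceptions that end the run. A minimal sketch of that idea follows. `execute_tool` and its error-message format are hypothetical, and the 4,000-character cap comes from the "How it works" diagram.

```python
from typing import Any, Callable, Dict

MAX_OBSERVATION_CHARS = 4000  # same cap as in the "How it works" diagram


def execute_tool(tools: Dict[str, Callable[..., Any]], name: str,
                 tool_input: Dict[str, Any]) -> str:
    """Hypothetical executor: failures become observation text, not exceptions."""
    fn = tools.get(name)
    if fn is None:
        return f"ERROR: unknown tool {name!r}. Available: {', '.join(sorted(tools))}"
    try:
        result = fn(**tool_input)
    except Exception as exc:  # tool bugs, bad arguments, network errors, ...
        return f"ERROR: {name} raised {type(exc).__name__}: {exc}"
    return str(result)[:MAX_OBSERVATION_CHARS]


def divide(a: float, b: float) -> float:
    """Divide a by b."""
    return a / b


tools = {"divide": divide}
print(execute_tool(tools, "divide", {"a": 1, "b": 4}))    # 0.25
print(execute_tool(tools, "multiply", {"a": 1, "b": 4}))  # ERROR: unknown tool 'multiply'. Available: divide
print(execute_tool(tools, "divide", {"a": 1, "b": 0}))    # error observation instead of an exception
```

Because the error text lists the available tools, a model that misspells a tool name has what it needs to correct itself on the next step.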
## Industrial use cases

| Industry | Example goal |
|---|---|
| E-commerce | “Find the 3 cheapest in-stock laptops and compare specs” |
| Finance | “Summarise Q3 earnings from the docs and pull revenue from DB” |
| Healthcare | “Check patient record, retrieve drug interactions, and draft a note” |
| DevOps | “Check which services are down and fetch their last error logs” |
| Legal | “Search case law for relevant precedents on data-privacy breaches” |
| Customer support | “Identify the customer’s issue, check their order, and draft a reply” |