ractogateway.pipelines.agent._executor

Tool registration and execution engine for AgentPipeline.

ToolExecutor wraps a dict of callables and provides:
  • Synchronous execution with timing

  • Async execution (runs sync callables in a thread pool)

  • Parallel async execution via asyncio.gather

  • Human-readable tool descriptions for the system prompt

Built-in tool factories:

make_finish_tool() - Always registered; signals task completion make_rag_tool(rag) - Auto-registered when rag_pipeline is provided make_sql_tool(sql) - Auto-registered when sql_pipeline is provided make_http_tool() - Opt-in; fetches URLs via httpx make_memory_tools(mem) - Auto-registered when agent_memory is provided

class ractogateway.pipelines.agent._executor.ToolExecutor(tools, max_retries=0)[source]

Bases: object

Runs registered tools by name with sync and async support.

Parameters:
  • tools (dict[str, Callable]) – Mapping of tool name to callable.

  • max_retries (int) – How many times to retry a tool that raises an exception before reporting an error observation to the LLM. Default 0 = no retry.

property names: list[str]

Sorted list of registered tool names.

describe_all()[source]

Build the tools section for the agent system prompt.

Return type:

str

execute(tool_name, tool_input)[source]

Execute tool_name synchronously, retrying up to max_retries times on exception.

Return type:

tuple[str, float]

Returns:

tuple[str, float](observation, duration_ms)

async aexecute(tool_name, tool_input)[source]

Execute tool_name asynchronously, retrying up to max_retries times on exception.

Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.

Return type:

tuple[str, float]

Returns:

tuple[str, float](observation, duration_ms)

async aexecute_parallel(calls)[source]

Execute multiple tool calls concurrently via asyncio.gather.

Parameters:

calls (list[tuple[str, dict[str, Any]]]) – List of (tool_name, tool_input) pairs to run in parallel.

Return type:

list[tuple[str, float]]

Returns:

list[tuple[str, float]] – Results in the same order as calls.

ractogateway.pipelines.agent._executor.make_finish_tool()[source]

Return the always-present finish tool.

When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_rag_tool(rag_pipeline)[source]

Return a rag_search tool backed by a RactoRAG pipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_rag_tool_async(rag_pipeline)[source]

Return an async rag_search tool backed by an async RactoRAG.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_sql_tool(sql_pipeline)[source]

Return a sql_query tool backed by a SQLAnalystPipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_http_tool()[source]

Return an http_get tool that fetches URL content via httpx.

Requires httpx: pip install ractogateway[pipelines-agent-http]

Return type:

tuple[str, Callable]

ractogateway.pipelines.agent._executor.make_memory_tools(agent_memory)[source]

Return memory_read and memory_write tools backed by agent_memory.

agent_memory can be any object supporting:

memory.get(key) -> Any
memory.set(key, value) -> None

or a plain dict.

Return type:

list[tuple[str, Callable]]