Source code for ractogateway.mcp.agent

"""MCPAgent — agentic tool-execution loop for OpenAI, Google, and Anthropic kits.

``MCPAgent`` bridges the gap between an LLM developer kit and a
:class:`~ractogateway.tools.registry.ToolRegistry` (populated from one or
more MCP servers) by running the full agentic loop automatically:

.. code-block:: text

    LLM call
       │
       ├─ finish_reason == "tool_call"  ──►  execute all tool calls
       │                                         │
       │                                         └──► append results as a
       │                                              user follow-up and
       │                                              call the LLM again
       │
       └─ finish_reason != "tool_call"  ──►  return final LLMResponse

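At most *max_turns* tool-call rounds are executed, so a model that keeps requesting tools cannot loop forever; if the limit is reached, the last response is returned as-is.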

Works with all three provider developer kits — the kit is duck-typed via a
lightweight :class:`_ChatKitProtocol` so no provider package is needed at
import time.

Usage
-----
::

    from ractogateway.openai_developer_kit import OpenAIDeveloperKit
    from ractogateway.mcp import MCPAgent, MCPClientConfig, RactoMCPClient
    from ractogateway._models.chat import ChatConfig

    # Build a ToolRegistry from an MCP server
    config = MCPClientConfig(transport="stdio", command="python",
                             args=["-m", "my_server"])
    registry = RactoMCPClient(config).list_tools_sync()   # one-shot fetch

    # Or build the registry async:
    # async with RactoMCPClient(config) as c:
    #     registry = await c.to_registry()

    kit   = OpenAIDeveloperKit(model="gpt-4o")
    agent = MCPAgent(kit, registry, max_turns=8)

    response = agent.run(
        ChatConfig(user_message="What is the weather in Tokyo and London?")
    )
    print(response.content)

Same code works for Google and Anthropic::

    from ractogateway.google_developer_kit import GoogleDeveloperKit
    kit = GoogleDeveloperKit(model="gemini-2.0-flash")
    agent = MCPAgent(kit, registry)

    from ractogateway.anthropic_developer_kit import AnthropicDeveloperKit
    kit = AnthropicDeveloperKit(model="claude-opus-4-6")
    agent = MCPAgent(kit, registry)
"""

from __future__ import annotations

import asyncio
import inspect
from typing import Any, Protocol, runtime_checkable

from ractogateway._models.chat import ChatConfig, Message, MessageRole
from ractogateway.adapters.base import FinishReason, LLMResponse, ToolCallResult
from ractogateway.mcp._models import MCPClientConfig
from ractogateway.tools.registry import ToolRegistry

# ---------------------------------------------------------------------------
# Duck-typed kit protocol — no provider SDK imported here
# ---------------------------------------------------------------------------


@runtime_checkable
class _ChatKitProtocol(Protocol):
    """Structural type for the developer kits accepted by ``MCPAgent``.

    Being ``runtime_checkable``, an ``isinstance`` check against this
    protocol only verifies that the object exposes ``chat`` and ``achat``
    attributes; no provider SDK has to be imported for the check.
    """

    def chat(self, config: ChatConfig) -> LLMResponse:
        """Return a chat completion for *config*, blocking until done."""

    async def achat(self, config: ChatConfig) -> LLMResponse:
        """Return a chat completion for *config* without blocking."""


# ---------------------------------------------------------------------------
# Tool result formatting
# ---------------------------------------------------------------------------

_TOOL_RESULTS_HEADER = "Tool execution results (use these to answer the user):\n"


def _format_tool_results(
    tool_calls: list[ToolCallResult],
    results: list[str],
) -> str:
    """Format tool call results as a plain-text follow-up user message.

    This provider-agnostic format works with the existing
    :class:`~ractogateway._models.chat.Message` model (``role`` + ``content``)
    across all three developer kits without any schema changes.

    Parameters
    ----------
    tool_calls:
        The tool calls requested by the LLM in the previous turn.
    results:
        String output for each call (parallel ordering with *tool_calls*).

    Returns
    -------
    str
        A human-readable summary that the LLM can consume as a user message.
    """
    lines: list[str] = [_TOOL_RESULTS_HEADER]
    for tc, result in zip(tool_calls, results, strict=True):
        args_repr = ", ".join(f"{k}={v!r}" for k, v in tc.arguments.items())
        lines.append(f"  {tc.name}({args_repr}) → {result}")
    return "\n".join(lines)


def _run_awaitable_sync(awaitable: Any) -> Any:
    """Drive *awaitable* to completion from synchronous code.

    Raises
    ------
    RuntimeError
        If an event loop is already running in this thread:
        :func:`asyncio.run` cannot be used there, so the caller should use
        the async executor (``MCPAgent.arun``) instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No running loop: safe to start a fresh one below.
    else:
        if inspect.iscoroutine(awaitable):
            # Close it so Python does not warn "coroutine was never awaited".
            awaitable.close()
        raise RuntimeError(
            "async tool callable cannot be awaited from the synchronous "
            "MCPAgent.run() while an event loop is running; use arun() instead"
        )

    async def _wait() -> Any:
        # asyncio.run() only accepts coroutines, so wrap generic awaitables.
        return await awaitable

    return asyncio.run(_wait())


def _execute_tool_calls_sync(
    tool_calls: list[ToolCallResult],
    registry: ToolRegistry,
) -> list[str]:
    """Execute tool calls synchronously and return one string per call.

    Failures never propagate: a missing callable or an exception raised by
    the tool is reported as an ``[Error ...]`` string in that call's slot,
    so the LLM can see what went wrong.

    Parameters
    ----------
    tool_calls:
        Tool calls requested by the LLM.
    registry:
        Registry used to look up the callable for each tool name.

    Returns
    -------
    list[str]
        Results in the same order as *tool_calls*.
    """
    results: list[str] = []
    for tc in tool_calls:
        fn = registry.get_callable(tc.name)
        if fn is None:
            results.append(f"[Error: no callable registered for tool {tc.name!r}]")
            continue
        try:
            raw = fn(**tc.arguments)
            # Registries may hold ``async def`` callables; calling one only
            # creates an awaitable. Stringifying that would hand the LLM
            # "<coroutine object ...>" and leak an un-awaited coroutine.
            if inspect.isawaitable(raw):
                raw = _run_awaitable_sync(raw)
            results.append(str(raw))
        except Exception as exc:
            results.append(f"[Error executing {tc.name!r}: {exc}]")
    return results


async def _execute_tool_calls_async(
    tool_calls: list[ToolCallResult],
    registry: ToolRegistry,
) -> list[str]:
    """Execute tool calls (sync or async callables) and return string results.

    Calls run one after another in request order. Failures never propagate:
    a missing callable or an exception raised by the tool (including while
    awaiting it) is reported as an ``[Error ...]`` string in that call's slot.

    Parameters
    ----------
    tool_calls:
        Tool calls requested by the LLM.
    registry:
        Registry used to look up the callable for each tool name.

    Returns
    -------
    list[str]
        Results in the same order as *tool_calls*.
    """
    results: list[str] = []
    for tc in tool_calls:
        fn = registry.get_callable(tc.name)
        if fn is None:
            results.append(f"[Error: no callable registered for tool {tc.name!r}]")
            continue
        try:
            raw = fn(**tc.arguments)
            # Inspect the *result* rather than using iscoroutinefunction(fn):
            # objects with ``async def __call__`` and sync wrappers returning
            # coroutines are not detected as coroutine functions, and their
            # awaitable would otherwise be stringified without being awaited.
            if inspect.isawaitable(raw):
                raw = await raw
            results.append(str(raw))
        except Exception as exc:
            results.append(f"[Error executing {tc.name!r}: {exc}]")
    return results


# ---------------------------------------------------------------------------
# MCPAgent
# ---------------------------------------------------------------------------


class MCPAgent:
    """Agentic tool-execution loop compatible with all three developer kits.

    Runs the LLM → tool-call → execute → continue loop automatically,
    returning the final :class:`~ractogateway.adapters.base.LLMResponse`
    once the LLM produces a non-tool response or *max_turns* is reached.

    Parameters
    ----------
    kit:
        Any developer kit with ``chat()`` / ``achat()`` methods:
        :class:`~ractogateway.openai_developer_kit.OpenAIDeveloperKit`,
        :class:`~ractogateway.google_developer_kit.GoogleDeveloperKit`, or
        :class:`~ractogateway.anthropic_developer_kit.AnthropicDeveloperKit`.
    registry:
        Tool registry containing callables for each tool the LLM can call.
        Typically populated via :meth:`RactoMCPClient.to_registry` or
        :meth:`MCPMultiClient.to_registry`.
    max_turns:
        Maximum number of tool-call rounds before the loop stops and returns
        the last response (which may itself still request tools). Prevents
        a model that keeps requesting tools from looping forever.

    Example
    -------
    ::

        from ractogateway.openai_developer_kit import OpenAIDeveloperKit
        from ractogateway.mcp import MCPAgent, RactoMCPClient, MCPClientConfig
        from ractogateway._models.chat import ChatConfig

        cfg = MCPClientConfig(transport="stdio", command="python",
                              args=["-m", "my_server"])
        reg = RactoMCPClient(cfg).list_tools_sync()
        kit = OpenAIDeveloperKit(model="gpt-4o")
        agent = MCPAgent(kit, reg, max_turns=6)
        result = agent.run(ChatConfig(user_message="Search for recent AI papers"))
        print(result.content)
    """

    # Synthetic user turn sent to the LLM after each round of tool results.
    _CONTINUE_PROMPT = "Continue based on the tool results above."

    def __init__(
        self,
        kit: Any,
        registry: ToolRegistry,
        *,
        max_turns: int = 10,
    ) -> None:
        if not isinstance(kit, _ChatKitProtocol):
            raise TypeError(
                f"kit must implement chat() and achat() methods, got {type(kit).__name__!r}.\n"
                "Pass an OpenAIDeveloperKit, GoogleDeveloperKit, or "
                "AnthropicDeveloperKit instance."
            )
        if max_turns < 1:
            raise ValueError(f"max_turns must be >= 1, got {max_turns}.")
        self._kit = kit
        self._registry = registry
        self._max_turns = max_turns

    # ------------------------------------------------------------------
    # Classmethod constructors
    # ------------------------------------------------------------------

    @classmethod
    def from_mcp(
        cls,
        kit: Any,
        configs: list[MCPClientConfig],
        *,
        max_turns: int = 10,
    ) -> MCPAgent:
        """Build an agent by fetching tools from one or more MCP servers.

        Opens a **one-shot** connection per config, fetches tools, closes
        the connection, and constructs the agent with the merged registry.

        .. note::
            This is a **sync** classmethod; it uses :func:`asyncio.run` and
            therefore **cannot** be called from within a running event loop.
            In async code, build the registry yourself::

                async with MCPMultiClient(configs) as multi:
                    registry = await multi.to_registry()
                agent = MCPAgent(kit, registry)

        Parameters
        ----------
        kit:
            Any RactoGateway developer kit.
        configs:
            MCP server connection configs.
        max_turns:
            Maximum tool-call rounds.

        Returns
        -------
        MCPAgent
            Ready to call :meth:`run` or :meth:`arun`.

        Raises
        ------
        RuntimeError
            If called while an event loop is running in this thread.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop: asyncio.run() below is allowed.
        else:
            raise RuntimeError(
                "MCPAgent.from_mcp() cannot be called from a running event loop.\n"
                "Build the registry with 'await MCPMultiClient(configs).to_registry()' "
                "and pass it to MCPAgent() directly."
            )

        # Local import keeps the multi-client dependency lazy.
        from ractogateway.mcp.multi_client import MCPMultiClient

        async def _fetch() -> ToolRegistry:
            async with MCPMultiClient(configs) as multi:
                return await multi.to_registry()

        registry = asyncio.run(_fetch())
        return cls(kit, registry, max_turns=max_turns)

    # ------------------------------------------------------------------
    # Shared loop helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _wants_tools(response: LLMResponse) -> bool:
        """Return True if *response* finished with at least one tool call."""
        return (
            response.finish_reason == FinishReason.TOOL_CALL
            and bool(response.tool_calls)
        )

    def _follow_up_config(
        self,
        working_config: ChatConfig,
        response: LLMResponse,
        results: list[str],
    ) -> ChatConfig:
        """Build the config for the next LLM call after one tool round.

        The history is extended with the user turn that produced *response*,
        the assistant's tool-call turn, and the formatted tool results; the
        user message becomes a fixed "continue" prompt.
        """
        tool_calls = response.tool_calls
        history = list(working_config.history)
        # Move the current user turn into history before replacing
        # ``user_message``; otherwise the user's original question is
        # dropped from the conversation after the first tool round.
        if working_config.user_message:
            history.append(
                Message(role=MessageRole.USER, content=working_config.user_message)
            )
        # Pure tool-call responses often carry no text, and some provider
        # APIs (e.g. Anthropic's Messages API) reject empty message content,
        # so record which tools were requested instead of an empty string.
        assistant_text = response.content or (
            "[Requested tools: " + ", ".join(tc.name for tc in tool_calls) + "]"
        )
        history.append(Message(role=MessageRole.ASSISTANT, content=assistant_text))
        history.append(
            Message(
                role=MessageRole.USER,
                content=_format_tool_results(tool_calls, results),
            )
        )
        return working_config.model_copy(
            update={
                "history": history,
                "user_message": self._CONTINUE_PROMPT,
                "tools": self._registry,
            }
        )

    # ------------------------------------------------------------------
    # Sync agentic loop
    # ------------------------------------------------------------------

    def run(self, config: ChatConfig) -> LLMResponse:
        """Run the agentic loop synchronously.

        Injects the tool registry from this agent into *config* (overriding
        ``config.tools`` if already set). After each tool round the
        conversation so far (user turn, assistant tool request, tool results)
        is carried forward in ``history``.

        Parameters
        ----------
        config:
            Initial chat config. ``prompt`` must be set here or on the kit.

        Returns
        -------
        LLMResponse
            Final response after tool calls are resolved, or the last
            response if *max_turns* rounds were used up.
        """
        working_config = config.model_copy(update={"tools": self._registry})
        response = self._kit.chat(working_config)
        for _ in range(self._max_turns):
            if not self._wants_tools(response):
                break
            results = _execute_tool_calls_sync(response.tool_calls, self._registry)
            working_config = self._follow_up_config(working_config, response, results)
            response = self._kit.chat(working_config)
        return response

    # ------------------------------------------------------------------
    # Async agentic loop
    # ------------------------------------------------------------------

    async def arun(self, config: ChatConfig) -> LLMResponse:
        """Run the agentic loop asynchronously.

        Supports async tool callables (``async def``); sync callables are
        called directly. The conversation is carried forward exactly as in
        :meth:`run`.

        Parameters
        ----------
        config:
            Initial chat config.

        Returns
        -------
        LLMResponse
            Final response after tool calls are resolved, or the last
            response if *max_turns* rounds were used up.
        """
        working_config = config.model_copy(update={"tools": self._registry})
        response = await self._kit.achat(working_config)
        for _ in range(self._max_turns):
            if not self._wants_tools(response):
                break
            results = await _execute_tool_calls_async(
                response.tool_calls, self._registry
            )
            working_config = self._follow_up_config(working_config, response, results)
            response = await self._kit.achat(working_config)
        return response

    # ------------------------------------------------------------------
    # Informational
    # ------------------------------------------------------------------

    @property
    def registry(self) -> ToolRegistry:
        """The :class:`ToolRegistry` used by this agent."""
        return self._registry

    @property
    def max_turns(self) -> int:
        """Maximum number of tool-call rounds per :meth:`run` call."""
        return self._max_turns

    def __repr__(self) -> str:
        kit_name = type(self._kit).__name__
        return (
            f"MCPAgent(kit={kit_name!r}, "
            f"tools={len(self._registry.schemas)}, "
            f"max_turns={self._max_turns})"
        )