Source code for ractogateway.anthropic_developer_kit.kit

"""Anthropic Claude Developer Kit — production-grade Claude interface.

Usage::

    from ractogateway import anthropic_developer_kit as anth

    kit = anth.AnthropicDeveloperKit(model="claude-sonnet-4-5-20250929", default_prompt=my_prompt)
    response = kit.chat(anth.ChatConfig(user_message="Hello"))

    for chunk in kit.stream(anth.ChatConfig(user_message="Hello")):
        print(chunk.delta.text, end="", flush=True)

Note: Anthropic does NOT have a native embedding API.  Use OpenAI or
Google kits for embeddings.
"""

from __future__ import annotations

import json as _json
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any

from ractogateway._models.chat import ChatConfig
from ractogateway._models.stream import StreamChunk, StreamDelta
from ractogateway.adapters.anthropic_kit import AnthropicLLMKit
from ractogateway.adapters.base import FinishReason, LLMResponse, ToolCallResult
from ractogateway.prompts.engine import RactoPrompt


def _require_anthropic() -> Any:
    try:
        import anthropic
    except ImportError as exc:
        raise ImportError(
            "The 'anthropic' package is required for AnthropicDeveloperKit. "
            "Install it with:  pip install ractogateway[anthropic]"
        ) from exc
    return anthropic


[docs] class AnthropicDeveloperKit: """Complete Anthropic Claude developer kit — chat and streaming. Parameters ---------- model: Claude model (e.g. ``"claude-sonnet-4-5-20250929"``, ``"claude-opus-4-6"``). api_key: Anthropic API key. Falls back to ``ANTHROPIC_API_KEY`` env var. default_prompt: RACTO prompt used when ``ChatConfig.prompt`` is ``None``. """ provider: str = "anthropic" def __init__( self, model: str = "claude-sonnet-4-5-20250929", *, api_key: str | None = None, default_prompt: RactoPrompt | None = None, ) -> None: self._model = model self._api_key = api_key self._default_prompt = default_prompt self._adapter = AnthropicLLMKit(model=model, api_key=api_key) # ------------------------------------------------------------------ # Client factories # ------------------------------------------------------------------ def _sync_client(self) -> Any: anthropic = _require_anthropic() key = self._api_key or os.environ.get("ANTHROPIC_API_KEY") kw: dict[str, Any] = {"api_key": key} if key else {} return anthropic.Anthropic(**kw) def _async_client(self) -> Any: anthropic = _require_anthropic() key = self._api_key or os.environ.get("ANTHROPIC_API_KEY") kw: dict[str, Any] = {"api_key": key} if key else {} return anthropic.AsyncAnthropic(**kw) def _resolve_prompt(self, config: ChatConfig) -> RactoPrompt: prompt = config.prompt or self._default_prompt if prompt is None: raise ValueError( "No prompt in ChatConfig and no default_prompt on the kit. Set one of them." ) return prompt # ------------------------------------------------------------------ # Chat (sync / async) # ------------------------------------------------------------------
[docs] def chat(self, config: ChatConfig) -> LLMResponse: """Synchronous chat completion.""" prompt = self._resolve_prompt(config) response = self._adapter.run( prompt, config.user_message, tools=config.tools, temperature=config.temperature, max_tokens=config.max_tokens, **config.extra, ) return _maybe_validate(response, config)
[docs] async def achat(self, config: ChatConfig) -> LLMResponse: """Async chat completion.""" prompt = self._resolve_prompt(config) response = await self._adapter.arun( prompt, config.user_message, tools=config.tools, temperature=config.temperature, max_tokens=config.max_tokens, **config.extra, ) return _maybe_validate(response, config)
# ------------------------------------------------------------------ # Stream (sync / async) # ------------------------------------------------------------------
[docs] def stream(self, config: ChatConfig) -> Iterator[StreamChunk]: """Synchronous streaming via Anthropic's ``messages.stream()``. Example:: for chunk in kit.stream(config): print(chunk.delta.text, end="", flush=True) if chunk.is_final: print(f"\\nTokens: {chunk.usage}") """ prompt = self._resolve_prompt(config) client = self._sync_client() request = self._adapter._build_request( prompt, config.user_message, tools=config.tools, temperature=config.temperature, max_tokens=config.max_tokens, **config.extra, ) accumulated = "" tc_acc: dict[int, dict[str, Any]] = {} finish_reason = FinishReason.STOP usage: dict[str, int] = {} with client.messages.stream(**request) as stream_resp: for event in stream_resp: chunk = self._process_anthropic_event( event, accumulated, tc_acc, finish_reason, usage, ) if chunk is not None: accumulated = chunk.accumulated_text if chunk.finish_reason is not None: finish_reason = chunk.finish_reason if chunk.usage: usage = chunk.usage yield chunk
[docs] async def astream(self, config: ChatConfig) -> AsyncIterator[StreamChunk]: """Async streaming via Anthropic's async ``messages.stream()``.""" prompt = self._resolve_prompt(config) client = self._async_client() request = self._adapter._build_request( prompt, config.user_message, tools=config.tools, temperature=config.temperature, max_tokens=config.max_tokens, **config.extra, ) accumulated = "" tc_acc: dict[int, dict[str, Any]] = {} finish_reason = FinishReason.STOP usage: dict[str, int] = {} async with client.messages.stream(**request) as stream_resp: async for event in stream_resp: chunk = self._process_anthropic_event( event, accumulated, tc_acc, finish_reason, usage, ) if chunk is not None: accumulated = chunk.accumulated_text if chunk.finish_reason is not None: finish_reason = chunk.finish_reason if chunk.usage: usage = chunk.usage yield chunk
# ------------------------------------------------------------------ # Internal — Anthropic stream event processing # ------------------------------------------------------------------ @staticmethod def _process_anthropic_event( event: Any, accumulated: str, tc_acc: dict[int, dict[str, Any]], finish_reason: FinishReason, usage: dict[str, int], ) -> StreamChunk | None: etype = event.type if etype == "content_block_start": block = event.content_block if block.type == "tool_use": tc_acc[event.index] = { "id": block.id, "name": block.name, "args": "", } return StreamChunk( accumulated_text=accumulated, raw=event, ) if etype == "content_block_delta": delta = event.delta if delta.type == "text_delta": text = delta.text accumulated += text return StreamChunk( delta=StreamDelta(text=text), accumulated_text=accumulated, raw=event, ) if delta.type == "input_json_delta": idx = event.index if idx in tc_acc: tc_acc[idx]["args"] += delta.partial_json return StreamChunk( delta=StreamDelta( tool_call_args_fragment=delta.partial_json, ), accumulated_text=accumulated, raw=event, ) return None if etype == "message_delta": stop_reason = getattr(event.delta, "stop_reason", None) fr = AnthropicLLMKit._map_finish_reason(stop_reason) u: dict[str, int] = {} if hasattr(event, "usage") and event.usage: inp = getattr(event.usage, "input_tokens", 0) or 0 out = getattr(event.usage, "output_tokens", 0) or 0 u = { "prompt_tokens": inp, "completion_tokens": out, "total_tokens": inp + out, } return StreamChunk( accumulated_text=accumulated, finish_reason=fr, usage=u, raw=event, ) if etype == "message_stop": return StreamChunk( accumulated_text=accumulated, finish_reason=finish_reason, tool_calls=_flush_tool_calls(tc_acc), usage=usage, is_final=True, raw=event, ) return None
# ====================================================================== # Module-level helpers # ====================================================================== def _flush_tool_calls(acc: dict[int, dict[str, Any]]) -> list[ToolCallResult]: results: list[ToolCallResult] = [] for entry in acc.values(): try: args = _json.loads(entry["args"]) if entry["args"] else {} except _json.JSONDecodeError: args = {"_raw": entry["args"]} results.append( ToolCallResult(id=entry["id"], name=entry["name"], arguments=args), ) return results def _maybe_validate(response: LLMResponse, config: ChatConfig) -> LLMResponse: if config.response_model is not None and isinstance(response.parsed, dict): try: validated = config.response_model.model_validate(response.parsed) response.parsed = validated.model_dump() except Exception as exc: warning = f"[RactoGateway] response_model validation failed: {exc}" response.content = f"{response.content}\n\n{warning}" if response.content else warning return response