ractogateway

RactoGateway - unified AI SDK with anti-hallucination prompting via RACTO.

The package surface stays import-friendly while deferring heavy imports until a specific symbol is accessed.

class ractogateway.MCPAgent(kit, registry, *, max_turns=10)[source]

Bases: object

Agentic tool-execution loop compatible with all three developer kits.

Runs the LLM → tool-call → execute → continue loop automatically, returning the final LLMResponse once the LLM produces a non-tool response or max_turns is reached.

Parameters:
  • kit (Any) – Any developer kit with chat() / achat() methods: OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit.

  • registry (ToolRegistry) – Tool registry containing callables for each tool the LLM can call. Typically populated via RactoMCPClient.to_registry() or MCPMultiClient.to_registry().

  • max_turns (int) – Maximum number of tool-call rounds before the loop stops and returns the last response. Prevents an endless tool-call loop.
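The loop described above can be sketched in plain Python. This is only an illustration of the control flow, not the library's implementation: FakeReply, scripted_llm, and run_loop are hypothetical stand-ins for an LLM response, a kit's chat() call, and the agent's run() method.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class FakeReply:
    """Hypothetical stand-in for an LLM response."""
    content: str = ""
    tool_calls: list[tuple[str, dict[str, Any]]] = field(default_factory=list)


def run_loop(
    llm: Callable[[list[str]], FakeReply],
    tools: dict[str, Callable[..., Any]],
    user_message: str,
    max_turns: int = 10,
) -> FakeReply:
    """Call the LLM, execute requested tools, repeat until done or capped."""
    transcript = [f"user: {user_message}"]
    reply = llm(transcript)
    for _ in range(max_turns):
        if not reply.tool_calls:
            break  # non-tool response: the loop is finished
        for name, arguments in reply.tool_calls:
            output = tools[name](**arguments)
            transcript.append(f"tool {name}: {output}")
        reply = llm(transcript)
    return reply


def scripted_llm(transcript: list[str]) -> FakeReply:
    """Ask for one tool call, then answer using the tool output."""
    tool_lines = [line for line in transcript if line.startswith("tool ")]
    if not tool_lines:
        return FakeReply(tool_calls=[("add", {"a": 2, "b": 3})])
    return FakeReply(content=f"The answer is {tool_lines[-1].split(': ')[1]}")


final = run_loop(scripted_llm, {"add": lambda a, b: a + b}, "What is 2 + 3?")
print(final.content)
```

If the stand-in LLM never stops requesting tools, the sketch returns the last reply after max_turns rounds, which mirrors the cap described for max_turns.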

Example

import asyncio

from ractogateway.openai_developer_kit import OpenAIDeveloperKit
from ractogateway.mcp import MCPAgent, RactoMCPClient, MCPClientConfig
from ractogateway._models.chat import ChatConfig

cfg = MCPClientConfig(transport="stdio", command="python",
                      args=["-m", "my_server"])

async def build_registry():
    # MCPAgent needs a ToolRegistry; list_tools_sync() only returns schemas.
    async with RactoMCPClient(cfg) as client:
        return await client.to_registry()

reg    = asyncio.run(build_registry())
kit    = OpenAIDeveloperKit(model="gpt-4o")
agent  = MCPAgent(kit, reg, max_turns=6)
result = agent.run(ChatConfig(user_message="Search for recent AI papers"))
print(result.content)

classmethod from_mcp(kit, configs, *, max_turns=10)[source]

Build an agent by fetching tools from one or more MCP servers.

Opens a one-shot connection per config, fetches tools, closes the connection, and constructs the agent with the merged registry.

Note

This is a sync classmethod; it uses asyncio.run() and therefore cannot be called from within a running event loop. In async code, build the registry yourself:

async with MCPMultiClient(configs) as multi:
    registry = await multi.to_registry()
agent = MCPAgent(kit, registry)

Parameters:
  • kit (Any) – Any RactoGateway developer kit.

  • configs (list[MCPClientConfig]) – MCP server connection configs.

  • max_turns (int) – Maximum tool-call rounds.

Return type:

MCPAgent

Returns:

MCPAgent – Ready to call run() or arun().
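The restriction in the note on from_mcp() comes from asyncio.run() itself, which refuses to start while an event loop is already running. The standard-library sketch below shows that behaviour; fetch_tool_names and build_sync are hypothetical stand-ins for the tool fetch and for a sync helper in the style of from_mcp().

```python
import asyncio


async def fetch_tool_names() -> list[str]:
    """Hypothetical stand-in for fetching tools from a server."""
    return ["search"]


def build_sync() -> list[str]:
    """Sync helper that drives its own event loop with asyncio.run()."""
    coro = fetch_tool_names()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning before re-raising
        raise


async def caller_inside_loop() -> str:
    """Call the sync helper from code that already runs inside a loop."""
    try:
        build_sync()
    except RuntimeError:
        return "sync helper refused: an event loop is already running"
    return "sync helper succeeded"


print(build_sync())                       # fine: no loop is running here
print(asyncio.run(caller_inside_loop()))  # the nested call is rejected
```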

run(config)[source]

Run the agentic loop synchronously.

Injects the tool registry from this agent into config (overriding config.tools if already set).

Parameters:

config (ChatConfig) – Initial chat config. prompt must be set here or on the kit.

Return type:

LLMResponse

Returns:

LLMResponse – Final response after tool calls are resolved.

async arun(config)[source]

Run the agentic loop asynchronously.

Supports async tool callables (async def); sync callables are called directly.

Parameters:

config (ChatConfig) – Initial chat config.

Return type:

LLMResponse

Returns:

LLMResponse – Final response after tool calls are resolved.

property registry: ToolRegistry

The ToolRegistry used by this agent.

property max_turns: int

Maximum number of tool-call rounds per run() call.

class ractogateway.MCPClientConfig(**data)[source]

Bases: BaseModel

Configuration for connecting to an MCP server.

Parameters:
  • transport (Literal['stdio', 'sse', 'streamable-http']) – MCP transport to use. "stdio" is the standard choice for servers launched as subprocesses (e.g. by Claude Desktop). "sse" and "streamable-http" are for HTTP-based servers.

  • command (str | None) – Executable to launch the server process — required for stdio.

  • args (list[str]) – Command-line arguments passed to command (stdio only).

  • env (dict[str, str]) – Extra environment variables injected into the server process (stdio only). Merged on top of the inherited environment.

  • url (str | None) – Server URL — required for sse / streamable-http. Example: "http://localhost:8000/sse".
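The field rules above can be illustrated with a small standard-library sketch. It is an assumption about how such checks might look, not the model's actual validators: check_transport and merged_env are hypothetical helpers, and the "extra keys win" merge order is inferred from "merged on top of the inherited environment".

```python
import os
from typing import Optional


def check_transport(transport: str, command: Optional[str] = None,
                    url: Optional[str] = None) -> None:
    """Reject configs that lack the field their transport requires."""
    if transport == "stdio" and not command:
        raise ValueError("stdio transport requires 'command'")
    if transport in ("sse", "streamable-http") and not url:
        raise ValueError(f"{transport} transport requires 'url'")


def merged_env(extra: dict[str, str]) -> dict[str, str]:
    """Inherited environment first, so keys in `extra` win on conflict."""
    return {**os.environ, **extra}


check_transport("stdio", command="python")
check_transport("sse", url="http://localhost:8000/sse")
env = merged_env({"MY_SERVER_MODE": "debug"})
print(env["MY_SERVER_MODE"])
```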

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

transport: Literal['stdio', 'sse', 'streamable-http']
command: str | None
args: list[str]
env: dict[str, str]
url: str | None
model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.MCPMultiClient(configs)[source]

Bases: object

Connect to multiple MCP servers and present them as a single tool surface.

Tools from all servers are merged into one flat namespace. If two servers advertise the same tool name, the later server’s definition wins (and a warning is embedded in the tool description noting the override).

Routing is O(1): an internal dict[tool_name → server_index] maps each tool back to its origin server for call_tool dispatch.

Parameters:

configs (list[MCPClientConfig]) – One MCPClientConfig per server. At least one config is required.
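The merge and routing behaviour described above can be sketched as follows. This is a simplified illustration under stated assumptions: tools are reduced to name/description pairs, merge_tools is a hypothetical helper, and the exact wording of the override warning is invented for the example.

```python
def merge_tools(servers: list[dict[str, str]]) -> tuple[dict[str, str], dict[str, int]]:
    """Merge per-server {tool_name: description} maps; later servers win."""
    merged: dict[str, str] = {}
    route: dict[str, int] = {}
    for index, tools in enumerate(servers):
        for name, description in tools.items():
            if name in route:
                # Note the override in the description, as the docs describe.
                description += f" [overrides server {route[name]}]"
            merged[name] = description
            route[name] = index  # dict lookup gives O(1) dispatch later
    return merged, route


servers = [
    {"search": "Web search", "add": "Add numbers"},
    {"search": "Paper search"},
]
merged, route = merge_tools(servers)
print(sorted(merged))
print(route["search"])
print(merged["search"])
```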

async list_tools()[source]

Return the merged list of tool schemas from all servers.

Return type:

list[ToolSchema]

Returns:

list[ToolSchema] – Deduplicated (last-server-wins) tool schemas sorted by name.

async call_tool(name, arguments=None)[source]

Call a tool on whichever server originally advertised it.

Routing is O(1) via the internal tool_name → server_index map.

Parameters:
  • name (str) – Tool name (must exist in the merged namespace).

  • arguments (dict[str, Any] | None) – Tool arguments; None or {} for parameterless tools.

Return type:

MCPToolResult

Returns:

MCPToolResult – Tool output.

Raises:
  • KeyError – If name is not in the merged tool namespace.

  • RuntimeError – If called outside an async with block.

async to_registry()[source]

Return a merged ToolRegistry with remote callables.

Each callable in the registry makes a fresh one-shot connection to the correct origin server when invoked. This keeps the registry self-contained and usable outside an async with block.

Return type:

ToolRegistry

Returns:

ToolRegistry – Merged registry compatible with all three developer kits.

list_tools_sync()[source]

Synchronous wrapper: connect all, list merged tools, disconnect all.

Raises:

RuntimeError – If called from within a running event loop.

Return type:

list[ToolSchema]

call_tool_sync(name, arguments=None)[source]

Synchronous wrapper: connect all, call tool, disconnect all.

Raises:

RuntimeError – If called from within a running event loop.

Return type:

MCPToolResult

property tool_names: list[str]

Sorted list of all tool names across all servers.

property server_count: int

Number of configured MCP servers.

class ractogateway.MCPServerConfig(**data)[source]

Bases: BaseModel

Configuration for a RactoMCPServer instance.

Parameters:
  • name (str) – Server name shown to MCP clients (e.g. Claude Desktop).

  • description (str) – Optional human-readable description of the server.

  • version (str) – Server version string (SemVer recommended).

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

name: str
description: str
version: str
model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.MCPToolResult(**data)[source]

Bases: BaseModel

Result returned from calling a remote MCP tool.

Parameters:
  • content (str) – Text content of the tool response. Multiple content blocks (e.g. from parallel calls) are joined with "\n".

  • is_error (bool) – True when the tool itself reported an error condition.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

content: str
is_error: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.RactoMCPClient(config)[source]

Bases: object

Connect to an MCP server and consume its tools as ToolSchema objects.

This is an async context manager. Keep it alive to reuse the underlying connection for multiple tool calls (O(1) per call after connection setup). For single calls from synchronous code, use the *_sync() convenience methods.

Parameters:
  • config (MCPClientConfig) – Connection configuration (transport, command / URL, env, …).

Example (async, recommended)

config = MCPClientConfig(transport="stdio", command="python",
                         args=["-m", "my_server"])

async with RactoMCPClient(config) as client:
    # Reuse this connection for all calls.
    tools = await client.list_tools()
    result = await client.call_tool("search", {"query": "AI"})
    registry = await client.to_registry()

Example (sync, one-shot)

client = RactoMCPClient(config)
tools = client.list_tools_sync()

async list_tools()[source]

List all tools exposed by the MCP server.

Return type:

list[ToolSchema]

Returns:

list[ToolSchema] – Provider-agnostic tool schemas — ready to be registered in any ToolRegistry or passed directly to a developer kit via ChatConfig(tools=…).

Raises:

RuntimeError – If called outside an async with block.

async call_tool(name, arguments=None)[source]

Call a remote MCP tool.

Parameters:
  • name (str) – Tool name (must exist on the server).

  • arguments (dict[str, Any] | None) – Keyword arguments to pass to the tool. Pass None or {} for tools with no parameters.

Return type:

MCPToolResult

Returns:

MCPToolResult – content contains all text blocks joined by "\n". is_error is True when the server signals a tool error.

Raises:

RuntimeError – If called outside an async with block.

async to_registry()[source]

Return a ToolRegistry populated with all server tools.

Each callable in the registry makes a fresh one-shot MCP connection when invoked. This keeps the returned registry self-contained and usable outside an async with block.

For high-throughput usage, hold the RactoMCPClient context manager alive and call call_tool() directly.

Return type:

ToolRegistry

Returns:

ToolRegistry – Registry compatible with all three developer kits via ChatConfig(tools=registry).
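The trade-off between one-shot registry callables and a held connection can be sketched with a stub. Everything here is hypothetical: StubConnection only counts how often it is opened, and make_remote_callable shows one plausible shape for a callable that connects, calls, and disconnects.

```python
import asyncio


class StubConnection:
    """Hypothetical connection that counts how often it is opened."""
    opened = 0

    async def __aenter__(self) -> "StubConnection":
        StubConnection.opened += 1
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None

    async def call_tool(self, name: str, arguments: dict) -> str:
        return f"{name}({arguments})"


def make_remote_callable(name: str):
    """Return a callable that connects, calls one tool, and disconnects."""
    async def remote(**arguments):
        async with StubConnection() as conn:
            return await conn.call_tool(name, arguments)
    return remote


search = make_remote_callable("search")


async def demo():
    # One-shot style: every call opens its own connection.
    one_shot = [await search(query="AI"), await search(query="MCP")]
    opened_one_shot = StubConnection.opened
    # Held style: one connection serves both calls.
    async with StubConnection() as conn:
        held = [await conn.call_tool("search", {"query": q}) for q in ("AI", "MCP")]
    opened_held = StubConnection.opened - opened_one_shot
    return one_shot, held, opened_one_shot, opened_held


one_shot, held, opened_one_shot, opened_held = asyncio.run(demo())
print(opened_one_shot, opened_held)
```

Both styles return the same results in this sketch; the difference is only the number of connection setups, which is why a held context manager suits high-throughput use.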

list_tools_sync()[source]

Synchronous wrapper: connect, list tools, disconnect.

Return type:

list[ToolSchema]

Returns:

list[ToolSchema] – All tool schemas exposed by the server.

Raises:

RuntimeError – If called from within a running event loop.

call_tool_sync(name, arguments=None)[source]

Synchronous wrapper: connect, call tool, disconnect.

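Parameters:
  • name (str) – Tool name (must exist on the server).

  • arguments (dict[str, Any] | None) – Keyword arguments to pass to the tool. Pass None or {} for tools with no parameters.

Return type: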

MCPToolResult

Returns:

MCPToolResult – Tool output.

Raises:

RuntimeError – If called from within a running event loop.

class ractogateway.RactoMCPServer(name, *, description='', version='0.1.0')[source]

Bases: object

Expose a ToolRegistry (or individual functions) as a Model Context Protocol server.

Supported transports

  • stdio (default) — standard I/O; ideal for Claude Desktop and any subprocess-based MCP client.

  • sse — HTTP Server-Sent Events; for remote / browser-based clients. Requires pip install ractogateway[mcp-sse].

Parameters:
  • name (str) – Server name visible to MCP clients.

  • description (str) – Optional human-readable description.

  • version (str) – Server version string.

Example

from ractogateway import ToolRegistry
from ractogateway.mcp import RactoMCPServer

registry = ToolRegistry()

@registry.register
def add(a: int, b: int) -> int:
    '''Add two integers.'''
    return a + b

server = RactoMCPServer.from_registry(registry, name="math-tools")
server.run()  # stdio, blocking

classmethod from_registry(registry, *, name='ractogateway-server', description='RactoGateway MCP Server', version='0.1.0')[source]

Build a server from a populated ToolRegistry.

Every registered callable becomes an MCP tool. Tools registered only as Pydantic models (no backing callable) are silently skipped.

Parameters:
  • registry (ToolRegistry) – A ToolRegistry with one or more registered tools.

  • name (str) – MCP server name shown to clients.

  • description (str) – Optional server description.

  • version (str) – Server version string.

Return type:

RactoMCPServer

Returns:

RactoMCPServer – Ready-to-run server instance.

add_tool(fn, *, name=None, description=None)[source]

Register a single function as an MCP tool.

The function’s type annotations drive the JSON Schema; its docstring provides the description (overridable via description).

Parameters:
  • fn (Callable[..., Any]) – The callable to expose. Both sync and async functions are supported.

  • name (str | None) – Override the tool name (defaults to fn.__name__).

  • description (str | None) – Override the tool description (defaults to the docstring).

Return type:

None
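How annotations and a docstring can drive a tool definition is sketched below with the standard library. This is not the library's schema generator: describe_tool and JSON_TYPES are hypothetical, only four scalar types are mapped, and the name/description/inputSchema layout follows the general shape of an MCP tool definition.

```python
import inspect
from typing import Any, Callable, Optional, get_type_hints

# Minimal mapping from Python annotations to JSON Schema type names.
JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


def describe_tool(fn: Callable[..., Any], name: Optional[str] = None,
                  description: Optional[str] = None) -> dict[str, Any]:
    """Build a tool definition from a function's signature and docstring."""
    hints = get_type_hints(fn)
    hints.pop("return", None)
    signature = inspect.signature(fn)
    properties = {
        param: {"type": JSON_TYPES.get(hints.get(param), "string")}
        for param in signature.parameters
    }
    # Parameters without a default value are required.
    required = [
        param for param, spec in signature.parameters.items()
        if spec.default is inspect.Parameter.empty
    ]
    return {
        "name": name or fn.__name__,
        "description": description or inspect.getdoc(fn) or "",
        "inputSchema": {"type": "object", "properties": properties,
                        "required": required},
    }


def add(a: int, b: int = 0) -> int:
    """Add two integers."""
    return a + b


schema = describe_tool(add)
print(schema["name"], schema["inputSchema"]["required"])
```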

run(transport='stdio', *, host='0.0.0.0', port=8000)[source]

Start the MCP server (blocking).

Parameters:
  • transport (Literal['stdio', 'sse']) – "stdio" — standard I/O (default; integrates with Claude Desktop and subprocess clients). "sse" — HTTP Server-Sent Events (requires pip install ractogateway[mcp-sse]).

  • host (str) – Bind host for SSE transport.

  • port (int) – Bind port for SSE transport.

Return type:

None

get_asgi_app()[source]

Return a Starlette ASGI app for SSE transport.

Use this to mount the MCP server into an existing web application rather than starting a standalone server with run().

Requires pip install ractogateway[mcp-sse].

Return type:

Any

Example

import uvicorn
from ractogateway.mcp import RactoMCPServer

server = RactoMCPServer.from_registry(registry, name="tools")
app = server.get_asgi_app()
uvicorn.run(app, host="0.0.0.0", port=8000)

property tool_names: list[str]

Sorted list of registered tool names.

exception ractogateway.RactoGatewayError[source]

Bases: Exception

Base class for every RactoGateway runtime error.

exception ractogateway.RactoGatewayTimeoutError[source]

Bases: RactoGatewayError

The upstream provider did not respond within the allowed time.

exception ractogateway.RactoGatewayAPIError(message, *, status_code=None)[source]

Bases: RactoGatewayError

The upstream provider returned an error response.

status_code

HTTP status code returned by the provider, when available.

exception ractogateway.RactoGatewayAuthError[source]

Bases: RactoGatewayError

API key missing, invalid, or not authorised for the requested resource.

exception ractogateway.ResponseModelValidationError(message, *, attempts, last_error, raw_response=None)[source]

Bases: RactoGatewayError

Raised when response_model validation cannot be satisfied.

The LLM returned structurally valid JSON but Pydantic rejected it (e.g. a field value was out of range) and all automatic retry attempts were exhausted.

attempts

Total number of API calls made (1 initial + N retries).

last_error

The final pydantic.ValidationError that triggered this exception.

raw_response

The LLM’s raw text from the last attempt, available for inspection or manual recovery.

Example

from ractogateway.exceptions import ResponseModelValidationError

try:
    response = kit.chat(config)
except ResponseModelValidationError as e:
    print(e.last_error)    # Pydantic ValidationError details
    print(e.raw_response)  # raw JSON string from the LLM
    print(e.attempts)      # how many times we tried
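The "1 initial + N retries" accounting can be illustrated with a standard-library sketch. It is an analogue, not the library's retry logic: ValidationExhausted mirrors the attributes documented above, validate_age stands in for Pydantic validation, and call_with_retries and its max_retries parameter are hypothetical.

```python
import json


class ValidationExhausted(Exception):
    """Hypothetical analogue of ResponseModelValidationError."""

    def __init__(self, message, *, attempts, last_error, raw_response=None):
        super().__init__(message)
        self.attempts = attempts
        self.last_error = last_error
        self.raw_response = raw_response


def validate_age(raw: str) -> dict:
    """Stand-in validator: valid JSON can still fail a range check."""
    data = json.loads(raw)
    if not 0 <= data["age"] <= 150:
        raise ValueError(f"age out of range: {data['age']}")
    return data


def call_with_retries(generate, max_retries: int = 2) -> dict:
    """Try once, then retry up to max_retries times before giving up."""
    last_error, raw = None, None
    for attempt in range(1, max_retries + 2):  # 1 initial + N retries
        raw = generate(attempt)
        try:
            return validate_age(raw)
        except ValueError as exc:
            last_error = exc
    raise ValidationExhausted(
        "response_model validation failed",
        attempts=max_retries + 1, last_error=last_error, raw_response=raw,
    )


def flaky(attempt: int) -> str:
    """Return an invalid payload first, then a valid one."""
    return '{"age": 200}' if attempt == 1 else '{"age": 42}'


print(call_with_retries(flaky))
try:
    call_with_retries(lambda attempt: '{"age": 200}', max_retries=2)
except ValidationExhausted as exc:
    print(exc.attempts, exc.last_error, exc.raw_response)
```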

class ractogateway.AnthropicFineTuner(api_key=None)[source]

Bases: object

Fine-tune Anthropic Claude models using the fine-tuning API.

Parameters:

api_key (str | None) – Anthropic API key. Falls back to the ANTHROPIC_API_KEY environment variable when not supplied.

Examples

End-to-end pipeline:

from ractogateway.finetune import RactoDataset, AnthropicFineTuner

ds = RactoDataset.from_pairs(
    [("Summarise this: …", "The text discusses…")],
    system="You are a concise summariser.",
)
tuner = AnthropicFineTuner()
model = tuner.run_pipeline(ds, model="claude-3-haiku-20240307")
print(model)   # "claude-3-haiku-20240307:ft:org-xxx:suffix:abc123"

upload_dataset(dataset)[source]

Upload dataset as an Anthropic training file.

Parameters:

dataset (RactoDataset) – The training examples to upload.

Return type:

str

Returns:

str – The Anthropic file ID used in create_job().

create_job(training_file, model='claude-3-haiku-20240307', *, validation_file=None, suffix=None, hyperparameters=None)[source]

Submit a fine-tuning job.

Parameters:
  • training_file (str) – File ID returned by upload_dataset().

  • model (str) – Base Claude model to fine-tune.

  • validation_file (str | None) – Optional validation file ID.

  • suffix (str | None) – Short label appended to the fine-tuned model name.

  • hyperparameters (dict[str, Any] | None) – Optional overrides, e.g. {"n_epochs": 3}.

Return type:

str

Returns:

str – The fine-tuning job ID.

get_status(job_id)[source]

Retrieve the current status of a fine-tuning job.

Return type:

dict[str, Any]

Returns:

dict – Keys: id, status, model, fine_tuned_model, created_at, finished_at, error.

list_jobs(limit=10)[source]

Return the most recent fine-tuning jobs (newest first).

Return type:

list[dict[str, Any]]

cancel_job(job_id)[source]

Cancel a running fine-tuning job.

Return type:

dict[str, Any]

wait_for_completion(job_id, *, poll_interval=60, verbose=True)[source]

Block until a fine-tuning job finishes.

Parameters:
  • job_id (str) – The job ID returned by create_job().

  • poll_interval (int) – Seconds between status-check API calls.

  • verbose (bool) – Print status lines to stdout.

Return type:

str

Returns:

str – Fine-tuned model name — pass directly to AnthropicDeveloperKit(model=...).

Raises:

RuntimeError – If the job ends in "failed" or "cancelled" state.
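The blocking wait can be pictured as a simple polling loop. This is a sketch under assumptions: wait_for is hypothetical, the status dictionary reuses the id/status/fine_tuned_model keys listed for get_status(), and "succeeded" as the success state is an assumption (only "failed" and "cancelled" are documented). The sleep argument exists only so the example runs instantly.

```python
import time


def wait_for(get_status, job_id, *, poll_interval=60, verbose=True,
             sleep=time.sleep):
    """Poll get_status(job_id) until the job reaches a terminal state."""
    while True:
        status = get_status(job_id)
        if verbose:
            print(f"[{job_id}] {status['status']}")
        if status["status"] == "succeeded":  # assumed success state
            return status["fine_tuned_model"]
        if status["status"] in ("failed", "cancelled"):
            raise RuntimeError(f"job {job_id} ended as {status['status']}")
        sleep(poll_interval)


# Scripted statuses stand in for real API responses.
statuses = iter([
    {"status": "running", "fine_tuned_model": None},
    {"status": "succeeded", "fine_tuned_model": "ft:demo-model"},
])
model = wait_for(lambda job_id: next(statuses), "job-1",
                 poll_interval=0, sleep=lambda seconds: None)
print(model)
```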

run_pipeline(dataset, model='claude-3-haiku-20240307', *, validation_dataset=None, suffix=None, hyperparameters=None, poll_interval=60, verbose=True)[source]

Validate → upload → train → wait in a single call.

Parameters:
  • dataset (RactoDataset) – Training examples.

  • model (str) – Base Claude model to fine-tune.

  • validation_dataset (RactoDataset | None) – Optional held-out validation set.

  • suffix (str | None) – Short label appended to the fine-tuned model name.

  • hyperparameters (dict[str, Any] | None) – Optional overrides, e.g. {"n_epochs": 3}.

  • poll_interval (int) – Seconds between status polls.

  • verbose (bool) – Print progress to stdout.

Return type:

str

Returns:

str – Fine-tuned model identifier — pass directly to AnthropicDeveloperKit(model=...).

class ractogateway.GeminiFineTuner(api_key=None)[source]

Bases: object

Fine-tune Google Gemini models using the Generative AI tuning API.

Parameters:

api_key (str | None) – Google AI API key. Falls back to the GEMINI_API_KEY environment variable when not supplied.

Examples

End-to-end pipeline:

from ractogateway.finetune import RactoDataset, GeminiFineTuner

ds = RactoDataset.from_pairs(
    [("capital of France?", "Paris"), ("capital of Japan?", "Tokyo")],
)
tuner = GeminiFineTuner()
model_name = tuner.run_pipeline(
    ds,
    base_model="models/gemini-1.5-flash-001-tuning",
    display_name="geography-tutor",
)
print(model_name)  # "tunedModels/geography-tutor-abc123"

create_job(dataset, base_model='models/gemini-1.5-flash-001-tuning', *, display_name='', epoch_count=5, batch_size=4, learning_rate=None)[source]

Start a Gemini supervised fine-tuning job.

Parameters:
  • dataset (RactoDataset) – Training examples. Each example must be a single-turn text pair (text_input / output). Examples with attachments or multi-turn conversations are not supported by this adapter — use Vertex AI for those.

  • base_model (str) – Tuning-enabled Gemini model identifier.

  • display_name (str) – Human-readable label for the tuned model.

  • epoch_count (int) – Number of training epochs.

  • batch_size (int) – Training batch size.

  • learning_rate (float | None) – Learning rate. None uses the provider default.

Return type:

Any

Returns:

google.generativeai.types.TunedModel (operation-like object) – Pass to wait_for_completion().

Raises:

ValueError – If the dataset fails validation, or if any examples are multimodal / multi-turn (unsupported by this adapter).

get_model(tuned_model_name)[source]

Retrieve metadata for a tuned model.

Parameters:

tuned_model_name (str) – Full tuned model name, e.g. "tunedModels/my-model-abc123".

Return type:

dict[str, Any]

Returns:

dict – Keys: name, display_name, state, base_model.

list_models()[source]

List all tuned models in this project.

Return type:

list[dict[str, Any]]

delete_model(tuned_model_name)[source]

Permanently delete a tuned model from your project.

Return type:

None

wait_for_completion(operation, *, poll_interval=60, verbose=True)[source]

Block until a tuning operation finishes.

Parameters:
  • operation (Any) – The object returned by create_job().

  • poll_interval (int) – Seconds between metadata checks.

  • verbose (bool) – Print progress to stdout.

Return type:

str

Returns:

str – Tuned model name (e.g. "tunedModels/my-model-abc123"). Pass directly to GoogleDeveloperKit(model=...).

Raises:

RuntimeError – If the tuning job ends in a failed state.

run_pipeline(dataset, base_model='models/gemini-1.5-flash-001-tuning', *, display_name='', epoch_count=5, batch_size=4, learning_rate=None, poll_interval=60, verbose=True)[source]

Validate → create → wait in a single call.

Parameters:
  • dataset (RactoDataset) – Text-pair training examples.

  • base_model (str) – Tuning-enabled Gemini model.

  • display_name (str) – Human-readable label for the tuned model.

  • epoch_count (int) – Training epochs.

  • batch_size (int) – Training batch size.

  • learning_rate (float | None) – Learning rate override.

  • poll_interval (int) – Seconds between status polls.

  • verbose (bool) – Print progress to stdout.

Return type:

str

Returns:

str – Tuned model name — pass to GoogleDeveloperKit(model=...).

class ractogateway.OpenAIFineTuner(api_key=None, *, base_url=None)[source]

Bases: object

Fine-tune OpenAI models using the fine-tuning API.

Parameters:
  • api_key (str | None) – OpenAI API key. Falls back to the OPENAI_API_KEY environment variable when not supplied.

  • base_url (str | None) – Optional custom base URL (Azure OpenAI, proxy, etc.).

Examples

End-to-end pipeline (simplest usage):

from ractogateway.finetune import RactoDataset, OpenAIFineTuner

ds = RactoDataset.from_pairs(
    [("What is Python?", "A high-level programming language.")],
    system="You are a Python tutor.",
)
tuner = OpenAIFineTuner()
model = tuner.run_pipeline(ds, model="gpt-4o-mini-2024-07-18")
print(model)   # "ft:gpt-4o-mini-2024-07-18:org::abc123"

upload_dataset(dataset)[source]

Upload dataset as an OpenAI training file.

Parameters:

dataset (RactoDataset) – The training examples to upload.

Return type:

str

Returns:

str – The OpenAI file ID (e.g. "file-abc123").

create_job(training_file, model='gpt-4o-mini-2024-07-18', *, validation_file=None, n_epochs='auto', batch_size='auto', learning_rate_multiplier='auto', suffix=None)[source]

Submit a fine-tuning job.

Parameters:
  • training_file (str) – File ID returned by upload_dataset().

  • model (str) – Base model to fine-tune.

  • validation_file (str | None) – Optional validation file ID (also produced by upload_dataset()).

  • n_epochs (int | str) – Training epochs.

  • batch_size (int | str) – Per-device batch size.

  • learning_rate_multiplier (float | str) – Scales the default learning rate.

  • suffix (str | None) – Custom label appended to the fine-tuned model name.

Return type:

str

Returns:

str – The fine-tuning job ID (e.g. "ftjob-abc123").

get_status(job_id)[source]

Retrieve the current status of a fine-tuning job.

Return type:

dict[str, Any]

Returns:

dict – Keys: id, status, model, fine_tuned_model, created_at, finished_at, trained_tokens, error.

list_jobs(limit=10)[source]

Return the most recent fine-tuning jobs (newest first).

Return type:

list[dict[str, Any]]

list_events(job_id, limit=20)[source]

Return recent training log events for a job.

Return type:

list[dict[str, Any]]

cancel_job(job_id)[source]

Cancel a running fine-tuning job.

Return type:

dict[str, Any]

wait_for_completion(job_id, *, poll_interval=30, verbose=True)[source]

Block until a fine-tuning job finishes.

Parameters:
  • job_id (str) – The job ID returned by create_job().

  • poll_interval (int) – Seconds between status-check API calls.

  • verbose (bool) – Print status lines to stdout.

Return type:

str

Returns:

str – The fine-tuned model name ready for use in OpenAILLMKit.

Raises:

RuntimeError – If the job ends in "failed" or "cancelled" state.

run_pipeline(dataset, model='gpt-4o-mini-2024-07-18', *, validation_dataset=None, n_epochs='auto', batch_size='auto', learning_rate_multiplier='auto', suffix=None, poll_interval=30, verbose=True)[source]

Validate → upload → train → wait in a single call.

This is the recommended entry-point for most use cases.

Parameters:
  • dataset (RactoDataset) – Training examples.

  • model (str) – Base model to fine-tune.

  • validation_dataset (RactoDataset | None) – Optional held-out validation set (uploaded separately).

  • n_epochs (int | str) – Training hyperparameters. Pass "auto" to let OpenAI decide.

  • batch_size (int | str) – Training hyperparameters. Pass "auto" to let OpenAI decide.

  • learning_rate_multiplier (float | str) – Training hyperparameters. Pass "auto" to let OpenAI decide.

  • suffix (str | None) – Short label appended to the fine-tuned model name.

  • poll_interval (int) – Seconds between status polls while waiting.

  • verbose (bool) – Print progress to stdout.

Return type:

str

Returns:

str – Fine-tuned model identifier — pass directly to OpenAIDeveloperKit(model=...):

from ractogateway.openai_developer_kit import OpenAIDeveloperKit

kit = OpenAIDeveloperKit(model=fine_tuned_model)

class ractogateway.RactoDataset(examples=None)[source]

Bases: object

An ordered collection of RactoTrainingExample objects.

This is the central data container for building, validating, splitting, and exporting fine-tuning datasets for any supported LLM provider.

Parameters:

examples (list[RactoTrainingExample] | None) – Initial examples. An empty dataset is created when omitted.

Examples

Build from (user, assistant) pairs:

ds = RactoDataset.from_pairs(
    [
        ("What is Python?", "Python is a high-level programming language."),
        ("What is a list?", "A list is a mutable ordered sequence."),
    ],
    system="You are a Python tutor.",
)

Add multimodal examples manually:

ds.add(
    RactoTrainingExample.from_pair(
        user="Describe this image.",
        assistant="The image shows a flowchart with three decision nodes.",
        user_attachments=[RactoFile.from_path("diagram.png")],
    )
)

Export to JSONL for fine-tuning:

train_ds, val_ds = ds.split(0.8, seed=42)
train_ds.export_jsonl("train.jsonl", provider="openai")
val_ds.export_jsonl("val.jsonl", provider="openai")

add(example)[source]

Append a single training example.

Return type:

None

extend(examples)[source]

Append multiple training examples at once.

Return type:

None

classmethod from_pairs(pairs, *, system='')[source]

Build a text-only dataset from (user, assistant) pairs.

Parameters:
  • pairs (list[tuple[str, str]]) – Each tuple is (user_message, expected_assistant_response).

  • system (str) – Optional system prompt applied uniformly to every example.

Return type:

RactoDataset

classmethod from_jsonl(path, provider='openai')[source]

Load a JSONL dataset previously exported for provider.

Supports text-only OpenAI, Anthropic, and Gemini formats.

Parameters:
  • path (str | Path) – Path to the .jsonl file.

  • provider (str) – One of "openai", "anthropic", "gemini".

Return type:

RactoDataset

shuffle(seed=None)[source]

Return a new dataset with examples in random order.

Parameters:

seed (int | None) – Optional random seed for reproducibility.

Return type:

RactoDataset

split(train_ratio=0.8, *, seed=None)[source]

Split into train and validation datasets.

Parameters:
  • train_ratio (float) – Fraction of examples for the training split. Must be between 0 and 1 (exclusive).

  • seed (int | None) – Optional random seed for reproducible shuffling.

Return type:

tuple[RactoDataset, RactoDataset]

Returns:

tuple[RactoDataset, RactoDataset] – (train_dataset, validation_dataset)
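A seeded shuffle-then-cut split can be sketched with the standard library. split_examples is a hypothetical helper; in particular, rounding the cut point down with int() is an assumption, and the library may round differently.

```python
import random


def split_examples(examples, train_ratio=0.8, *, seed=None):
    """Shuffle a copy with a seeded RNG, then cut at train_ratio."""
    if not 0 < train_ratio < 1:
        raise ValueError("train_ratio must be between 0 and 1 (exclusive)")
    shuffled = list(examples)
    random.Random(seed).shuffle(shuffled)  # same seed, same order
    cut = int(len(shuffled) * train_ratio)
    return shuffled[:cut], shuffled[cut:]


train, val = split_examples(range(10), 0.8, seed=42)
print(len(train), len(val))
```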

validate(provider='openai')[source]

Check examples for common formatting errors.

Parameters:

provider (str) – Provider to validate against ("openai", "anthropic", or "gemini").

Return type:

list[str]

Returns:

list[str] – A list of human-readable error strings. An empty list means the dataset is ready to use.
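The "list of error strings" contract can be illustrated as follows. The specific checks are assumptions chosen for the example (a user turn must exist, the last turn must be the assistant, no empty content); validate_examples is hypothetical and works on plain role/content dictionaries rather than RactoTrainingExample objects.

```python
def validate_examples(examples: list[list[dict[str, str]]]) -> list[str]:
    """Return human-readable problems; an empty list means no problems found."""
    errors: list[str] = []
    for index, messages in enumerate(examples):
        roles = [message["role"] for message in messages]
        if "user" not in roles:
            errors.append(f"example {index}: no user message")
        if not roles or roles[-1] != "assistant":
            errors.append(f"example {index}: last message must be from the assistant")
        for message in messages:
            if not message["content"].strip():
                errors.append(f"example {index}: empty {message['role']} message")
    return errors


good = [[{"role": "user", "content": "hi"},
         {"role": "assistant", "content": "hello"}]]
bad = [[{"role": "user", "content": " "}]]
print(validate_examples(good))
print(validate_examples(bad))
```

Returning strings instead of raising lets a caller report every problem in one pass before deciding whether to abort.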

to_jsonl_string(provider='openai')[source]

Serialize all examples to a JSONL string for provider.

Parameters:

provider (str) – One of "openai" / "generic", "anthropic", "gemini".

Return type:

str

export_jsonl(path, provider='openai', *, overwrite=False)[source]

Write the dataset to a .jsonl file on disk.

Parameters:
  • path (str | Path) – Destination file path.

  • provider (str) – One of "openai", "anthropic", "gemini".

  • overwrite (bool) – When False (default), raise FileExistsError if the file already exists.

Return type:

Path

Returns:

Path – The resolved path of the written file.
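The overwrite guard maps naturally onto Python's exclusive-create file mode. The sketch below is an assumption about one way to get this behaviour, not the library's code: this export_jsonl is a standalone function that takes plain dictionaries, and the temporary directory is used only to keep the example self-contained.

```python
import json
import tempfile
from pathlib import Path


def export_jsonl(records, path, *, overwrite=False) -> Path:
    """Write one JSON object per line; refuse to clobber unless asked."""
    target = Path(path)
    mode = "w" if overwrite else "x"  # "x" raises FileExistsError if present
    with target.open(mode, encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")
    return target.resolve()


workdir = Path(tempfile.mkdtemp())
written = export_jsonl(
    [{"text_input": "capital of France?", "output": "Paris"}],
    workdir / "train.jsonl",
)
print(written.name)
try:
    export_jsonl([], workdir / "train.jsonl")
except FileExistsError:
    print("refused to overwrite")
```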

summary()[source]

Return brief statistics about the dataset.

Return type:

dict[str, Any]

Returns:

dict – Keys: examples, total_messages, avg_turns_per_example, multimodal_examples.

class ractogateway.RactoTrainingExample(messages)[source]

Bases: object

A complete conversation used as one training record.

Parameters:

messages (list[RactoTrainingMessage]) –

Ordered turns. Typical shapes:

  • Single-turn : [user, assistant]

  • With system : [system, user, assistant]

  • Multi-turn : [system, user, assistant, user, assistant, …]

Examples

>>> ex = RactoTrainingExample.from_pair(
...     user="What is 2 + 2?",
...     assistant="4",
...     system="You are a maths tutor.",
... )
>>> # Multimodal example (image + question)
>>> ex = RactoTrainingExample.from_pair(
...     user="Describe this chart.",
...     assistant="The chart shows monthly revenue for Q4 2024.",
...     user_attachments=[RactoFile.from_path("chart.png")],
... )

classmethod from_pair(user, assistant, *, system='', user_attachments=None)[source]

Create a single-turn (prompt → completion) training example.

Parameters:
  • user (str) – The user prompt.

  • assistant (str) – The desired model response.

  • system (str) – Optional system prompt prepended to the conversation.

  • user_attachments (list[RactoFile] | None) – Images or other files attached to the user turn.

Return type:

RactoTrainingExample

classmethod from_conversation(turns)[source]

Build from a list of (role, content) tuples.

Parameters:

turns (list[tuple[Literal['system', 'user', 'assistant'], str]]) – E.g. [("system", "…"), ("user", "…"), ("assistant", "…")]

Return type:

RactoTrainingExample

to_openai_dict()[source]

Serialize to OpenAI fine-tuning JSONL record.

Output format:

{"messages": [{"role": "system", "content": "…"}, …]}

Return type:

dict[str, Any]

to_anthropic_dict()[source]

Serialize to Anthropic fine-tuning JSONL record.

Output format:

{"system": "…", "messages": [{"role": "user", …}, …]}

The system key is only present when a system message exists.

Return type:

dict[str, Any]

to_gemini_dict()[source]

Serialize to Gemini tuning record.

For text-only single-turn examples (most common) the output is:

{"text_input": "…", "output": "…"}

For multimodal or multi-turn examples the Vertex AI contents format is used:

{"contents": [{"role": "user", "parts": […]}, …]}
Return type:

dict[str, Any]
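The choice between the two Gemini shapes can be sketched as follows. The function name gemini_record and the has_attachments flag are hypothetical, and mapping assistant turns to the "model" role while skipping system turns is an assumption; the library may handle both differently.

```python
Turn = tuple[str, str]  # (role, content)


def gemini_record(turns: list[Turn], has_attachments: bool = False) -> dict:
    """Use the compact shape for a plain user/assistant pair, else `contents`."""
    roles = [role for role, _ in turns]
    if roles == ["user", "assistant"] and not has_attachments:
        return {"text_input": turns[0][1], "output": turns[1][1]}
    # Assumption: assistant turns become "model" and system turns are skipped.
    role_map = {"user": "user", "assistant": "model"}
    return {
        "contents": [
            {"role": role_map[role], "parts": [{"text": content}]}
            for role, content in turns
            if role in role_map
        ]
    }


single = gemini_record([("user", "What is 2 + 2?"), ("assistant", "4")])
multi = gemini_record(
    [("user", "Hi"), ("assistant", "Hello"), ("user", "Bye"), ("assistant", "Goodbye")]
)
print(single)
print([entry["role"] for entry in multi["contents"]])
```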

class ractogateway.RactoTrainingMessage(role, content, attachments=<factory>)[source]

Bases: object

One conversational turn inside a training example.

Parameters:
  • role (Literal['system', 'user', 'assistant']) – Speaker role.

  • content (str) – Text content of the message.

  • attachments (list[RactoFile]) – Optional images / PDFs for multimodal training examples. Use RactoFile.from_path() or RactoFile.from_bytes().

role: Literal['system', 'user', 'assistant']
content: str
attachments: list[RactoFile]
to_openai()[source]

Return an OpenAI-compatible message dict.

Text-only messages produce {"role": ..., "content": str}. Messages with attachments produce a content-block list: {"role": ..., "content": [image_url_block, ..., text_block]}.

Return type:

dict[str, Any]

to_anthropic()[source]

Return an Anthropic-compatible message dict.

System messages should be lifted to the top-level system field — RactoTrainingExample.to_anthropic_dict() handles this automatically.

Return type:

dict[str, Any]

to_gemini_parts()[source]

Return a list of Gemini content parts (text + inline_data).

Return type:

list[dict[str, Any]]

class ractogateway.Gateway(adapter, *, tools=None, default_prompt=None)[source]

Bases: object

Unified entry point that wraps any BaseLLMAdapter.

Parameters:
  • adapter (BaseLLMAdapter) – A concrete adapter instance (OpenAILLMKit, GoogleLLMKit, AnthropicLLMKit).

  • tools (ToolRegistry | None) – An optional ToolRegistry containing registered tools that the LLM is allowed to call.

  • default_prompt (RactoPrompt | None) – An optional RactoPrompt to use when run() is called without an explicit prompt.

Usage:

from ractogateway import RactoPrompt, Gateway
from ractogateway.adapters import OpenAILLMKit

adapter = OpenAILLMKit(model="gpt-4o", api_key="sk-…")
prompt = RactoPrompt(…)
gw = Gateway(adapter=adapter)

response = gw.run(prompt, "Analyse this code for bugs.")
print(response.parsed)  # auto-parsed JSON dict

run(prompt=None, user_message='', *, tools=None, temperature=0.0, max_tokens=4096, response_model=None, **kwargs)[source]

Execute a request and return a standardised LLMResponse.

Parameters:
  • prompt (RactoPrompt | None) – The RACTO prompt. Falls back to default_prompt.

  • user_message (str) – The end-user’s query.

  • tools (ToolRegistry | None) – Override the gateway-level tool registry for this call.

  • temperature (float) – Sampling temperature.

  • max_tokens (int) – Maximum tokens in the response.

  • response_model (type[BaseModel] | None) – Optional Pydantic model to validate parsed output against. If provided and the LLM returns valid JSON, it is validated through this model and attached to response.parsed.

  • **kwargs (Any) – Passed through to the adapter.

Return type:

LLMResponse

async arun(prompt=None, user_message='', *, tools=None, temperature=0.0, max_tokens=4096, response_model=None, **kwargs)[source]

Async variant of run().

Return type:

LLMResponse

class ractogateway.LLMResponse(**data)[source]

Bases: BaseModel

Unified, provider-agnostic response envelope.

Every adapter’s run() method returns one of these, regardless of whether the underlying provider is OpenAI, Gemini, or Anthropic.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

content: str | None
thinking: str | None
parsed: dict[str, Any] | list[Any] | None
tool_calls: list[ToolCallResult]
finish_reason: FinishReason
usage: dict[str, int]
raw: Any
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.RactoFile(data, mime_type, name='')[source]

Bases: object

A file attachment that can be passed to RactoPrompt.to_messages().

Create from a file path (MIME type is auto-detected) or directly from raw bytes with an explicit MIME type.

Parameters:
  • data (bytes) – Raw bytes of the file.

  • mime_type (str) – MIME type string, e.g. "image/jpeg" or "application/pdf".

  • name (str) – Optional filename hint used for display / debugging.

Examples

>>> # From a file path
>>> img = RactoFile.from_path("/tmp/photo.jpg")
>>> # From bytes
>>> img = RactoFile.from_bytes(open("photo.jpg", "rb").read(), "image/jpeg")
classmethod from_path(path)[source]

Load a file from path and auto-detect its MIME type.

Parameters:

path (str | Path) – Absolute or relative path to the file on disk.

Raises:

FileNotFoundError – If path does not exist.

Return type:

RactoFile

classmethod from_bytes(data, mime_type, name='')[source]

Create a RactoFile directly from data bytes.

Parameters:
  • data (bytes) – Raw file bytes.

  • mime_type (str) – MIME type of the data, e.g. "image/png".

  • name (str) – Optional filename string (no file I/O is performed).

Return type:

RactoFile

property base64_data: str

Return file bytes encoded as a base-64 ASCII string.
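MIME auto-detection from a path and base-64 encoding of the bytes can both be done with the standard library. The sketch below shows one way to do it; the function names guess_mime and to_base64_ascii are hypothetical, and whether RactoFile uses mimetypes internally is an assumption.

```python
import base64
import mimetypes


def guess_mime(path: str, default: str = "application/octet-stream") -> str:
    """Guess a MIME type from the file extension alone (no file I/O)."""
    mime, _encoding = mimetypes.guess_type(path)
    return mime or default


def to_base64_ascii(data: bytes) -> str:
    """Encode raw bytes as a base-64 ASCII string."""
    return base64.b64encode(data).decode("ascii")


print(guess_mime("photo.jpg"))
print(guess_mime("report.pdf"))
print(to_base64_ascii(b"hello"))  # aGVsbG8=
```

Extension-based detection never opens the file, so a wrong extension gives a wrong MIME type; passing the type explicitly, as from_bytes() requires, avoids that.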

property is_image: bool

True when the MIME type is a supported image type.

property is_pdf: bool
property is_text: bool
class ractogateway.RactoPrompt(**data)[source]

Bases: BaseModel

A strictly validated RACTO prompt definition.

Parameters:
  • role (str) – A sentence (or short paragraph) describing who the LLM is.

  • aim (str) – A clear statement of the task objective.

  • constraints (list[str]) – Hard rules the model must obey. At least one is required.

  • tone (str) – The desired communication style (e.g. “Professional and concise”).

  • output_format (str | type[BaseModel]) – Either a format keyword ("json", "text", "markdown"), a free-form format description, or a Pydantic model class whose JSON Schema will be embedded in the prompt.

  • context (str | None) – Optional extra context paragraph injected between AIM and CONSTRAINTS. Useful for passing domain-specific background knowledge that the model needs to reason about.

  • examples (list[dict[str, str]] | None) – Optional list of example input/output pairs that are included in the prompt to steer the model via few-shot learning.

  • anti_hallucination (bool) – When True (the default), the compiler appends explicit anti-hallucination directives at the end of the prompt.

Notes

Legacy compatibility:

Older RactoGateway versions accepted instructions without the full RACTO field set. That shape is still accepted and mapped to aim with sensible defaults for missing RACTO fields.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

role: str
aim: str
constraints: list[str]
tone: str
output_format: str | type[BaseModel]
context: str | None
examples: list[dict[str, str]] | None
anti_hallucination: bool
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

compile()[source]

Compile the RACTO fields into an optimized system prompt string.

The resulting prompt is structured into clearly delimited sections so that the LLM can parse each instruction block unambiguously.

Return type:

str

Returns:

str – A ready-to-use system prompt.
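As a rough illustration of what "clearly delimited sections" could look like, the sketch below assembles the RACTO fields in the documented order (context between AIM and CONSTRAINTS, anti-hallucination directives last). The function compile_racto, the bracketed section headers, and the directive wording are all hypothetical; the text that compile() actually produces may look different.

```python
from typing import Optional


def compile_racto(
    role: str,
    aim: str,
    constraints: list[str],
    tone: str,
    output_format: str,
    context: Optional[str] = None,
    anti_hallucination: bool = True,
) -> str:
    """Join RACTO fields into one system prompt with labelled sections."""
    if not constraints:
        raise ValueError("at least one constraint is required")
    sections = [("ROLE", role), ("AIM", aim)]
    if context:
        sections.append(("CONTEXT", context))  # sits between AIM and CONSTRAINTS
    sections.append(("CONSTRAINTS", "\n".join(f"- {rule}" for rule in constraints)))
    sections.append(("TONE", tone))
    sections.append(("OUTPUT FORMAT", output_format))
    if anti_hallucination:
        # Hypothetical wording; appended last, as the parameter docs describe.
        sections.append(
            ("ANTI-HALLUCINATION", "If you are not sure, say so instead of guessing.")
        )
    return "\n\n".join(f"[{title}]\n{body}" for title, body in sections)


system_prompt = compile_racto(
    role="You are a code reviewer.",
    aim="Find bugs in the submitted code.",
    constraints=["Cite line numbers.", "Do not rewrite the whole file."],
    tone="Professional and concise",
    output_format="json",
    context="The code targets Python 3.12.",
)
print(system_prompt)
```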

to_messages(user_message, *, attachments=None, provider='generic')[source]

Return a ready-to-send message list for a given LLM provider.

Parameters:
  • user_message (str) – The end-user’s query or input.

  • attachments (list[RactoFile] | None) –

    Optional list of RactoFile objects to send alongside the text message. Accepted inputs per file:

    • File path — use RactoFile.from_path():

      RactoFile.from_path("/tmp/diagram.png")
      
    • Raw bytes — use RactoFile.from_bytes():

      RactoFile.from_bytes(img_bytes, "image/png")
      

    Each file is re-encoded into the content-block schema expected by the target provider (image_url for OpenAI, image / document for Anthropic, inline_data for Google, images list for Ollama).

  • provider (str) – One of "openai", "anthropic", "google", "ollama", or "generic". Controls the system-role key name and the content-block format used for attachments.

Return type:

list[dict[str, Any]]

Returns:

list[dict[str, Any]] – A list of message dicts suitable for the provider’s API.
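The per-provider re-encoding of an attachment can be pictured as below. The block shapes follow the providers' publicly documented message formats (data-URL image_url for OpenAI, base-64 source for Anthropic, inline_data for Google), but the function attachment_block is hypothetical and the exact dicts the library emits may differ.

```python
import base64


def attachment_block(data: bytes, mime_type: str, provider: str) -> dict:
    """Wrap raw file bytes in the content-block shape a provider expects."""
    b64 = base64.b64encode(data).decode("ascii")
    if provider == "openai":
        return {
            "type": "image_url",
            "image_url": {"url": f"data:{mime_type};base64,{b64}"},
        }
    if provider == "anthropic":
        # PDFs travel as "document" blocks, images as "image" blocks.
        kind = "document" if mime_type == "application/pdf" else "image"
        return {
            "type": kind,
            "source": {"type": "base64", "media_type": mime_type, "data": b64},
        }
    if provider == "google":
        return {"inline_data": {"mime_type": mime_type, "data": b64}}
    raise ValueError(f"unsupported provider: {provider!r}")


for name in ("openai", "anthropic", "google"):
    print(name, attachment_block(b"x", "image/png", name))
```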

class ractogateway.ToolRegistry[source]

Bases: object

A registry that collects tools and exposes them as canonical schemas.

Usage:

registry = ToolRegistry()

@registry.register
def get_weather(city: str, unit: str = "celsius") -> str:
    '''Get the current weather for a city.'''
    ...

# Or register a Pydantic model:
registry.register(WeatherRequest)

# Iterate schemas:
for schema in registry.schemas:
    print(schema.name)
register(target=None, *, name=None, description=None)[source]

Register a function or Pydantic model as a tool.

Works as a decorator (@registry.register) or as a direct call (registry.register(MyModel)).

Return type:

Callable[..., Any] | type[BaseModel]

property schemas: list[ToolSchema]

Return all registered tool schemas.

property tools: dict[str, ToolSchema]

Backward-compatible mapping of tool name -> schema.

get_schema(name)[source]

Look up a single tool schema by name.

Return type:

ToolSchema | None

get_callable(name)[source]

Look up the original callable by tool name.

Return type:

Callable[..., Any] | None

ractogateway.tool(fn=None, *, name=None, description=None, registry=None)[source]

Decorator that marks a function as an LLM-callable tool.

Can be used bare (@tool) or with overrides (@tool(name="my_tool", description="…")).

You can also bind registration directly: @tool(registry) or @tool(registry=registry).

The decorated function gains a _tool_schema attribute containing the canonical ToolSchema.

Return type:

Callable[..., Any]
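To give a feel for how a tool schema can be derived from a plain function, the sketch below builds a JSON-Schema-like description with inspect. It is not the library's ToolSchema: describe_tool, the dict layout, and the type mapping are hypothetical, and only a few primitive annotations are handled.

```python
import inspect

# Minimal annotation-to-JSON-type mapping; anything else falls back to "string".
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_tool(fn) -> dict:
    """Build a JSON-Schema-like description from a function signature."""
    properties: dict[str, dict] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the LLM must supply it
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


def get_weather(city: str, unit: str = "celsius") -> str:
    """Get the current weather for a city."""
    return f"Sunny in {city} ({unit})"


schema = describe_tool(get_weather)
print(schema)
```

The docstring doubles as the tool description, which is why a descriptive docstring matters for functions exposed to an LLM.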

class ractogateway.RactoRAG(vector_store=None, embedder=None, *, store=None, chunker=None, processors=None, llm_kit=None, context_template=None, reader_registry=None, default_prompt=None)[source]

Bases: object

Production-grade RAG pipeline for RactoGateway.

Parameters:
  • vector_store (BaseVectorStore | None) – Any BaseVectorStore instance.

  • embedder (BaseEmbedder | None) – Any BaseEmbedder instance.

  • chunker (BaseChunker | None) – How to split documents. Defaults to RecursiveChunker with chunk_size=512, overlap=50.

  • processors (list[BaseProcessor] | None) – List of text processors applied to each chunk before embedding. Defaults to [TextCleaner()].

  • llm_kit (Any | None) – Any developer kit (OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit). Required for query() / aquery().

  • context_template (str | None) – Template string for injecting retrieved context into the LLM prompt. Must contain {context} and {question} placeholders.

  • reader_registry (FileReaderRegistry | None) – Custom FileReaderRegistry. Defaults to a registry with all built-in readers.

  • default_prompt (RactoPrompt | None) – RACTO prompt used for generation. Falls back to a built-in RAG prompt.
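A context_template with the two required placeholders can be filled with str.format. The sketch below is illustrative only: the template text, the numbered-chunk layout, and the helper render_context are hypothetical and not the built-in template.

```python
TEMPLATE = (
    "Answer using only the context below.\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}"
)


def render_context(template: str, chunks: list[str], question: str) -> str:
    """Check both placeholders exist, then inject chunks and question."""
    for placeholder in ("{context}", "{question}"):
        if placeholder not in template:
            raise ValueError(f"template is missing {placeholder}")
    context = "\n\n".join(f"[{i}] {text}" for i, text in enumerate(chunks, start=1))
    return template.format(context=context, question=question)


rendered = render_context(
    TEMPLATE,
    ["Paris is the capital of France.", "France is in Europe."],
    "What is the capital of France?",
)
print(rendered)
```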

ingest(path, **metadata)[source]

Read, chunk, embed, and store a single file.

Parameters:
  • path (str | Path) – Path to the file to ingest.

  • **metadata (Any) – Extra metadata merged into each chunk’s ChunkMetadata.extra.

Return type:

list[Chunk]

Returns:

list[Chunk] – The chunks that were added to the vector store.

ingest_dir(directory, pattern='**/*', **metadata)[source]

Recursively ingest all supported files in a directory.

Parameters:
  • directory (str | Path) – Root directory to scan.

  • pattern (str) – Glob pattern relative to directory.

  • **metadata (Any) – Extra metadata merged into every chunk.

Return type:

list[Chunk]

Returns:

list[Chunk] – All chunks added across all ingested files.

ingest_text(text, source='manual', **metadata)[source]

Ingest a raw text string directly (no file needed).

Parameters:
  • text (str) – The text content to ingest.

  • source (str) – A label identifying the source (stored in metadata).

  • **metadata (Any) – Extra metadata merged into each chunk.

Return type:

list[Chunk]

async aingest(path, **metadata)[source]

Async variant of ingest().

Return type:

list[Chunk]

async aingest_dir(directory, pattern='**/*', **metadata)[source]

Async variant of ingest_dir().

Return type:

list[Chunk]

async aingest_text(text, source='manual', **metadata)[source]

Async variant of ingest_text().

Return type:

list[Chunk]

retrieve(query, top_k=5, filters=None)[source]

Embed query and retrieve the top-k most relevant chunks.

Parameters:
  • query (str) – Natural-language question or search phrase.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked results (rank 1 = most relevant).

async aretrieve(query, top_k=5, filters=None)[source]

Async variant of retrieve().

Return type:

list[RetrievalResult]

query(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]

Retrieve relevant chunks and generate an answer.

Parameters:
  • question (str) – The user’s question.

  • top_k (int) – Number of context chunks to retrieve.

  • filters (dict[str, Any] | None) – Optional metadata filters for retrieval.

  • prompt (RactoPrompt | None) – Override the default RACTO prompt for generation.

  • temperature (float) – LLM temperature (default 0.0 for factual answers).

  • max_tokens (int) – Maximum tokens in the generated answer.

Return type:

RAGResponse

Returns:

RAGResponse – Contains the generated answer plus the retrieved source chunks.

Raises:

RuntimeError – If no llm_kit was provided.

async aquery(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]

Async variant of query().

Return type:

RAGResponse

property store: BaseVectorStore

The underlying vector store.

property embedder: BaseEmbedder

The underlying embedder.

count()[source]

Return the total number of indexed chunks.

Return type:

int

clear()[source]

Remove all indexed chunks from the vector store.

Return type:

None

class ractogateway.FixedChunker(chunk_size=512, overlap=50)[source]

Bases: BaseChunker

Split text into fixed-size character windows with overlap.

Parameters:
  • chunk_size (int) – Maximum number of characters per chunk.

  • overlap (int) – Number of characters to repeat at the start of the next chunk. Must be less than chunk_size.

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.
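Fixed-size windows with overlap come down to stepping through the text in strides of chunk_size - overlap. The sketch below works on a plain string rather than a Document and returns strings rather than Chunk objects; fixed_windows is a hypothetical name, not the library's code.

```python
def fixed_windows(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Slice text into windows of chunk_size characters sharing `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and less than chunk_size")
    step = chunk_size - overlap
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


print(fixed_windows("abcdefghij", chunk_size=4, overlap=1))
# ['abcd', 'defg', 'ghij']
```

The requirement that overlap be less than chunk_size keeps the stride positive; otherwise the loop would never advance.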

class ractogateway.RecursiveChunker(chunk_size=512, overlap=50, separators=None)[source]

Bases: BaseChunker

Split text recursively using a priority list of separators.

Parameters:
  • chunk_size (int) – Maximum number of characters per chunk.

  • overlap (int) – Number of characters of overlap between consecutive chunks.

  • separators (list[str] | None) – Ordered list of separator strings to try. The first separator that produces pieces within chunk_size is used.

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.
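One common way to split recursively is to try the coarsest separator first, recurse into pieces that are still too long with the remaining separators, and then merge neighbouring pieces while they fit. The sketch below follows that approach under stated assumptions: recursive_split is a hypothetical name, the default separators are illustrative, overlap is left out for brevity, and the library's exact strategy may differ.

```python
def recursive_split(
    text: str,
    chunk_size: int,
    separators: tuple[str, ...] = ("\n\n", "\n", " "),
) -> list[str]:
    """Split on the first separator, recursing with finer ones where needed."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separator left: fall back to hard character cuts.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    pieces: list[str] = []
    for part in text.split(sep):
        pieces.extend(recursive_split(part, chunk_size, finer))
    # Greedily merge neighbours back together while the result still fits.
    merged: list[str] = []
    current = ""
    for piece in pieces:
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            merged.append(current)
            current = piece
    if current:
        merged.append(current)
    return merged


sample = "alpha beta\n\ngamma delta epsilon"
print(recursive_split(sample, chunk_size=12))
# ['alpha beta', 'gamma delta', 'epsilon']
```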

class ractogateway.SemanticChunker(embedder, threshold=0.5, min_chunk_size=2, language='english')[source]

Bases: BaseChunker

Split documents where the semantic similarity between adjacent sentences drops below a threshold.

Parameters:
  • embedder (BaseEmbedder) – Any BaseEmbedder instance.

  • threshold (float) – Cosine similarity below which a split is inserted (default: 0.5).

  • min_chunk_size (int) – Minimum number of sentences per chunk (prevents ultra-fine splits).

  • language (str) – NLTK sentence tokenizer language.

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.
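The threshold rule can be sketched with precomputed sentence vectors: start a new chunk whenever the cosine similarity between neighbouring sentences falls below threshold, unless the current chunk is still shorter than min_chunk_size. The toy two-dimensional vectors and the function semantic_groups are hypothetical; a real run would obtain vectors from a BaseEmbedder.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_groups(
    sentences: list[str],
    vectors: list[list[float]],
    threshold: float = 0.5,
    min_chunk_size: int = 2,
) -> list[list[str]]:
    """Group adjacent sentences, splitting where similarity drops below threshold."""
    if not sentences:
        return []
    groups = [[sentences[0]]]
    for i in range(1, len(sentences)):
        similar = cosine(vectors[i - 1], vectors[i]) >= threshold
        if similar or len(groups[-1]) < min_chunk_size:
            groups[-1].append(sentences[i])
        else:
            groups.append([sentences[i]])
    return groups


sentences = ["Cats purr.", "Cats nap.", "Stocks fell.", "Bonds rose."]
vectors = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]]
print(semantic_groups(sentences, vectors))
# [['Cats purr.', 'Cats nap.'], ['Stocks fell.', 'Bonds rose.']]
```

In this sketch the final group can end up shorter than min_chunk_size, because the minimum is only enforced before a split, not at the end of the document.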

class ractogateway.SentenceChunker(sentences_per_chunk=5, overlap_sentences=1, language='english')[source]

Bases: BaseChunker

Split text into groups of sentences using NLTK.

Parameters:
  • sentences_per_chunk (int) – Number of sentences per chunk.

  • overlap_sentences (int) – Number of sentences to repeat at the start of the next chunk.

  • language (str) – Language for the NLTK sentence tokenizer (default: "english").

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.
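Grouping sentences with overlap uses the same sliding-window idea as character windows, applied to a sentence list. To stay dependency-free, the sketch below replaces the NLTK tokenizer with a naive regular-expression splitter, which is an assumption made for illustration and will mis-split abbreviations; sentence_chunks is a hypothetical name.

```python
import re


def sentence_chunks(
    text: str, sentences_per_chunk: int = 5, overlap_sentences: int = 1
) -> list[str]:
    """Group sentences into chunks that repeat the last `overlap_sentences`."""
    if not 0 <= overlap_sentences < sentences_per_chunk:
        raise ValueError("overlap_sentences must be less than sentences_per_chunk")
    # Naive splitter standing in for NLTK: break after ., ! or ? plus whitespace.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    step = sentences_per_chunk - overlap_sentences
    chunks: list[str] = []
    for start in range(0, len(sentences), step):
        chunks.append(" ".join(sentences[start:start + sentences_per_chunk]))
        if start + sentences_per_chunk >= len(sentences):
            break  # the last window already covers the final sentence
    return chunks


print(sentence_chunks("One. Two. Three. Four. Five.", 3, 1))
# ['One. Two. Three.', 'Three. Four. Five.']
```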

class ractogateway.GoogleEmbedder(model='text-embedding-004', *, api_key=None, task_type=None, batch_size=100)[source]

Bases: BaseEmbedder

Embed texts using the Google Gemini Embeddings API.

Parameters:
  • model (str) – Gemini embedding model (default "text-embedding-004").

  • api_key (str | None) – Gemini API key. Falls back to GEMINI_API_KEY env var.

  • task_type (str | None) – Gemini task type hint (e.g. "RETRIEVAL_DOCUMENT", "RETRIEVAL_QUERY"). None lets the API decide.

  • batch_size (int) – Maximum number of texts per API call.

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 when the dimension is not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]
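The batch_size parameter implies that a long input list is sent in several API calls whose results are concatenated in input order. A minimal sketch of that pattern, with a fake batch function standing in for the real API call; embed_in_batches and fake_embed_batch are hypothetical names.

```python
from typing import Callable


def embed_in_batches(
    texts: list[str],
    embed_batch: Callable[[list[str]], list[list[float]]],
    batch_size: int = 100,
) -> list[list[float]]:
    """Call embed_batch on slices of at most batch_size texts, keeping order."""
    if not texts:
        raise ValueError("texts must be a non-empty list")
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    vectors: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_batch(texts[start:start + batch_size]))
    return vectors


call_sizes: list[int] = []


def fake_embed_batch(batch: list[str]) -> list[list[float]]:
    """Stand-in for a real API call: one 1-dimensional vector per text."""
    call_sizes.append(len(batch))
    return [[float(len(text))] for text in batch]


vectors = embed_in_batches(
    ["a", "bb", "ccc", "dddd", "eeeee"], fake_embed_batch, batch_size=2
)
print(call_sizes)  # [2, 2, 1]
print(vectors)     # [[1.0], [2.0], [3.0], [4.0], [5.0]]
```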

class ractogateway.OpenAIEmbedder(model='text-embedding-3-small', *, api_key=None, base_url=None, dimensions=None, batch_size=256)[source]

Bases: BaseEmbedder

Embed texts using the OpenAI Embeddings API.

Parameters:
  • model (str) – OpenAI embedding model (default "text-embedding-3-small").

  • api_key (str | None) – OpenAI API key. Falls back to OPENAI_API_KEY env var.

  • base_url (str | None) – Custom base URL (Azure OpenAI or proxy).

  • dimensions (int | None) – Override output dimensionality (supported for text-embedding-3-*).

  • batch_size (int) – Maximum number of texts per API call.

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 when the dimension is not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

class ractogateway.VoyageEmbedder(model='voyage-3', *, api_key=None, input_type='document', batch_size=128)[source]

Bases: BaseEmbedder

Embed texts using the Voyage AI API.

Voyage AI embeddings are optimised for Anthropic Claude RAG pipelines and are the recommended choice when using Claude as the generation LLM.

Parameters:
  • model (str) – Voyage model name (default "voyage-3").

  • api_key (str | None) – Voyage API key. Falls back to VOYAGE_API_KEY env var.

  • input_type (str | None) – "query" for queries, "document" for documents to index. Using the correct type improves retrieval quality.

  • batch_size (int) – Maximum texts per API call.

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 when the dimension is not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

class ractogateway.Lemmatizer(use_pos_tagging=True)[source]

Bases: BaseProcessor

Reduce words to their base (lemma) form using NLTK WordNet.

Parameters:

use_pos_tagging (bool) – If True, use POS tagging to improve lemmatization accuracy. Slightly slower but produces better results.

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

class ractogateway.ProcessingPipeline(processors)[source]

Bases: BaseProcessor

Apply a sequence of BaseProcessor objects to text.

Example:

pipeline = ProcessingPipeline([TextCleaner(), Lemmatizer()])
processed = pipeline.process("  Hello,   worlds!  ")
Parameters:

processors (list[BaseProcessor]) – Ordered list of processors to apply. Each processor receives the output of the previous one.

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

class ractogateway.TextCleaner(normalize_unicode=True, strip_html=True, strip_control_chars=True, collapse_whitespace=True, collapse_blank_lines=True)[source]

Bases: BaseProcessor

Normalise text for embedding and retrieval.

Steps applied (all optional via constructor flags):

  1. Unicode normalisation (NFC)

  2. Strip residual HTML tags

  3. Remove control characters

  4. Collapse multiple spaces to one

  5. Collapse runs of blank lines to at most two newlines

  6. Strip leading/trailing whitespace

Parameters:
  • normalize_unicode (bool) – Apply unicodedata.normalize("NFC", text).

  • strip_html (bool) – Remove <tag> patterns.

  • strip_control_chars (bool) – Remove non-printable control characters.

  • collapse_whitespace (bool) – Collapse sequences of spaces/tabs to a single space.

  • collapse_blank_lines (bool) – Collapse 3+ consecutive newlines to 2.

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.
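The six cleaning steps listed for TextCleaner map onto a handful of standard-library calls. The sketch below applies all of them unconditionally; the function clean_text and the specific regular expressions are assumptions for illustration, and the library's patterns may be stricter or looser.

```python
import re
import unicodedata


def clean_text(text: str) -> str:
    """Apply the six normalisation steps in order."""
    text = unicodedata.normalize("NFC", text)                     # 1. Unicode NFC
    text = re.sub(r"<[^>]+>", "", text)                           # 2. strip <tag> patterns
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)  # 3. control chars, keeping \t \n \r
    text = re.sub(r"[ \t]+", " ", text)                           # 4. collapse spaces and tabs
    text = re.sub(r"\n{3,}", "\n\n", text)                        # 5. at most two newlines in a row
    return text.strip()                                           # 6. trim both ends


print(repr(clean_text("  <b>Hello</b>,   world!\x00\n\n\n\nBye  ")))
# 'Hello, world!\n\nBye'
```

The tag pattern is deliberately simple and will also remove text such as "a <b and c> d"; that trade-off is acceptable for residual markup but not for general HTML parsing.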

class ractogateway.FileReaderRegistry(readers=None)[source]

Bases: object

Registry that maps file extensions to BaseReader instances.

By default all built-in readers are registered. You can add custom readers with register().

Example:

registry = FileReaderRegistry()
doc = registry.read("report.pdf")
register(reader)[source]

Add reader to the registry for all its supported extensions.

Return type:

None

get_reader(path)[source]

Return the reader for path’s extension.

Raises:

ValueError – If no reader supports the file’s extension.

Return type:

BaseReader

read(path)[source]

Convenience method: detect reader and return a Document.

Return type:

Document

property supported_extensions: frozenset[str]

All extensions currently registered.
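The extension-to-reader lookup can be pictured as a dictionary keyed by lowercase file suffix. The class below is a stand-alone sketch, not FileReaderRegistry itself: ExtensionRegistry is a hypothetical name, and readers are plain callables here rather than BaseReader instances.

```python
from pathlib import Path
from typing import Callable, Iterable

Reader = Callable[[Path], str]  # hypothetical reader type: path in, text out


class ExtensionRegistry:
    """Map lowercase file extensions to reader callables."""

    def __init__(self) -> None:
        self._readers: dict[str, Reader] = {}

    def register(self, extensions: Iterable[str], reader: Reader) -> None:
        for ext in extensions:
            self._readers[ext.lower()] = reader

    def get_reader(self, path: str) -> Reader:
        ext = Path(path).suffix.lower()
        try:
            return self._readers[ext]
        except KeyError:
            raise ValueError(f"no reader registered for {ext!r}") from None

    @property
    def supported_extensions(self) -> frozenset[str]:
        return frozenset(self._readers)


def read_plain_text(path: Path) -> str:
    return path.read_text(encoding="utf-8")


registry = ExtensionRegistry()
registry.register([".txt", ".md"], read_plain_text)
print(sorted(registry.supported_extensions))  # ['.md', '.txt']
print(registry.get_reader("Notes.TXT") is read_plain_text)  # True
```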

class ractogateway.ChromaStore(collection='ractogateway', *, path=None, host=None, port=8000, distance_function='cosine')[source]

Bases: BaseVectorStore

Vector store backed by ChromaDB.

Supports both in-process (path or None for ephemeral) and HTTP-client modes (host + port).

Parameters:
  • collection (str) – Name of the ChromaDB collection.

  • path (str | None) – Persist directory for a local persistent client. None = ephemeral.

  • host (str | None) – ChromaDB server host (enables HTTP client mode).

  • port (int) – ChromaDB server port (default 8000).

  • distance_function (str) – "cosine", "l2", or "ip" (inner product).

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

class ractogateway.FAISSStore(dimension=None, index_type='flat_ip')[source]

Bases: BaseVectorStore

Vector store backed by Facebook AI Similarity Search (FAISS).

Stores embeddings in a flat index that uses either L2 distance or inner product (equivalent to cosine similarity when the vectors are normalised). All data is in-memory; call save() / load() to persist.

Parameters:
  • dimension (int | None) – Embedding dimension. Inferred from the first add() call if None.

  • index_type (str) – "flat_l2" or "flat_ip" (inner product / cosine when normalised).
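The remark that inner product behaves as cosine "when normalised" can be checked numerically: after scaling both vectors to unit length, their dot product equals their cosine similarity. The helper names below are hypothetical and the sketch does not use FAISS.

```python
import math


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def normalise(v: list[float]) -> list[float]:
    """Scale v to unit L2 length; a zero vector is returned unchanged."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v] if norm else list(v)


def cosine(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
raw_ip = dot(a, b)                         # depends on vector length
unit_ip = dot(normalise(a), normalise(b))  # length-independent
print(raw_ip, unit_ip, cosine(a, b))
```

With unnormalised vectors the inner product rewards longer vectors, so embeddings should be normalised before indexing if cosine ranking is intended.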

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

save(path)[source]

Persist the FAISS index to path.index and chunks to path.chunks.

Return type:

None

load(path)[source]

Load a previously saved index from path.

Return type:

None

class ractogateway.InMemoryVectorStore(similarity='cosine')[source]

Bases: BaseVectorStore

Pure-Python brute-force vector store — no extra dependencies.

This store keeps all chunks and their embeddings in memory. It is not suitable for production-scale corpora but requires no installation.

Parameters:

similarity (str) – Similarity function to use. Currently only "cosine" is supported.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int
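Brute-force search scores the query against every stored vector and sorts, which costs time linear in the number of chunks per query. A minimal sketch under stated assumptions: the index is a plain dict from chunk id to vector, results are (rank, chunk_id, score) tuples rather than RetrievalResult objects, and brute_force_search is a hypothetical name.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_search(
    query: list[float], index: dict[str, list[float]], top_k: int = 5
) -> list[tuple[int, str, float]]:
    """Score every vector, sort by similarity, return the best top_k with ranks."""
    scored = sorted(
        ((cosine(query, vec), chunk_id) for chunk_id, vec in index.items()),
        key=lambda pair: pair[0],
        reverse=True,
    )
    return [
        (rank, chunk_id, score)
        for rank, (score, chunk_id) in enumerate(scored[:top_k], start=1)
    ]


index = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
results = brute_force_search([1.0, 0.0], index, top_k=2)
print([(rank, chunk_id) for rank, chunk_id, _ in results])  # [(1, 'a'), (2, 'c')]
```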

class ractogateway.MilvusStore(collection='ractogateway', *, host='localhost', port=19530, uri=None, token=None, dimension=None, metric_type='IP', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Milvus or Zilliz Cloud.

Parameters:
  • collection (str) – Milvus collection name.

  • host (str) – Milvus server host (default "localhost").

  • port (int) – Milvus server port (default 19530).

  • uri (str | None) – Zilliz Cloud URI (overrides host/port when set).

  • token (str | None) – Zilliz Cloud API token.

  • dimension (int | None) – Embedding dimension. Inferred on first add.

  • metric_type (str) – "IP" (inner product / cosine) or "L2".

  • batch_size (int) – Vectors per insert batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

class ractogateway.PGVectorStore(dsn, *, table='rag_chunks', dimension=None, distance='cosine', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by PostgreSQL with the pgvector extension.

Parameters:
  • dsn (str) – PostgreSQL connection string (e.g. "postgresql://user:pass@localhost/mydb").

  • table (str) – Table name (default "rag_chunks").

  • dimension (int | None) – Embedding dimension. Inferred on first add.

  • distance (str) – "cosine", "l2", or "inner".

  • batch_size (int) – Rows per INSERT batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

class ractogateway.PineconeStore(index_name, *, api_key=None, namespace='', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Pinecone cloud.

Parameters:
  • index_name (str) – Name of the Pinecone index (must already exist).

  • api_key (str | None) – Pinecone API key. Falls back to PINECONE_API_KEY env var.

  • namespace (str) – Pinecone namespace for logical data isolation.

  • environment – Deprecated Pinecone environment string (for legacy pod-based indexes).

  • batch_size (int) – Number of vectors per upsert batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

class ractogateway.QdrantStore(collection='ractogateway', *, url=None, api_key=None, distance='cosine', dimension=None, batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Qdrant.

Parameters:
  • collection (str) – Qdrant collection name.

  • url (str | None) – Qdrant server URL. None = in-memory.

  • api_key (str | None) – Qdrant cloud API key (optional).

  • distance (str) – "cosine", "euclid", or "dot".

  • dimension (int | None) – Vector dimension. Inferred on first add if None.

  • batch_size (int) – Points per upsert batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

class ractogateway.WeaviateStore(class_name='RactoChunk', *, url=None, api_key=None, additional_headers=None, distance_metric='cosine', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Weaviate.

Supports embedded (local, no server needed), local server, and Weaviate Cloud (WCS) connections.

Parameters:
  • class_name (str) – Weaviate class (collection) name.

  • url (str | None) – Weaviate server URL. None = use embedded Weaviate.

  • api_key (str | None) – Weaviate Cloud API key.

  • additional_headers (dict[str, str] | None) – Extra HTTP headers (e.g. for OpenAI API key pass-through to Weaviate).

  • distance_metric (str) – "cosine" or "l2-squared".

  • batch_size (int) – Objects per batch import.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int
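
The stores above all expose the same five methods (add, search, delete, clear, count). As a rough illustration of that contract, and not the code of any backend, the sketch below implements it with a brute-force cosine scan in memory. The Chunk and Result dataclasses, their field names, and InMemoryStore are hypothetical stand-ins for the library's own types, and metadata filters are omitted.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class Chunk:
    """Hypothetical stand-in for the library's Chunk type."""
    chunk_id: str
    text: str
    embedding: Optional[list] = None


@dataclass
class Result:
    """Hypothetical stand-in for RetrievalResult."""
    chunk: Chunk
    score: float
    rank: int


def cosine(a, b):
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryStore:
    def __init__(self):
        self._chunks = {}  # chunk_id -> Chunk

    def add(self, chunks):
        # Validate first so one bad chunk does not leave a partial write.
        if any(c.embedding is None for c in chunks):
            raise ValueError("every chunk needs a non-None embedding")
        for c in chunks:
            self._chunks[c.chunk_id] = c

    def search(self, embedding, top_k=5):
        scored = [(cosine(embedding, c.embedding), c) for c in self._chunks.values()]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        # Rank 1 is the most similar chunk.
        return [Result(chunk=c, score=s, rank=i)
                for i, (s, c) in enumerate(scored[:top_k], start=1)]

    def delete(self, chunk_ids):
        for cid in chunk_ids:
            self._chunks.pop(cid, None)

    def clear(self):
        self._chunks.clear()

    def count(self):
        return len(self._chunks)
```

A real backend replaces the linear scan with an index, but the observable behaviour (ranked results, ValueError on a missing embedding) is what callers rely on.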

class ractogateway.CacheConfig(**data)[source]

Bases: BaseModel

Configuration for cache instances.

Parameters:
  • max_size (int) – Maximum number of entries to hold. When full, the least-recently-used entry is evicted (LRU policy). 0 means unlimited.

  • ttl_seconds (float | None) – Time-to-live in seconds. Entries older than this are treated as misses and evicted lazily. None disables TTL.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_size: int
ttl_seconds: float | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.CacheStats(**data)[source]

Bases: BaseModel

Snapshot of cache performance counters.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

hits: int
misses: int
size: int
property total: int

Total requests seen by the cache.

property hit_rate: float

Fraction of requests that were cache hits (0.0-1.0).

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.ExactMatchCache(max_size=1024, ttl_seconds=None)[source]

Bases: object

Ultra-low-latency key-value cache for identical LLM requests.

Parameters:
  • max_size (int) – LRU capacity. 0 = unlimited (no eviction).

  • ttl_seconds (float | None) – Entries older than ttl_seconds are treated as misses and transparently evicted. None disables expiry.

Example

from ractogateway.cache import ExactMatchCache
from ractogateway.openai_developer_kit import OpenAIDeveloperKit

cache = ExactMatchCache(max_size=512, ttl_seconds=3600)

# Wire into a kit:
kit = OpenAIDeveloperKit(model="gpt-4o", exact_cache=cache)

get(user_message, system_prompt, model, temperature, max_tokens)[source]

Return a cached response or None on a miss.

O(1) — dictionary lookup + optional move-to-end.

Return type:

LLMResponse | None

put(user_message, system_prompt, model, temperature, max_tokens, response)[source]

Store a response. Evicts LRU entry when at capacity.

O(1) amortised — dictionary insert + optional popitem(last=False).

Return type:

None

invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]

Remove a specific entry. Returns True if it was present.

Return type:

bool

clear()[source]

Evict all cached entries and reset counters.

Return type:

None

property stats: CacheStats

Return a snapshot of hit/miss/size counters.
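
The get() and put() notes above mention a dictionary lookup with move-to-end and popitem(last=False). A minimal sketch of that LRU-plus-TTL pattern follows. It is an illustration under assumptions, not the library's implementation: TinyExactCache, the tuple cache key, and the injectable clock argument are all hypothetical.

```python
import time
from collections import OrderedDict


class TinyExactCache:
    """Illustrative LRU + TTL cache keyed on the five request fields."""

    def __init__(self, max_size=1024, ttl_seconds=None, clock=time.monotonic):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._data = OrderedDict()  # key -> (stored_at, response)
        self.hits = 0
        self.misses = 0

    def get(self, user_message, system_prompt, model, temperature, max_tokens):
        key = (user_message, system_prompt, model, temperature, max_tokens)
        entry = self._data.get(key)
        if entry is not None and self.ttl_seconds is not None:
            if self._clock() - entry[0] > self.ttl_seconds:
                del self._data[key]  # lazy eviction of an expired entry
                entry = None
        if entry is None:
            self.misses += 1
            return None
        self._data.move_to_end(key)  # mark as most recently used
        self.hits += 1
        return entry[1]

    def put(self, user_message, system_prompt, model, temperature, max_tokens, response):
        key = (user_message, system_prompt, model, temperature, max_tokens)
        self._data[key] = (self._clock(), response)
        self._data.move_to_end(key)
        # max_size == 0 is treated as unlimited, as in the parameter docs.
        if self.max_size and len(self._data) > self.max_size:
            self._data.popitem(last=False)  # drop the least recently used entry
```

Because every field is part of the key, changing only the temperature or max_tokens produces a different entry, which is what "exact match" implies.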

class ractogateway.SemanticCache(embed_fn, similarity_threshold=0.95, max_size=512, ttl_seconds=None)[source]

Bases: object

Vector-similarity cache — returns cached answers for semantically similar queries, costing $0 in API calls.

Parameters:
  • embed_fn (Callable[[str], list[float]]) – Any callable (text: str) -> list[float]. Called once per new query (cache miss) and once at put() time.

  • similarity_threshold (float) – Minimum cosine similarity to declare a hit. Default 0.95 is intentionally strict to avoid incorrect responses.

  • max_size (int) – Maximum number of entries (LRU eviction). 0 = unlimited.

  • ttl_seconds (float | None) – Optional per-entry TTL. None disables expiry.

Examples

import ractogateway.openai_developer_kit as gpt
from ractogateway import SemanticCache

kit = gpt.OpenAIDeveloperKit(model="gpt-4o")

def embed(text: str) -> list[float]:
    import openai
    r = openai.OpenAI().embeddings.create(
        model="text-embedding-3-small", input=text
    )
    return r.data[0].embedding

cache = SemanticCache(embed_fn=embed, similarity_threshold=0.95)
get(query)[source]

Embed query and return a cached response if cosine-sim ≥ threshold.

Returns None on a cache miss (caller should make the real API call and then invoke put()).

Complexity: O(n·d) where n = number of entries, d = embedding dim.

Return type:

LLMResponse | None

put(query, response)[source]

Embed query and store response for future similar queries.

Evicts LRU entry when at capacity.

Return type:

None

clear()[source]

Remove all entries and reset counters.

Return type:

None

property stats: CacheStats

Return a snapshot of hit/miss/size counters.
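
To make the threshold lookup concrete, here is a small sketch of a linear cosine scan, the O(n·d) behaviour that get() describes. It is illustrative only: TinySemanticCache and the toy letter_counts embedding are hypothetical, and LRU eviction and TTL are left out.

```python
import math


def cosine(a, b):
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def letter_counts(text):
    """Toy embedding: a 26-dimensional letter histogram (demo only)."""
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - ord("a")] += 1.0
    return vec


class TinySemanticCache:
    def __init__(self, embed_fn, similarity_threshold=0.95):
        self._embed = embed_fn
        self._threshold = similarity_threshold
        self._entries = []  # list of (embedding, response)

    def get(self, query):
        vec = self._embed(query)
        best_score, best_response = -1.0, None
        for stored, response in self._entries:  # O(n * d) scan
            score = cosine(vec, stored)
            if score > best_score:
                best_score, best_response = score, response
        # Only the best match counts, and only if it clears the threshold.
        return best_response if best_score >= self._threshold else None

    def put(self, query, response):
        self._entries.append((self._embed(query), response))
```

With a real embedding model, the threshold trades hit rate against the risk of returning an answer to a different question, which is why the documented default is strict.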

class ractogateway.ChatMemoryConfig(**data)[source]

Bases: BaseModel

Configuration for RedisChatMemory.

Parameters:
  • max_turns (int) – Maximum number of conversation turns to retain. Each turn consists of one user message and one assistant message, so up to max_turns * 2 raw messages are stored per conversation.

  • ttl_seconds (float | None) – Optional TTL. Every append() call refreshes the expiry on the underlying Redis list. None disables expiry.

  • key_prefix (str) – Redis key namespace. Each conversation is stored at "{key_prefix}:{conversation_id}".

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_turns: int
ttl_seconds: float | None
key_prefix: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.RateLimitConfig(**data)[source]

Bases: BaseModel

Configuration for RedisRateLimiter.

Parameters:
  • max_tokens_per_minute (int) – Maximum LLM tokens a single user_id may consume per fixed one-minute window (one window per Unix minute).

  • key_prefix (str) – Redis key namespace. All rate-limit keys are stored under "{key_prefix}:{user_id}:{unix_minute}".

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_tokens_per_minute: int
key_prefix: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.RedisChatMemory(*, url='redis://localhost:6379/0', client=None, config=None)[source]

Bases: object

Shared, bounded conversation history backed by Redis.

Parameters:
  • url (str) – Redis connection URL. Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis client.

  • config (ChatMemoryConfig | None) – ChatMemoryConfig controlling turn limit, TTL, and key namespace. Defaults are applied when None.

append(conversation_id, role, content)[source]

Append a message to the conversation history.

After appending, the list is trimmed to the last config.max_turns * 2 messages (oldest dropped first). If a TTL is configured, it is refreshed on every append so the window slides with activity.

Parameters:
  • conversation_id (str) – Opaque identifier for the conversation (e.g. session UUID).

  • role (str) – The message author: "user", "assistant", or "system".

  • content (str) – Text content of the message.

Return type:

None

get_history(conversation_id)[source]

Return all stored messages as a list of {"role", "content"} dicts.

The list is ordered oldest-first, matching the ChatConfig.history convention.

Returns an empty list when the conversation does not exist or has expired.

Return type:

list[dict[str, str]]

clear(conversation_id)[source]

Delete the conversation history from Redis.

Return type:

None

count(conversation_id)[source]

Return the number of messages stored for this conversation.

Returns 0 when the conversation does not exist.

Return type:

int
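
The trimming rule in append() can be sketched without Redis. The class below is a hypothetical in-memory stand-in (TinyChatMemory is not part of the library) that keeps only the newest max_turns * 2 messages per conversation. TTL handling is omitted.

```python
class TinyChatMemory:
    """In-memory illustration of bounded, oldest-first chat history."""

    def __init__(self, max_turns=20):
        if max_turns < 1:
            raise ValueError("max_turns must be at least 1")
        self.max_turns = max_turns
        self._store = {}  # conversation_id -> list of message dicts

    def append(self, conversation_id, role, content):
        messages = self._store.setdefault(conversation_id, [])
        messages.append({"role": role, "content": content})
        limit = self.max_turns * 2
        # Drop everything except the newest `limit` messages.
        del messages[:-limit]

    def get_history(self, conversation_id):
        # Oldest first; an unknown conversation yields an empty list.
        return list(self._store.get(conversation_id, []))

    def clear(self, conversation_id):
        self._store.pop(conversation_id, None)

    def count(self, conversation_id):
        return len(self._store.get(conversation_id, []))
```

The list returned by get_history() has the same shape as the {"role", "content"} dicts described above, so it could be passed on as conversation history.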

class ractogateway.RedisExactCache(*, url='redis://localhost:6379/0', client=None, ttl_seconds=None, key_prefix='ractogateway:exact')[source]

Bases: object

Distributed exact-match LRU cache backed by Redis.

Parameters:
  • url (str) – Redis connection URL (e.g. "redis://localhost:6379/0"). Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis (or compatible) client. Useful when you manage the connection pool yourself or use a mock in tests.

  • ttl_seconds (float | None) – Optional TTL for each entry. Passed directly to Redis SET EX. None means entries never expire (Redis default).

  • key_prefix (str) – Namespace for all Redis keys managed by this instance.

get(user_message, system_prompt, model, temperature, max_tokens)[source]

Return a cached response or None on a miss.

O(1) Redis GET.

Return type:

LLMResponse | None

put(user_message, system_prompt, model, temperature, max_tokens, response)[source]

Store a response in Redis.

O(1) Redis SET [EX ttl].

Return type:

None

invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]

Remove a specific entry. Returns True if it was present.

Return type:

bool

clear()[source]

Delete all entries matching this instance’s key prefix.

Uses SCAN to iterate safely (no KEYS * in production). Also resets in-memory stats counters.

Return type:

None

property stats: CacheStats

Return a snapshot of hit/miss counters plus current Redis key count.

class ractogateway.RedisRateLimiter(*, url='redis://localhost:6379/0', client=None, config=None)[source]

Bases: object

Fleet-wide token-budget rate limiter backed by a shared Redis instance.

Parameters:
  • url (str) – Redis connection URL. Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis client. Useful for connection-pool sharing or unit-test mocking.

  • config (RateLimitConfig | None) – RateLimitConfig controlling the token budget and Redis key namespace. Defaults are applied when None.

check_and_consume(user_id, tokens=1)[source]

Attempt to consume tokens from user_id’s budget.

Returns True if the request is within budget (tokens are consumed), or False if the rate limit would be exceeded (no tokens consumed).

The check-and-increment is done in a single Redis pipeline, making it safe against concurrent requests from the same user.

Parameters:
  • user_id (str) – Opaque identifier for the caller (e.g. API key, user UUID).

  • tokens (int) – Number of tokens to consume. Defaults to 1 for request-count limiting; pass the estimated LLM token count for cost-based limiting.

Return type:

bool

get_remaining(user_id)[source]

Return the remaining token budget for the current minute.

Returns max_tokens_per_minute if the user has not made any requests in the current window.

Return type:

int

reset(user_id)[source]

Delete all rate-limit keys for user_id (current and any stale windows).

Intended for admin / testing use. Uses SCAN to avoid blocking.

Return type:

None
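
The per-minute budget can be illustrated with a plain dictionary in place of Redis. This is a sketch under assumptions: TinyRateLimiter, the "demo:ratelimit" prefix, and the injectable clock are hypothetical, and a single-process dict gives none of the cross-process guarantees that a shared Redis instance provides.

```python
import time


class TinyRateLimiter:
    """Fixed-window token budget keyed by user and Unix minute."""

    def __init__(self, max_tokens_per_minute, key_prefix="demo:ratelimit",
                 clock=time.time):
        self.max_tokens_per_minute = max_tokens_per_minute
        self.key_prefix = key_prefix
        self._clock = clock  # injectable so windows can be tested
        self._used = {}      # key -> tokens consumed in that window

    def _key(self, user_id):
        unix_minute = int(self._clock() // 60)
        return f"{self.key_prefix}:{user_id}:{unix_minute}"

    def check_and_consume(self, user_id, tokens=1):
        key = self._key(user_id)
        used = self._used.get(key, 0)
        if used + tokens > self.max_tokens_per_minute:
            return False  # over budget: nothing is consumed
        self._used[key] = used + tokens
        return True

    def get_remaining(self, user_id):
        return self.max_tokens_per_minute - self._used.get(self._key(user_id), 0)

    def reset(self, user_id):
        prefix = f"{self.key_prefix}:{user_id}:"
        for key in [k for k in self._used if k.startswith(prefix)]:
            del self._used[key]
```

Because the minute is part of the key, a new window starts with a full budget and old keys simply stop being read.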

class ractogateway.KafkaAuditEvent(**data)[source]

Bases: BaseModel

Immutable audit envelope for prompt/response traces.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

user_id: str
model: str
prompt: str
response: str
request_id: str | None
metadata: dict[str, Any]
timestamp_utc: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaAuditLogger(producer, *, topic='ractogateway.audit')[source]

Bases: object

Asynchronous audit logger for regulated AI workloads.

Writes immutable prompt/response records to a dedicated Kafka topic so audit traces survive web-process crashes and can be replayed later.

property topic: str
log(*, user_id, model, prompt, response, request_id=None, metadata=None, wait=False)[source]

Publish one immutable audit record.

Parameters:

wait (bool) – If True, block until Kafka acknowledges the write. For high-throughput request paths, False is usually preferred.

Return type:

KafkaAuditEvent

class ractogateway.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)[source]

Bases: object

Typed facade over kafka.KafkaConsumer.

Parameters:
  • config (KafkaConsumerConfig) – Consumer connection and polling options.

  • consumer (Any | None) – Pre-built consumer object; useful for tests and dependency injection.

  • key_deserializer (Callable[[Any], Any] | None) – Optional override for key decoding.

  • value_deserializer (Callable[[Any], Any] | None) – Optional override for value decoding.

poll(*, timeout_ms=None, max_records=None)[source]

Poll records and return normalized KafkaMessage entries.

Return type:

list[KafkaMessage]

commit()[source]

Commit currently consumed offsets.

Return type:

None

close()[source]

Close underlying consumer and release resources.

Return type:

None

class ractogateway.KafkaConsumerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaConsumerClient.

Parameters:
  • topics (list[str]) – Topic list to subscribe on startup.

  • bootstrap_servers (str | list[str]) – Kafka broker list.

  • group_id (str | None) – Consumer group ID. None disables group coordination.

  • auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.

  • enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.

  • max_poll_records (int) – Upper bound on records returned per poll call.

  • poll_timeout_ms (int) – Default poll timeout in milliseconds.

  • session_timeout_ms (int) – Group session timeout.

  • client_id (str) – Consumer client ID.

  • security_protocol (str) – Kafka transport protocol.

  • sasl_mechanism (str | None) – SASL mechanism when applicable.

  • sasl_plain_username (str | None) – Username for SASL auth.

  • sasl_plain_password (str | None) – Password for SASL auth.

  • extra (dict[str, Any]) – Additional kwargs forwarded to kafka.KafkaConsumer.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topics: list[str]
bootstrap_servers: str | list[str]
group_id: str | None
auto_offset_reset: Literal['earliest', 'latest', 'none']
enable_auto_commit: bool
max_poll_records: int
poll_timeout_ms: int
session_timeout_ms: int
client_id: str
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaMessage(**data)[source]

Bases: BaseModel

Normalized Kafka record returned by KafkaConsumerClient.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
partition: int
offset: int
timestamp_ms: int | None
key: str | bytes | None
value: Any
headers: dict[str, bytes | None]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaProduceResult(**data)[source]

Bases: BaseModel

Broker acknowledgement metadata returned by publish operations.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
partition: int
offset: int
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)[source]

Bases: object

Typed facade over kafka.KafkaProducer.

Parameters:
  • config (KafkaProducerConfig | None) – Producer connection and reliability settings. Defaults are applied when omitted.

  • producer (Any | None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime provides the producer lifecycle externally.

  • key_serializer (Callable[[Any], bytes] | None) – Optional override for key serialization.

  • value_serializer (Callable[[Any], bytes] | None) – Optional override for value serialization.

publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]

Publish one event and optionally wait for broker acknowledgement.

Return type:

KafkaProduceResult | None

publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]

Alias for publish() when value is a JSON payload.

Return type:

KafkaProduceResult | None

flush(*, timeout_s=None)[source]

Force all buffered events to be sent to brokers.

Return type:

None

close(*, timeout_s=5.0)[source]

Close underlying producer and release network resources.

Return type:

None

class ractogateway.KafkaProducerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaProducerClient.

Parameters:
  • bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).

  • client_id (str) – Producer client ID reported to the broker.

  • acks (Literal['all', '0', '1'] | int) – Acknowledgement policy; "all" is safest for durability.

  • linger_ms (int) – Small batching delay to improve throughput.

  • compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").

  • retries (int) – Retry attempts for transient broker/network failures.

  • request_timeout_ms (int) – Broker request timeout.

  • security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

  • sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.

  • sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.

  • sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.

  • extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

bootstrap_servers: str | list[str]
client_id: str
acks: Literal['all', '0', '1'] | int
linger_ms: int
compression_type: str | None
retries: int
request_timeout_ms: int
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaPublishRequest(**data)[source]

Bases: BaseModel

Explicit publish request used by stream routing handlers.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
value: Any
key: str | bytes | None
headers: dict[str, str | bytes | None] | None
partition: int | None
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaStreamConfig(**data)[source]

Bases: BaseModel

Batch processing configuration for KafkaStreamProcessor.

Parameters:
  • batch_size (int) – Maximum messages to process per handler invocation.

  • max_wait_ms (int) – Max wall-clock wait to collect one batch.

  • poll_timeout_ms (int) – Per-poll timeout while collecting a batch.

  • commit_on_success (bool) – Commit offsets after successful handler + publish cycle.

  • wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

batch_size: int
max_wait_ms: int
poll_timeout_ms: int
commit_on_success: bool
wait_for_publish: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.KafkaStreamProcessor(*, consumer, producer=None, config=None)[source]

Bases: object

Real-time stream loop helper for high-throughput event processing.

Parameters:
collect_batch()[source]

Collect up to batch_size messages bounded by max_wait_ms.

Return type:

list[KafkaMessage]

process_once(handler, *, output_topic=None)[source]

Run one batch cycle and return the number of consumed messages.

Return type:

int

Handler return options

  • None: consume-only, no output publish.

  • Sequence[KafkaPublishRequest]: explicit per-message publish plans.

  • Sequence[Any]: publish raw values to output_topic.

run(handler, *, output_topic=None, iterations=None)[source]

Run repeated batch cycles. Returns total consumed message count.

Return type:

int
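
The batch cycle and the three handler return options can be sketched with plain callables instead of Kafka clients. Everything here is illustrative: PublishRequest is a hypothetical stand-in for KafkaPublishRequest, and poll, publish, and commit are caller-supplied functions rather than the library's objects.

```python
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class PublishRequest:
    """Hypothetical stand-in for KafkaPublishRequest (topic and value only)."""
    topic: str
    value: Any


def collect_batch(poll, batch_size, max_wait_ms, poll_timeout_ms):
    """Poll until batch_size messages arrive or max_wait_ms elapses."""
    deadline = time.monotonic() + max_wait_ms / 1000.0
    batch = []
    while len(batch) < batch_size and time.monotonic() < deadline:
        batch.extend(poll(timeout_ms=poll_timeout_ms,
                          max_records=batch_size - len(batch)))
    return batch


def process_once(poll, handler, publish, commit, *, output_topic=None,
                 batch_size=100, max_wait_ms=50, poll_timeout_ms=10):
    batch = collect_batch(poll, batch_size, max_wait_ms, poll_timeout_ms)
    if not batch:
        return 0
    outputs = handler(batch)
    for item in outputs or []:  # None means consume-only
        if isinstance(item, PublishRequest):
            publish(item.topic, item.value)  # explicit per-message plan
        else:
            publish(output_topic, item)      # raw value goes to output_topic
    commit()  # mirrors commit_on_success=True: commit after handler + publish
    return len(batch)
```

If the handler or a publish call raises, commit() is never reached in this sketch, so the same messages would be seen again on the next cycle.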

class ractogateway.CostAwareRouter(tiers)[source]

Bases: object

Routes LLM requests to the appropriate model tier based on message complexity — without making any extra API calls.

Parameters:

tiers (list[RoutingTier]) – Ordered list of RoutingTier objects, sorted ascending by max_score (cheapest first). The last tier’s max_score should be 100 to act as fallback.

Raises:

ValueError – If tiers is empty or not sorted ascending by max_score.

Example: 3-tier OpenAI ladder

from ractogateway.routing import CostAwareRouter, RoutingTier

router = CostAwareRouter([
    RoutingTier(model="gpt-4o-mini", max_score=30),
    RoutingTier(model="gpt-4o", max_score=70),
    RoutingTier(model="o3-mini", max_score=100),
])
model = router.route("What is 2+2?")  # → "gpt-4o-mini"
model = router.route(
    "Analyze the trade-offs between Redis Cluster and "
    "Cassandra for a write-heavy time-series workload …"
)  # → "o3-mini"

Example: binary routing (2 tiers)

router = CostAwareRouter([
    RoutingTier(model="claude-haiku-4-5-20251001", max_score=40),
    RoutingTier(model="claude-opus-4-6", max_score=100),
])

score(text)[source]

Compute a complexity score in [0, 100] for text.

A higher score means a more complex task.

Return type:

int

Algorithm

token_pts = min(len(text)//4, SAT) * (MAX_TP / SAT)
kw_pts    = min(matches * PPK, MAX_KP)
score     = clamp(token_pts + kw_pts, 0, 100)

route(text)[source]

Return the model identifier for text.

Walks tiers (cheapest first) and returns the first model whose max_score ≥ complexity_score. Always returns a model because the last tier has max_score == 100 (validated at construction).

Complexity: O(k) where k = number of tiers.

Return type:

str

property tiers: tuple[RoutingTier, ...]

Immutable view of the configured tiers.
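
The scoring formula and the tier walk can be tried out in a few lines. The constant values (SAT, MAX_TP, PPK, MAX_KP) and the keyword list below are assumptions chosen for illustration, since this page does not state them. Tiers are plain (model, max_score) tuples rather than RoutingTier objects.

```python
SAT = 500     # assumed: approximate token count at which length points saturate
MAX_TP = 60   # assumed: maximum points contributed by length
PPK = 10      # assumed: points per matched keyword
MAX_KP = 40   # assumed: maximum points contributed by keywords
KEYWORDS = ("analyze", "trade-off", "prove", "optimize", "architecture")  # hypothetical


def score(text):
    """Complexity score in [0, 100] from text length and keyword matches."""
    token_pts = min(len(text) // 4, SAT) * (MAX_TP / SAT)
    lowered = text.lower()
    matches = sum(1 for kw in KEYWORDS if kw in lowered)
    kw_pts = min(matches * PPK, MAX_KP)
    return round(max(0, min(100, token_pts + kw_pts)))


def route(text, tiers):
    """Return the first model whose max_score is >= the text's score."""
    s = score(text)
    for model, max_score in tiers:  # tiers are sorted cheapest first
        if max_score >= s:
            return model
    return tiers[-1][0]  # unreachable when the last max_score is 100


TIERS = [("gpt-4o-mini", 30), ("gpt-4o", 70), ("o3-mini", 100)]
```

Under these assumed constants a short factual question scores near zero and lands on the cheapest tier, while a long prompt containing several of the keywords saturates both components and reaches the last tier.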

class ractogateway.RoutingTier(**data)[source]

Bases: BaseModel

One tier in the cost-aware routing ladder.

The router evaluates a complexity score (0-100) for each incoming message and selects the first tier whose max_score is >= that score. The last tier in the list always acts as the catch-all fallback.

Parameters:
  • model (str) – The LLM model identifier to use for requests that fall in this tier (e.g. "gpt-4o-mini", "gemini-2.0-flash", "claude-haiku-4-5-20251001").

  • max_score (float) – Inclusive upper bound on the complexity score that routes to this model. Range: 0-100. Set to 100 for the last (most powerful) tier so it catches everything.

Examples

tiers = [
    RoutingTier(model="gpt-4o-mini",  max_score=30),
    RoutingTier(model="gpt-4o",        max_score=70),
    RoutingTier(model="o3-mini",        max_score=100),
]

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model: str
max_score: float
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.TokenTruncator(config=None)[source]

Bases: object

Smart conversation-history trimmer.

Parameters:

config (TruncationConfig | None) – TruncationConfig instance. If omitted a default config is used (approximate counter, 8 k limit).

Examples

from ractogateway.openai_developer_kit import OpenAIDeveloperKit
from ractogateway.truncation import TokenTruncator, TruncationConfig
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")
truncator = TokenTruncator(
    TruncationConfig(
        token_counter=lambda t: len(enc.encode(t)),
        keep_first_n=2,
        keep_last_n=8,
    )
)
kit = OpenAIDeveloperKit(model="gpt-4o", truncator=truncator)
truncate(chat_config, model)[source]

Return a copy of chat_config with trimmed history if necessary.

If the total estimated token count (system prompt + history + user_message) fits within the model’s context limit, the original ChatConfig is returned unchanged.

Parameters:
  • chat_config (ChatConfig) – The chat configuration to potentially truncate.

  • model (str) – The resolved model name used to look up the context-window limit.

Return type:

ChatConfig

Returns:

ChatConfig – A new ChatConfig instance with (possibly shorter) history. The user_message and all other fields are preserved verbatim.

estimate_tokens(text)[source]

Convenience wrapper around the configured token counter.

Return type:

int
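
One way to picture the trimming is to protect the first and last few messages and drop from the unprotected middle. The sketch below makes that assumption explicit: the exact strategy used by truncate() is not documented on this page, and trim_history, its budget argument, and the oldest-first drop order are hypothetical. The default counter mirrors the documented len // 4 approximation.

```python
def approx_tokens(text):
    """Approximate token count (len // 4), as in the documented default."""
    return len(text) // 4


def trim_history(history, budget, keep_first_n=2, keep_last_n=6,
                 count=approx_tokens):
    """Return history unchanged if it fits, else drop middle messages."""
    def total(messages):
        return sum(count(m["content"]) for m in messages)

    if total(history) <= budget:
        return history  # nothing to trim

    head = history[:keep_first_n]
    rest = history[keep_first_n:]
    tail = rest[-keep_last_n:] if keep_last_n > 0 else []
    middle = rest[:len(rest) - len(tail)]

    # Assumed policy: drop the oldest unprotected message first.
    while middle and total(head + middle + tail) > budget:
        middle.pop(0)
    return head + middle + tail
```

In this sketch the protected head and tail are never dropped, so a history whose anchors alone exceed the budget is returned over budget. A caller would need to decide how to handle that case.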

class ractogateway.TruncationConfig(**data)[source]

Bases: BaseModel

Configuration for TokenTruncator.

Parameters:
  • max_context_tokens (int | None) – Hard cap on total prompt tokens before calling the API. When None, the truncator looks up the model in MODEL_CONTEXT_LIMITS (falling back to 8 192).

  • keep_first_n (int) – Number of history messages to always preserve from the start of the conversation (anchors context). Defaults to 2.

  • keep_last_n (int) – Number of history messages to always preserve from the most recent end of the conversation. Defaults to 6.

  • token_counter (Callable[[str], int]) –

    Callable (text: str) -> int. Defaults to the built-in approximate counter (len // 4). Swap for tiktoken for exact OpenAI token counts:

    import tiktoken
    enc = tiktoken.encoding_for_model("gpt-4o")
    config = TruncationConfig(token_counter=lambda t: len(enc.encode(t)))
    

  • safety_margin (int) – Extra token budget reserved beyond the system prompt and user message. Defaults to 512.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_context_tokens: int | None
keep_first_n: int
keep_last_n: int
token_counter: Callable[[str], int]
safety_margin: int
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

resolve_limit(model)[source]

Return the effective token limit for model.

Priority: max_context_tokens → MODEL_CONTEXT_LIMITS lookup → _DEFAULT_CONTEXT.

Return type:

int
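
The lookup order (explicit cap, then the per-model table, then the default) amounts to a short fallback chain. In the sketch below the table contents are a hypothetical excerpt, since the real MODEL_CONTEXT_LIMITS entries are not listed on this page. The 8192 default follows the max_context_tokens parameter description.

```python
MODEL_CONTEXT_LIMITS = {"gpt-4o": 128_000}  # hypothetical excerpt of the table
_DEFAULT_CONTEXT = 8192                     # fallback named in the parameter docs


def resolve_limit(model, max_context_tokens=None):
    """Return the explicit cap if set, else the table entry, else the default."""
    if max_context_tokens is not None:
        return max_context_tokens
    return MODEL_CONTEXT_LIMITS.get(model, _DEFAULT_CONTEXT)
```

An explicit cap always wins, which is useful for keeping prompts well below a model's maximum for cost reasons.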

model_post_init(_TruncationConfig__context)[source]

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Return type:

None

class ractogateway.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)[source]

Bases: object

Celery-backed task queue wrapper for RactoGateway developer kits.

Parameters:
  • app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself so you retain full control over serializers, routing, concurrency, etc.

  • kit (Any) – Any RactoGateway developer kit — OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit’s default_prompt is used by generation tasks (prompts are not serialisable over the broker).

  • rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().

  • retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.

generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)[source]

Enqueue an LLM generation task and return immediately.

The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).

Parameters:
  • user_message (str) – The prompt text for this generation.

  • temperature (float) – Sampling temperature passed to the LLM.

  • max_tokens (int) – Maximum completion tokens.

  • history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.

  • extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).

Return type:

Any

Returns:

celery.result.AsyncResult – Call .id for the task UUID. Use wait() or get_result() to retrieve the outcome.

Note

The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit’s default_prompt at construction time.

ingest_document(path, **metadata)[source]

Enqueue a background RAG document-ingestion task.

The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.

Parameters:
  • path (str) – Absolute or relative path to the file to ingest. The string is passed as-is to ingest().

  • **metadata (Any) – Extra key-value metadata merged into every chunk’s ChunkMetadata.extra.

Return type:

Any

Returns:

celery.result.AsyncResult

Raises:

RuntimeError – If rag was not provided to RactoCeleryWorker.

parallel_batch(items, *, temperature=0.0, max_tokens=4096)[source]

Fan a list of items out to the worker pool in parallel.

Uses Celery group() so all tasks are submitted at once and run concurrently across available workers.

Parameters:
  • items (list[BatchItem]) – A list of BatchItem objects. Each item’s user_message becomes one generation task.

  • temperature (float) – Shared sampling temperature for all items.

  • max_tokens (int) – Shared max-tokens limit for all items.

Return type:

Any

Returns:

celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate .results for individual AsyncResult objects.

get_result(task_id)[source]

Return the current state of a task without blocking.

Parameters:

task_id (str) – The UUID returned by generate(), ingest_document(), or the .id attribute of an AsyncResult.

Return type:

TaskResult

Returns:

TaskResult – status will be PENDING if the task has not started yet.

wait(task_id, *, timeout_s=None)[source]

Block until a task completes (or times out) and return its result.

Parameters:
  • task_id (str) – The task UUID from generate() or ingest_document().

  • timeout_s (float | None) – Maximum seconds to wait. None = wait indefinitely.

Return type:

TaskResult

Returns:

TaskResult – result.ok is True on success; result.error is set on failure or timeout.

wait_parallel(group_result, *, timeout_s=None)[source]

Block until all tasks from parallel_batch() complete.

Parameters:
  • group_result (Any) – The celery.result.GroupResult returned by parallel_batch().

  • timeout_s (float | None) – Maximum seconds to wait for the entire group. None = wait indefinitely.

Return type:

list[TaskResult]

Returns:

list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.

class ractogateway.RetryConfig(**data)[source]

Bases: BaseModel

Exponential-backoff retry policy for RactoCeleryWorker tasks.

The backoff delay for attempt n (0-based) is:

delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)

With the defaults this gives: 2 s → 4 s → 8 s (then stops at max_retries=3).
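
The formula can be checked with a few lines of Python. The values initial_delay_s=2.0 and backoff_factor=2.0 are inferred from the 2 s → 4 s → 8 s sequence above; max_delay_s=60.0 is an assumption made for this sketch, and backoff_delays is a hypothetical helper, not part of the library.

```python
def backoff_delays(max_retries: int, initial_delay_s: float,
                   backoff_factor: float, max_delay_s: float) -> list:
    """Delay before each retry, for attempts n = 0 .. max_retries - 1."""
    return [
        min(initial_delay_s * backoff_factor ** n, max_delay_s)
        for n in range(max_retries)
    ]


# Three retries with a cap high enough that it never applies.
delays = backoff_delays(3, 2.0, 2.0, 60.0)

# Six retries with a low cap: later delays are clamped to max_delay_s.
capped = backoff_delays(6, 2.0, 2.0, 10.0)
```

Here delays evaluates to [2.0, 4.0, 8.0], and capped shows the ceiling taking effect from the fourth retry onward.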

Parameters:
  • max_retries (int) – Maximum number of retry attempts after the first failure. 0 disables retries entirely.

  • initial_delay_s (float) – Countdown (seconds) before the first retry.

  • backoff_factor (float) – Multiplier applied on each successive retry. Must be > 1.

  • max_delay_s (float) – Upper bound on the countdown (seconds). Prevents extremely long waits after many retries.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_retries: int
initial_delay_s: float
backoff_factor: float
max_delay_s: float
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.TaskResult(**data)[source]

Bases: BaseModel

Unified result returned by wait() and get_result().

Parameters:
  • task_id (str) – The Celery task UUID.

  • status (TaskStatus) – Current TaskStatus.

  • result (Any | None) –

    Deserialised task output on success:

    • For generate() – a dict matching LLMResponse.model_dump().

    • For ingest_document() – a list of Chunk.model_dump() dicts.

  • error (str | None) – Exception message string on failure; None on success.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

task_id: str
status: TaskStatus
result: Any | None
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property ok: bool

True when the task succeeded and produced a result.

class ractogateway.TaskStatus(*values)[source]

Bases: str, Enum

Lifecycle state of a Celery task.

Maps directly to Celery’s native task states so you can compare them without importing Celery’s own constants.

PENDING = 'pending'
STARTED = 'started'
SUCCESS = 'success'
FAILURE = 'failure'
RETRY = 'retry'
REVOKED = 'revoked'
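
Because TaskStatus derives from str, its members compare equal to plain strings. The sketch below uses a local mirror of the enum so that it runs without ractogateway installed. The helper from_celery_state is hypothetical: it only illustrates that Celery reports its states in upper case (for example "SUCCESS") while the enum values listed above are lower case.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local mirror of the documented enum, for illustration only."""
    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"
    REVOKED = "revoked"


def from_celery_state(state: str) -> TaskStatus:
    """Hypothetical helper: convert an upper-case Celery state name."""
    return TaskStatus(state.lower())


is_done = TaskStatus.SUCCESS == "success"   # str mixin allows direct comparison
retrying = from_celery_state("RETRY")
```
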
class ractogateway.AnthropicBatchProcessor(model='claude-haiku-4-5-20251001', *, api_key=None, default_prompt=None)[source]

Bases: object

Submit thousands of Claude requests to Anthropic’s Message Batches API at ~50 % of standard API cost.

Parameters:
  • model (str) – Claude model for all items (e.g. "claude-haiku-4-5-20251001").

  • api_key (str | None) – Anthropic API key. Falls back to ANTHROPIC_API_KEY env var.

  • default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.

submit_batch / asubmit_batch:

Create a batch job → returns BatchJobInfo.

poll_status / apoll_status:

Fetch current job state → returns updated BatchJobInfo.

get_results / aget_results:

Stream and parse completed job results → list[BatchResult].

submit_and_wait / asubmit_and_wait:

Convenience: submit + poll until done + return results.

provider: str = 'anthropic'
submit_batch(items, *, prompt=None)[source]

Create an Anthropic Message Batch job.

Returns immediately with a BatchJobInfo (status = IN_PROGRESS).

Return type:

BatchJobInfo

poll_status(job_id)[source]

Fetch the current status of batch job job_id.

Return type:

BatchJobInfo

get_results(job_id)[source]

Stream and parse results for a completed batch job.

Raises:

RuntimeError – If the job is not yet completed.

Return type:

list[BatchResult]

submit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]

Submit a batch and block until it completes, then return results.

Parameters:
  • poll_interval_s (float) – Seconds between status-poll API calls. Default 60.0.

  • max_wait_s (float) – Maximum total wait. Default 86400 (24 h).

Return type:

list[BatchResult]

async asubmit_batch(items, *, prompt=None)[source]

Async variant of submit_batch().

Return type:

BatchJobInfo

async apoll_status(job_id)[source]

Async variant of poll_status().

Return type:

BatchJobInfo

async aget_results(job_id)[source]

Async variant of get_results().

Return type:

list[BatchResult]

async asubmit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]

Async variant of submit_and_wait().

Return type:

list[BatchResult]

class ractogateway.BatchItem(**data)[source]

Bases: BaseModel

A single request within a batch job.

Parameters:
  • custom_id (str) – User-supplied identifier used to correlate results. Must be unique within a batch.

  • user_message (str) – The end-user’s query string (equivalent to ChatConfig.user_message).

  • temperature (float) – Sampling temperature. Defaults to 0.0.

  • max_tokens (int) – Maximum tokens for the completion. Defaults to 4096.

  • extra (dict[str, Any]) – Provider-specific pass-through kwargs.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

custom_id: str
user_message: str
temperature: float
max_tokens: int
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.BatchJobInfo(**data)[source]

Bases: BaseModel

Metadata about a submitted batch job.

Returned by submit_batch() and poll_status().

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

job_id: str
provider: str
status: BatchStatus
created_at: float
request_count: int
raw: Any
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.BatchResult(**data)[source]

Bases: BaseModel

The outcome of a single BatchItem.

A result is always present in the results list returned by get_results(); check error to detect failures.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

custom_id: str
response: LLMResponse | None
error: str | None
raw: Any
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property ok: bool

True when the request succeeded (no error, response present).
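
A common pattern is to split a results list into successes and failures keyed by custom_id. The sketch below uses a small dataclass, ResultSketch, as a stand-in for BatchResult so that it runs on its own; the response is simplified to a string, and split_results is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultSketch:
    """Stand-in for BatchResult with the documented fields (response simplified)."""
    custom_id: str
    response: Optional[str] = None
    error: Optional[str] = None

    @property
    def ok(self) -> bool:
        # Mirrors the documented rule: no error and a response present.
        return self.error is None and self.response is not None


def split_results(results):
    """Return (succeeded, failed) dictionaries keyed by custom_id."""
    succeeded = {r.custom_id: r.response for r in results if r.ok}
    failed = {r.custom_id: r.error for r in results if not r.ok}
    return succeeded, failed


results = [
    ResultSketch("item-1", response="Paris"),
    ResultSketch("item-2", error="rate limited"),
]
succeeded, failed = split_results(results)
```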

class ractogateway.BatchStatus(*values)[source]

Bases: str, Enum

Processing state of a batch job.

Maps to the union of OpenAI and Anthropic batch status strings.

PENDING = 'pending'
IN_PROGRESS = 'in_progress'
FINALIZING = 'finalizing'
COMPLETED = 'completed'
FAILED = 'failed'
EXPIRED = 'expired'
CANCELLING = 'cancelling'
CANCELLED = 'cancelled'
class ractogateway.OpenAIBatchProcessor(model='gpt-4o-mini', *, api_key=None, base_url=None, default_prompt=None)[source]

Bases: object

Submit thousands of chat-completion requests to OpenAI’s Batch API at ~50 % of standard API cost.

Parameters:
  • model (str) – Chat model to use for all items in a batch (e.g. "gpt-4o-mini").

  • api_key (str | None) – OpenAI API key. Falls back to OPENAI_API_KEY env var.

  • base_url (str | None) – Custom base URL (Azure OpenAI / proxy).

  • default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.

submit_batch / asubmit_batch:

Upload JSONL and create batch job → returns BatchJobInfo.

poll_status / apoll_status:

Fetch current job state → returns updated BatchJobInfo.

get_results / aget_results:

Download and parse completed job results → list[BatchResult].

submit_and_wait / asubmit_and_wait:

Convenience: submit + poll until done + return results.

provider: str = 'openai'
submit_batch(items, *, prompt=None, completion_window='24h')[source]

Upload items as a JSONL file and create an OpenAI batch job.

Returns immediately with a BatchJobInfo (status = IN_PROGRESS).

Return type:

BatchJobInfo

poll_status(job_id)[source]

Fetch the current status of batch job job_id.

Return type:

BatchJobInfo

get_results(job_id)[source]

Download and parse results for a completed batch job.

Raises:

RuntimeError – If the job is not yet completed.

Return type:

list[BatchResult]

submit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]

Submit a batch and block until it completes, then return results.

Parameters:
  • poll_interval_s (float) – Seconds between status-poll API calls. Default 60.0.

  • max_wait_s (float) – Maximum total seconds to wait. Default 86400 (24 h).

Raises:
  • TimeoutError – If the batch does not complete within max_wait_s.

  • RuntimeError – If the batch job fails or is cancelled.

Return type:

list[BatchResult]
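
The submit-then-poll behaviour can be pictured as the loop below. It is a sketch under stated assumptions, not the library's implementation: wait_for_batch is a hypothetical helper, the status strings follow the BatchStatus values, and the sleep and clock parameters exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable

# Terminal states that the Raises section above maps to RuntimeError.
FAILURE_STATES = {"failed", "cancelled"}


def wait_for_batch(
    poll: Callable[[], str],
    *,
    poll_interval_s: float = 60.0,
    max_wait_s: float = 86400.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poll until the job completes; return the number of polls made."""
    deadline = clock() + max_wait_s
    polls = 0
    while True:
        status = poll()
        polls += 1
        if status == "completed":
            return polls
        if status in FAILURE_STATES:
            raise RuntimeError(f"batch job ended with status {status!r}")
        if clock() >= deadline:
            raise TimeoutError(f"batch did not complete within {max_wait_s} s")
        sleep(poll_interval_s)


# Simulated job that completes on the third poll; sleeping is stubbed out.
_states = iter(["in_progress", "finalizing", "completed"])
polls_needed = wait_for_batch(lambda: next(_states), sleep=lambda _s: None)
```

A short poll_interval_s gives faster feedback at the cost of more status requests; the 60-second default keeps a 24-hour wait to at most 1,440 polls.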

async asubmit_batch(items, *, prompt=None, completion_window='24h')[source]

Async variant of submit_batch().

Return type:

BatchJobInfo

async apoll_status(job_id)[source]

Async variant of poll_status().

Return type:

BatchJobInfo

async aget_results(job_id)[source]

Async variant of get_results().

Return type:

list[BatchResult]

async asubmit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]

Async variant of submit_and_wait().

Return type:

list[BatchResult]

class ractogateway.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: SQLAnalystPipeline

Async-first variant of SQLAnalystPipeline.

run() is a coroutine — use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.

All constructor parameters and run() parameters are identical to SQLAnalystPipeline.

Example:

from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
async run(user_query, **kwargs)[source]

Async run() — delegates to SQLAnalystPipeline.arun().

Return type:

SQLAnalystResult

class ractogateway.ChartSpec(**data)[source]

Bases: BaseModel

Specification for a Plotly chart.

Pass to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame’s column dtypes with no extra LLM call.

Supported chart types

bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel

Example:

from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

result = pipeline.run(
    user_query="Top 5 customers by revenue?",
    ...,
    chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue",
                    title="Top 5 Customers"),
)
result.plotly_figure.show()

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

chart_type: str
x: str | None
y: str | list[str] | None
color: str | None
title: str
x_label: str | None
y_label: str | None
orientation: Literal['v', 'h'] | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.PipelineUsage(**data)[source]

Bases: BaseModel

Aggregated token usage across all LLM calls in the pipeline.

Tracks each step (SQL generation, pandas code generation, markdown answer generation) separately so you can see exactly where tokens are consumed.

Properties

total_input_tokens:

Sum of all prompt tokens across every LLM step.

total_output_tokens:

Sum of all completion tokens across every LLM step.

total_tokens:

Grand total of every token consumed by the pipeline.
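
The three totals follow directly from the six per-step counters. The dataclass below, UsageSketch, is a stand-in written for illustration so that the arithmetic can be run without the library; the zero defaults are an assumption.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Stand-in for PipelineUsage with the same six counters."""
    sql_input_tokens: int = 0
    sql_output_tokens: int = 0
    pandas_input_tokens: int = 0
    pandas_output_tokens: int = 0
    answer_input_tokens: int = 0
    answer_output_tokens: int = 0

    @property
    def total_input_tokens(self) -> int:
        return (self.sql_input_tokens + self.pandas_input_tokens
                + self.answer_input_tokens)

    @property
    def total_output_tokens(self) -> int:
        return (self.sql_output_tokens + self.pandas_output_tokens
                + self.answer_output_tokens)

    @property
    def total_tokens(self) -> int:
        return self.total_input_tokens + self.total_output_tokens


usage = UsageSketch(sql_input_tokens=500, sql_output_tokens=80,
                    pandas_input_tokens=700, pandas_output_tokens=150,
                    answer_input_tokens=900, answer_output_tokens=400)
```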

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

sql_input_tokens: int
sql_output_tokens: int
pandas_input_tokens: int
pandas_output_tokens: int
answer_input_tokens: int
answer_output_tokens: int
property total_input_tokens: int
property total_output_tokens: int
property total_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

exception ractogateway.RateLimitExceededError[source]

Bases: RuntimeError

Raised when the rate limiter denies a request for a given user.
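
SQLAnalystPipeline accepts any rate_limiter object that implements check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int. The in-memory limiter below is a minimal sketch of that interface with a fixed per-user budget and no time window. The class InMemoryRateLimiter, the helper guarded_call, and the local exception class are all hypothetical stand-ins.

```python
class RateLimitExceededError(RuntimeError):
    """Local stand-in for ractogateway.RateLimitExceededError."""


class InMemoryRateLimiter:
    """Fixed token budget per user; usage never resets in this sketch."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used = {}

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        used = self._used.get(user_id, 0)
        if used + tokens > self._budget:
            return False  # denied: nothing is consumed
        self._used[user_id] = used + tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used.get(user_id, 0)


def guarded_call(limiter: InMemoryRateLimiter, user_id: str, tokens: int) -> str:
    """Raise when the limiter denies the request, otherwise proceed."""
    if not limiter.check_and_consume(user_id, tokens):
        remaining = limiter.get_remaining(user_id)
        raise RateLimitExceededError(f"user {user_id!r} has {remaining} tokens left")
    return "ok"


limiter = InMemoryRateLimiter(budget_per_user=100)
first = guarded_call(limiter, "alice", 60)
```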

exception ractogateway.ReadOnlyViolationError[source]

Bases: ValueError

Raised when generated SQL contains a write operation in force_read_only mode.
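
A naive version of such a check is sketched below. The library's actual detection logic is not documented here, so the keyword list, the helper assert_read_only, and the local exception class are assumptions. This simple scan also rejects CTEs that begin with WITH, and it can misfire on string literals that contain a write keyword.

```python
import re


class ReadOnlyViolationError(ValueError):
    """Local stand-in for ractogateway.ReadOnlyViolationError."""


# Keywords treated as writes in this sketch (an assumed list).
_WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE|MERGE)\b",
    re.IGNORECASE,
)


def assert_read_only(sql: str) -> None:
    """Raise unless the statement starts with SELECT and has no write keyword."""
    statement = sql.strip().rstrip(";").strip()
    first_word = statement.split(None, 1)[0].upper() if statement else ""
    if first_word != "SELECT" or _WRITE_KEYWORDS.search(statement):
        raise ReadOnlyViolationError(f"write operation detected: {sql!r}")


# Column names such as created_at do not trip the whole-word keyword match.
assert_read_only("SELECT id, created_at FROM orders")
```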

class ractogateway.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: object

Natural-language to SQL + pandas + Markdown answer + chart pipeline.

Converts a plain-English question into:

  1. A read-only SQL query (LLM step — sql_kit)

  2. Pandas analysis code executed against the SQL result (LLM step — pandas_kit)

  3. A rich Markdown answer with table + insights (LLM step — answer_kit)

  4. An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls — pure dtype heuristics or user-provided spec)

Two variants

  • SQLAnalystPipeline – run() sync, arun() async.

  • AsyncSQLAnalystPipeline – run() is async (same as arun()).

Parameters:
  • kit (Any) – Default LLM kit used for any step that doesn’t have its own kit.

  • sql_kit (Any | None) – Override kit for SQL generation. Falls back to kit.

  • pandas_kit (Any | None) – Override kit for pandas code generation. Falls back to kit.

  • answer_kit (Any | None) – Override kit for Markdown answer generation. Falls back to kit.

  • sql_prompt / pandas_prompt / answer_prompt (RactoPrompt | None) – Override the default system prompt for the corresponding step.

  • sql_temperature / sql_max_tokens (float / int) – LLM settings for the SQL step (defaults: 0.0 / 1024).

  • pandas_temperature / pandas_max_tokens (float / int) – LLM settings for the pandas step (defaults: 0.0 / 2048).

  • answer_temperature / answer_max_tokens (float / int) – LLM settings for the answer step (defaults: 0.3 / 2048).

  • run_pandas (bool) – Run the pandas analysis step by default (default: True).

  • run_answer (bool) – Run the Markdown answer step by default (default: True).

  • chart (ChartSpec | dict[str, Any] | str | None) – Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".

  • force_read_only (bool) – Block any non-SELECT SQL (default: True).

  • tracer (Any | None) – Optional RactoTracer instance.

  • metrics (Any | None) – Optional GatewayMetricsMiddleware instance.

  • engine (Any | None) – Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, the connection_string / host / port / etc. parameters of run() are ignored.

  • max_sql_retries (int) – Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the original question plus the error message to the LLM so it can self-correct. Default: 2.

  • max_rows (int) – Safety cap on returned rows: LIMIT {max_rows} is injected into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.

  • schema_cache_ttl (float) – Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).

  • allowed_tables (list[str] | None) – Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.

  • blocked_columns (list[str] | None) – Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.

  • mask_columns (list[str] | None) – Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.

  • table_docs (dict[str, str] | None) – {table_name: description}, appended as inline schema comments so the LLM understands each table's business meaning.

  • column_docs (dict[str, dict[str, str]] | None) – {table_name: {column_name: description}}, appended as per-column inline comments.

  • safe_mode (bool) – When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.

  • memory (Any | None) – Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).

  • rate_limiter (Any | None) – Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.

  • user_id (str | None) – Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
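
The max_rows and mask_columns safeguards can be sketched as two small functions. Both inject_limit and mask_rows are hypothetical helpers written for illustration. How the pipeline itself detects an existing LIMIT or matches column names is not specified here, so the regular expression and the exact-name comparison are assumptions.

```python
import re


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless a LIMIT is present or the cap is disabled."""
    if max_rows <= 0:
        return sql  # 0 disables the safety cap
    if re.search(r"\bLIMIT\s+\d+", sql, re.IGNORECASE):
        return sql  # respect the LIMIT already in the query
    return f"{sql.rstrip().rstrip(';')} LIMIT {max_rows}"


def mask_rows(rows: list, mask_columns: list) -> list:
    """Replace values of the named columns with the documented mask string."""
    hidden = set(mask_columns)
    return [
        {key: ("***MASKED***" if key in hidden else value)
         for key, value in row.items()}
        for row in rows
    ]


limited = inject_limit("SELECT * FROM customers;", 5_000)
masked = mask_rows([{"name": "Ada", "email": "ada@example.com"}], ["email"])
```

Masking happens on the result rows, so the LLM can still reference a masked column in SQL; use blocked_columns when the column should not appear in the schema at all.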

Example:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")

run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Run the full pipeline synchronously.

Parameters:
  • user_query (str) – Plain-English question to answer from the database.

  • connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.

  • host / port / database / username / password / driver – Individual connection parameters, used when both connection_string and engine are omitted.

  • engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.

  • schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).

  • run_pandas / run_answer / chart / force_read_only – Override the corresponding pipeline-level defaults for this call.

  • sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens – Per-call LLM setting overrides.

  • max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

  • session_id (str | None) – Conversation session ID used to fetch and save memory context.

Return type:

SQLAnalystResult

Returns:

SQLAnalystResult

async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Async variant of run() — identical parameters.

Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.

Return type:

SQLAnalystResult

class ractogateway.SQLAnalystResult(**data)[source]

Bases: BaseModel

Result returned by SQLAnalystPipeline.

All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.

Fields

user_query:

The original natural-language question.

schema_used:

The database schema string that was passed to (or fetched for) the LLM.

sql_query:

The generated (and possibly LIMIT-injected) SQL SELECT statement.

columns:

Column names returned by the SQL query.

row_count:

Number of rows in raw_rows.

raw_rows:

All rows from the SQL result as a list of dicts.

pandas_code:

The LLM-generated pandas analysis code (None if run_pandas=False).

pandas_result:

Output of executing pandas_code — DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.

answer:

Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.

chart_spec:

The ChartSpec dict used to build the Plotly figure. None if no chart was requested.

plotly_figure:

A plotly.graph_objects.Figure object ready to call .show() or .to_html(). None if no chart was requested or plotly is not installed.

usage:

Aggregated token counts for all LLM steps in the pipeline.

error:

Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

user_query: str
schema_used: str
sql_query: str
columns: list[str]
row_count: int
raw_rows: list[dict[str, Any]]
pandas_code: str | None
pandas_result: Any | None
answer: str | None
chart_spec: dict[str, Any] | None
plotly_figure: Any | None
usage: PipelineUsage
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

to_csv(path=None)[source]

Export the raw SQL result rows to CSV.

Does not require pandas — uses the standard-library csv module.

Parameters:

path (str | None) – File path to write to. When None the CSV string is returned.

Return type:

str | None

Returns:

str | None – CSV string when path is None; otherwise None (file written).
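
The return-or-write behaviour can be reproduced with the standard-library csv module alone. The function rows_to_csv below is a hypothetical stand-in that takes the columns and raw_rows explicitly; the choice of "\n" as line terminator is an assumption made to keep the output predictable.

```python
import csv
import io
from typing import Optional


def rows_to_csv(columns: list, rows: list, path: Optional[str] = None) -> Optional[str]:
    """Serialise a list of row dicts; return the text when no path is given."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    text = buffer.getvalue()
    if path is None:
        return text
    # newline="" stops Python from translating the line endings again.
    with open(path, "w", newline="", encoding="utf-8") as handle:
        handle.write(text)
    return None


csv_text = rows_to_csv(["id", "name"], [{"id": 1, "name": "widget"}])
```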

to_json(path=None, *, indent=2)[source]

Export the raw SQL result rows to JSON.

Parameters:
  • path (str | None) – File path to write to. When None the JSON string is returned.

  • indent (int) – JSON indentation level (default: 2).

Return type:

str | None

Returns:

str | None – JSON string when path is None; otherwise None (file written).

to_excel(path, *, sheet_name='Results')[source]

Export the raw SQL result rows to an Excel file.

Requires pandas and openpyxl:

pip install ractogateway[pipelines-sql] openpyxl
Parameters:
  • path (str) – File path to write to (must end in .xlsx).

  • sheet_name (str) – Excel sheet name (default: "Results").

Return type:

None

class ractogateway.AsyncListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]

Bases: ListClassifierPipeline

Async-first variant of ListClassifierPipeline.

run() is a coroutine — await pipeline.run(...) directly. Designed for FastAPI, aiohttp, Starlette, and other async frameworks.

Constructor and all run() parameters are identical to ListClassifierPipeline.

Example

pipeline = AsyncListClassifierPipeline.from_provider(
    "openai", "gpt-4o-mini",
    options=["Billing", "Support", "Sales"],
    safe_mode=True,
)

# FastAPI handler:
@app.post("/classify")
async def classify(query: str):
    result = await pipeline.run(query)
    return result.as_dict()
async run(user_query, **kwargs)[source]

Async run() — delegates to ListClassifierPipeline.arun().

Return type:

ClassifierResult | str | dict[str, Any]

class ractogateway.AuditEntry(**data)[source]

Bases: BaseModel

Immutable audit record emitted to the audit_logger after every call.

Emitted regardless of whether the call was served from cache, hit an error, or was a live LLM classification. Provides a complete picture of every request for compliance, debugging, and analytics.

Fields

timestamp:

ISO 8601 UTC timestamp of when the call was made (e.g. "2026-02-26T14:23:01.456789Z").

user_query:

Original natural-language query.

options_provided:

Full candidate list shown to the LLM (including uncertain_label if one was configured).

selected:

Option(s) chosen by the LLM, or empty on error.

confidences:

Per-selection confidence scores, or None.

all_scores:

Score for every option (when score_all=True), or None.

reasoning:

LLM explanation (when include_reasoning=True), or None.

fuzzy_corrected:

True when the LLM returned a near-miss that was fuzzy-matched.

uncertain:

True when the LLM selected the uncertain_label option.

cache_hit:

"exact" or "semantic" when the result was served from cache; None when a live LLM call was made.

user_id:

User identifier passed to the pipeline (for rate limiting / audit).

session_id:

Conversation session identifier (for memory context).

latency_ms:

Wall-clock latency of the entire pipeline call in milliseconds (near-zero for cache hits).

usage:

Token usage dict — keys: input_tokens, output_tokens, total_tokens, retry_count. All zero on cache hits.

error:

Non-None when safe_mode=True and an exception occurred.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

timestamp: str
user_query: str
options_provided: list[str]
selected: list[str]
confidences: list[float] | None
all_scores: dict[str, float] | None
reasoning: str | None
fuzzy_corrected: bool
uncertain: bool
cache_hit: str | None
user_id: str | None
session_id: str | None
latency_ms: float
usage: dict[str, int]
error: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

exception ractogateway.ClassifierRateLimitExceededError[source]

Bases: RuntimeError

Raised when the rate limiter denies a request for a given user.
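
The rate_limiter parameter of ListClassifierPipeline is documented as duck-typed, with check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int. A minimal sketch of an object with that shape follows. FixedBudgetLimiter, RateLimitExceeded, and guarded_call are hypothetical names, the fixed per-user budget is an assumption, and how the pipeline counts tokens is not specified here.

```python
class FixedBudgetLimiter:
    """Hypothetical limiter: each user gets a fixed token budget."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used: dict[str, int] = {}

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used.get(user_id, 0)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Deny without consuming anything when the request does not fit.
        if tokens > self.get_remaining(user_id):
            return False
        self._used[user_id] = self._used.get(user_id, 0) + tokens
        return True


class RateLimitExceeded(RuntimeError):
    """Local stand-in for ClassifierRateLimitExceededError."""


def guarded_call(limiter: FixedBudgetLimiter, user_id: str, tokens: int) -> str:
    """Raise when the limiter denies the request, otherwise proceed."""
    if not limiter.check_and_consume(user_id, tokens):
        remaining = limiter.get_remaining(user_id)
        raise RateLimitExceeded(f"user {user_id!r} has {remaining} tokens left")
    return "ok"
```

A production limiter would usually refill budgets over time (for example a token bucket) and guard its state with a lock.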

class ractogateway.ClassifierResult(**data)[source]

Bases: BaseModel

Result returned by ListClassifierPipeline.

All fields except user_query and options_provided have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs mid-pipeline.

Fields

user_query:

The original natural-language query passed to the pipeline.

options_provided:

The full list of candidate strings presented to the LLM (including the injected uncertain_label option if one was configured).

selected:

The option(s) chosen by the LLM, ordered by descending confidence. Empty when an error occurred and safe_mode=True.

confidences:

Per-selection confidence scores in [0.0, 1.0], aligned with selected. None when include_confidence=False.

all_scores:

Confidence score for every option in the list, keyed by option string. None when score_all=False (the default).

reasoning:

Brief natural-language explanation produced by the LLM. None when include_reasoning=False.

fuzzy_corrected:

True when the LLM returned a near-miss that was corrected by the built-in fuzzy matcher without consuming a retry.

uncertain:

True when the LLM selected the uncertain_label option, indicating no real option matched the query well enough.

cache_hit:

"exact" or "semantic" when served from cache; None for a live LLM call.

usage:

Aggregated token counts and retry statistics for this call.

error:

Non-None only when safe_mode=True and an exception occurred. When error is set, selected will be empty.

Examples

>>> result.first                         # "Billing"
>>> result.top_confidence                # 0.95
>>> result.as_string()                   # "Billing, Account"
>>> result.as_dict()                     # {"selected": ["Billing"], ...}
>>> result.as_enum()["Billing"].value    # "Billing"
>>> result.uncertain                     # False
>>> result.cache_hit                     # "exact" | "semantic" | None

user_query: str
options_provided: list[str]
selected: list[str]
confidences: list[float] | None
all_scores: dict[str, float] | None
reasoning: str | None
fuzzy_corrected: bool
uncertain: bool
cache_hit: str | None
usage: ClassifierUsage
error: str | None
property first: str | None

The first (highest-priority) selected option, or None if empty.

property top_confidence: float | None

Confidence score for the first selected option, or None.

property is_empty: bool

True when no options were selected (including error cases).

as_string(separator=', ')[source]

Return selected options as a single joined string.

Parameters:

separator (str) – Delimiter placed between items. Default: ", ".

Return type:

str

Returns:

str – E.g. "Billing, Account" for two selections.

as_dict()[source]

Return a plain dict with selected options and optional metadata.

Always contains "selected". "confidences", "all_scores", and "reasoning" are included only when they are non-None.

Return type:

dict[str, Any]

Returns:

dict[str, Any]

Example:

{
    "selected":    ["Billing", "Account"],
    "confidences": [0.95, 0.82],
    "all_scores":  {"Billing": 0.95, "Account": 0.82, "Sales": 0.12},
    "reasoning":   "Both topics are mentioned explicitly.",
}
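
The rule stated above ("selected" always present, the other keys only when non-None) can be sketched as a standalone function. result_to_dict is a hypothetical helper written for illustration, not the library's implementation.

```python
from typing import Any, Optional


def result_to_dict(
    selected: list[str],
    confidences: Optional[list[float]] = None,
    all_scores: Optional[dict[str, float]] = None,
    reasoning: Optional[str] = None,
) -> dict[str, Any]:
    """Build a plain dict, omitting optional fields that are None."""
    out: dict[str, Any] = {"selected": list(selected)}
    optional = {
        "confidences": confidences,
        "all_scores": all_scores,
        "reasoning": reasoning,
    }
    out.update({key: value for key, value in optional.items() if value is not None})
    return out


minimal = result_to_dict(["Billing"])
with_scores = result_to_dict(["Billing", "Account"], confidences=[0.95, 0.82])
```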

as_enum(name='SelectedOptions')[source]

Return a dynamic Python enum.Enum of the selected options.

Parameters:

name (str) – Class name for the generated Enum. Default: "SelectedOptions".

Return type:

type[Enum]

Returns:

type[Enum] – An Enum whose members have the option string as both name and value.

Example

>>> E = result.as_enum()
>>> E["Billing"].value   # "Billing"
top_n(n)[source]

Return the top-n selected options.

Parameters:

n (int) – Maximum number of options to return.

Return type:

list[str]

score_for(option)[source]

Return the confidence score for a specific option, or None.

Searches all_scores first (all options, when score_all=True), then confidences for selected items.

Parameters:

option (str) – The option string to look up.

Return type:

float | None
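
The lookup order described above can be sketched as follows. score_for here is a free function over plain values, an assumption made so the example runs without the library; it checks all_scores first and then falls back to the confidences list aligned with selected.

```python
from typing import Optional


def score_for(
    option: str,
    selected: list[str],
    confidences: Optional[list[float]] = None,
    all_scores: Optional[dict[str, float]] = None,
) -> Optional[float]:
    """Return the score for option, or None when no score is available."""
    # 1. all_scores covers every option (populated when score_all=True).
    if all_scores is not None and option in all_scores:
        return all_scores[option]
    # 2. confidences only covers the selected options, by position.
    if confidences is not None and option in selected:
        return confidences[selected.index(option)]
    return None
```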

to_audit_entry(*, timestamp, user_id=None, session_id=None, latency_ms=0.0)[source]

Build an AuditEntry from this result.

Called automatically by the pipeline — exposed here so that users can reconstruct audit entries from stored results if needed.

Return type:

AuditEntry
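
When rebuilding audit entries from stored results, the caller has to supply timestamp. The sketch below shows one way to produce a string in the ISO 8601 UTC form shown in the AuditEntry field description; utc_timestamp is a hypothetical helper, and whether the library requires exactly this precision is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def utc_timestamp(now: Optional[datetime] = None) -> str:
    """Format an aware datetime as ISO 8601 UTC with a trailing 'Z'."""
    moment = now if now is not None else datetime.now(timezone.utc)
    # Normalise to UTC first so the literal 'Z' suffix is accurate.
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


stamp = utc_timestamp(datetime(2026, 2, 26, 14, 23, 1, 456789, tzinfo=timezone.utc))
```

Following the signature above, the value would then be passed as result.to_audit_entry(timestamp=stamp, user_id=..., session_id=..., latency_ms=...).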

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.ClassifierUsage(**data)[source]

Bases: BaseModel

Token usage and retry statistics for a single pipeline call.

Properties

total_tokens:

Sum of input_tokens and output_tokens across all LLM attempts, including automatic retries triggered by invalid LLM responses. Zero on a cache hit (no LLM call was made).

input_tokens: int
output_tokens: int
retry_count: int
property total_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
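
The accumulation described for total_tokens can be illustrated with a plain dataclass. UsageSketch and add_attempt are hypothetical and only mirror the documented fields; the real class is a pydantic model.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Stand-in mirroring the documented ClassifierUsage fields."""

    input_tokens: int = 0
    output_tokens: int = 0
    retry_count: int = 0

    @property
    def total_tokens(self) -> int:
        # Sum across every attempt recorded so far, retries included.
        return self.input_tokens + self.output_tokens

    def add_attempt(self, input_tokens: int, output_tokens: int, *, is_retry: bool = False) -> None:
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        if is_retry:
            self.retry_count += 1


usage = UsageSketch()
usage.add_attempt(120, 30)                 # first attempt, invalid response
usage.add_attempt(130, 28, is_retry=True)  # automatic retry succeeds
cache_hit_usage = UsageSketch()            # no LLM call was made
```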

class ractogateway.ListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]

Bases: object

Map a natural-language query to one or more items from a candidate list.

Supports every RactoGateway provider via the kit parameter or the from_provider() class factory. Internally builds a dynamic Python enum.Enum from the options list and validates every LLM response against it: hallucinated and paraphrased answers are caught, fuzzy-corrected if close enough, and retried otherwise.
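
The validate, fuzzy-correct, or retry decision can be sketched with the stdlib difflib module, which the fuzzy_fallback parameter is documented to use. resolve_option is a hypothetical helper, and the 0.8 cutoff is an assumption; the library's actual threshold is not stated in this reference.

```python
import difflib
from typing import Optional


def resolve_option(
    raw: str,
    options: list[str],
    *,
    case_sensitive: bool = False,
    cutoff: float = 0.8,  # assumed similarity threshold
) -> tuple[Optional[str], bool]:
    """Return (option, fuzzy_corrected); option is None when a retry is needed."""
    normalise = (lambda s: s) if case_sensitive else str.lower
    lookup = {normalise(opt): opt for opt in options}
    needle = normalise(raw.strip())

    # Exact match against the candidate list: accept as-is.
    if needle in lookup:
        return lookup[needle], False

    # Near miss: take the closest candidate above the cutoff, if any.
    close = difflib.get_close_matches(needle, list(lookup), n=1, cutoff=cutoff)
    if close:
        return lookup[close[0]], True

    # Nothing close enough: the caller would spend a retry.
    return None, False


OPTIONS = ["Billing", "Technical Support", "Sales"]
```

With these options, "Technical Suport" resolves to "Technical Support" with the fuzzy flag set, while an unrelated answer such as "Refunds" resolves to None.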

Two variants

  • ListClassifierPipeline: run() is sync, arun() is async.

  • AsyncListClassifierPipeline: run() is async only.

Parameters:
  • kit (Any) – Any RactoGateway developer kit (OpenAI, Anthropic, Google, Ollama, HuggingFace). Must expose .chat(ChatConfig) and .achat(ChatConfig) methods. Use from_provider() instead of constructing kits manually when you only need provider + model.

  • options (list[str] | None) – Default candidate strings. Can be overridden per-call. Must be non-empty and duplicate-free when provided.

  • selection_mode (Literal['single', 'multiple']) – "single" (default): exactly one option. "multiple": one or more options. Overridable per-call.

  • output_format (Literal['string', 'dict', 'pydantic']) – "pydantic" (default): ClassifierResult. "string": comma-joined string. "dict": plain dict. Overridable per-call.

  • prompt (RactoPrompt | None) – Custom RactoPrompt to replace the built-in system prompt.

  • temperature (float) – LLM temperature. Default 0.0 for deterministic output.

  • max_tokens (int) – Response token budget. Default 512.

  • max_retries (int) – Retry attempts when LLM returns invalid JSON / unknown option. Default 2.

  • include_confidence (bool) – Ask LLM for per-selection confidence scores [0.0–1.0]. Default True.

  • include_reasoning (bool) – Ask LLM for a one-sentence explanation. Default False.

  • score_all (bool) – Ask LLM for a score for every option (not just selected ones). Stored in result.all_scores. Default False.

  • option_descriptions (dict[str, str] | None) – {option: description}, shown inline next to each option in the prompt to help the LLM distinguish similar categories.

  • fuzzy_fallback (bool) – Use stdlib difflib to correct near-miss LLM responses before consuming a retry. Default True.

  • uncertain_label (str | None) – When set, this string is appended as an extra option that the LLM can pick when nothing matches (e.g. "Other / None of the above"). result.uncertain is True when this label is selected.

  • confidence_threshold (float | None) – Drop selections below this score. Keeps highest-confidence match as fallback. Default None (no filtering).

  • case_sensitive (bool) – Whether option matching is case-sensitive. Default False.

  • safe_mode (bool) – Return ClassifierResult(error=...) instead of raising. Default False.

  • tracer (Any | None) – Optional RactoTracer.

  • metrics (Any | None) – Optional GatewayMetricsMiddleware.

  • rate_limiter (Any | None) – Duck-typed: check_and_consume(user_id, tokens) -> bool + get_remaining(user_id) -> int.

  • memory (Any | None) – Duck-typed: get_history(session_id) -> list[dict] + append(session_id, role, content).

  • user_id (str | None) – Default user ID for rate limiting. Overridable per-call.
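
The memory parameter is described only by its duck-typed shape, get_history(session_id) -> list[dict] and append(session_id, role, content). A minimal in-process sketch of such an object follows. InMemoryHistory, the max_messages cap, and the {"role", "content"} dict layout are assumptions; the library may expect a different message format.

```python
class InMemoryHistory:
    """Hypothetical per-session message store with a bounded length."""

    def __init__(self, max_messages: int = 20) -> None:
        if max_messages < 1:
            raise ValueError("max_messages must be at least 1")
        self._max = max_messages
        self._store: dict[str, list[dict]] = {}

    def get_history(self, session_id: str) -> list[dict]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store.get(session_id, []))

    def append(self, session_id: str, role: str, content: str) -> None:
        history = self._store.setdefault(session_id, [])
        history.append({"role": role, "content": content})
        # Keep only the most recent max_messages entries.
        del history[: -self._max]


memory = InMemoryHistory(max_messages=2)
memory.append("s1", "user", "My invoice is wrong")
memory.append("s1", "assistant", "Billing")
memory.append("s1", "user", "And my login fails")
```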

Example

# Via kit directly
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import ListClassifierPipeline

pipeline = ListClassifierPipeline(
    kit=Chat(model="gpt-4o-mini"),
    options=["Billing", "Technical Support", "Sales"],
    include_confidence=True,
    include_reasoning=True,
)
result = pipeline.run("My invoice is wrong")
print(result.first, result.top_confidence)

# Via from_provider() — no manual kit import needed
pipeline = ListClassifierPipeline.from_provider(
    "anthropic", "claude-haiku-4-5-20251001",
    options=["Billing", "Technical Support", "Sales"],
)
classmethod from_provider(provider, model, *, api_key=None, base_url=None, options=None, **kwargs)[source]

Create a pipeline by specifying provider + model — no kit import needed.

Parameters:
  • provider (str) – One of "openai", "anthropic", "google", "ollama", "huggingface".

  • model (str) –

    Model identifier string, e.g.:

    • OpenAI: "gpt-4o-mini", "gpt-4o"

    • Anthropic: "claude-haiku-4-5-20251001", "claude-sonnet-4-6"

    • Google: "gemini-2.0-flash", "gemini-1.5-pro"

    • Ollama: "llama3.2", "mistral"

    • HuggingFace: "meta-llama/Llama-3.2-3B-Instruct"

  • api_key (str | None) – Provider API key. Falls back to the standard env var for each provider (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY).

  • base_url (str | None) – Custom endpoint — used for Ollama (http://localhost:11434) or OpenAI-compatible proxies.

  • options (list[str] | None) – Default candidate options list.

  • **kwargs (Any) – Any other ListClassifierPipeline constructor parameters (selection_mode, include_confidence, safe_mode, etc.).

Return type:

ListClassifierPipeline

Returns:

ListClassifierPipeline

Example

pipeline = ListClassifierPipeline.from_provider(
    "anthropic",
    "claude-haiku-4-5-20251001",
    options=["Billing", "Support", "Sales"],
    include_reasoning=True,
    safe_mode=True,
)
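
The api_key fallback described above can be sketched as follows. resolve_api_key is a hypothetical helper, and the mapping lists only the two environment variables named in this reference; the variable names for the other providers are not stated here and are left out rather than guessed.

```python
import os
from typing import Optional

# Only the variables named in the parameter description above.
ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
}


def resolve_api_key(provider: str, api_key: Optional[str] = None) -> Optional[str]:
    """Prefer an explicit key, then fall back to the provider's env var."""
    if api_key is not None:
        return api_key
    env_name = ENV_VARS.get(provider)
    return os.environ.get(env_name) if env_name else None
```
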
static make_enum(options, name='OptionsEnum')[source]

Build a standalone dynamic enum.Enum from an options list.

Useful when you want enum-typed values outside the pipeline.

Parameters:
  • options (list[str]) – List of option strings.

  • name (str) – Enum class name. Default "OptionsEnum".

Return type:

type[Enum]

Returns:

type[Enum]

Example

E = ListClassifierPipeline.make_enum(["Red", "Green", "Blue"])
E["Red"].value   # "Red"
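
A comparable enum can be built with the functional API of the stdlib enum module. build_enum below is a hypothetical stand-in, not the library's make_enum; the validation it performs mirrors the "non-empty and duplicate-free" rule stated for options.

```python
from enum import Enum


def build_enum(options: list[str], name: str = "OptionsEnum") -> type[Enum]:
    """Create an Enum whose members use the option string as name and value."""
    if not options:
        raise ValueError("options must be non-empty")
    if len(set(options)) != len(options):
        raise ValueError("options must not contain duplicates")
    # The functional API accepts an iterable of (name, value) pairs.
    return Enum(name, [(opt, opt) for opt in options])


Color = build_enum(["Red", "Green", "Blue"], name="Color")
```

Members can then be looked up by name (Color["Red"]) or by value (Color("Red")).
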
get_options()[source]

Return the pipeline-level options list, or None if not set.

Return type:

list[str] | None

set_options(options)[source]

Replace the entire pipeline-level options list.

Thread-safe — safe to call while the pipeline is in use.

Parameters:

options (list[str]) – New options list. Must be non-empty and duplicate-free.

Return type:

None
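
One way to get the behaviour described above (validation plus safe replacement while other threads read) is to validate outside a lock and swap the list inside it. OptionStore is a hypothetical sketch under that assumption; the library's actual locking strategy is not documented here.

```python
import threading
from typing import Optional


class OptionStore:
    """Hypothetical holder for a pipeline-level options list."""

    def __init__(self, options: Optional[list[str]] = None) -> None:
        self._lock = threading.Lock()
        self._options: Optional[list[str]] = None
        if options is not None:
            self.set_options(options)

    @staticmethod
    def _validate(options: list[str]) -> list[str]:
        if not options:
            raise ValueError("options must be non-empty")
        if len(set(options)) != len(options):
            raise ValueError("options must be duplicate-free")
        return list(options)

    def set_options(self, options: list[str]) -> None:
        validated = self._validate(options)  # fail before touching state
        with self._lock:
            self._options = validated

    def get_options(self) -> Optional[list[str]]:
        with self._lock:
            # Hand out a copy so readers never see later mutations.
            return None if self._options is None else list(self._options)
```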

add_option(option, description=None)[source]

Append a new option to the pipeline-level list.

Parameters:
  • option (str) – The option string to add.

  • description (str | None) – Optional inline description for the option.

Return type:

None

remove_option(option)[source]

Remove an option from the pipeline-level list.

Parameters:

option (str) – The option string to remove. Raises ValueError if not found.

Return type:

None

run(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]

Classify user_query synchronously.

Parameters:
  • user_query (str) – Natural-language query to classify.

  • options (list[str] | None) – Per-call override for the candidate list. Omit to use the pipeline-level list. Passing [] raises ValueError.

  • selection_mode (Literal['single', 'multiple'] | None) – Per-call override — "single" or "multiple".

  • output_format (Literal['string', 'dict', 'pydantic'] | None) – Per-call override — "pydantic", "string", or "dict".

  • temperature (float | None) – Per-call override for the LLM temperature.

  • max_tokens (int | None) – Per-call override for the response token budget.

  • confidence_threshold (float | None) – Per-call override. Pass None explicitly to disable filtering for this call even if a pipeline-level threshold is set.

  • session_id (str | None) – Conversation session ID for memory retrieval/storage.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

Return type:

ClassifierResult | str | dict[str, Any]

Returns:

ClassifierResult | str | dict – Type depends on output_format.

batch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]

Classify multiple queries synchronously, one after another.

Shares all per-call overrides across every query in the batch. Use abatch_run() to run them concurrently in async contexts.

Parameters:

queries (list[str]) – List of natural-language queries to classify.

Return type:

list[ClassifierResult | str | dict[str, Any]]

Returns:

list – One result per query, in the same order.

async arun(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]

Async variant of run() — identical parameters.

Return type:

ClassifierResult | str | dict[str, Any]

async abatch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None, max_concurrency=None)[source]

Classify multiple queries concurrently with asyncio.gather.

Parameters:
  • queries (list[str]) – List of natural-language queries.

  • max_concurrency (int | None) – Cap the number of simultaneous LLM calls. None (default) runs all queries in parallel. Set to e.g. 5 to avoid rate-limit errors on large batches.

Return type:

list[ClassifierResult | str | dict[str, Any]]

Returns:

list – Results in the same order as queries.
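
The effect of max_concurrency can be sketched with asyncio.gather plus an asyncio.Semaphore. gather_limited and classify are hypothetical; classify stands in for a single awaited classification call, and the library's internals may differ.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def classify(query: str) -> str:
    """Placeholder for one awaited LLM classification."""
    await asyncio.sleep(0)
    return query.upper()


async def gather_limited(
    queries: list[str],
    worker: Callable[[str], Awaitable[str]],
    max_concurrency: Optional[int] = None,
) -> list[str]:
    """Run worker over queries concurrently, optionally capping parallelism."""
    if max_concurrency is None:
        return list(await asyncio.gather(*(worker(q) for q in queries)))

    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(query: str) -> str:
        # At most max_concurrency workers hold the semaphore at once.
        async with semaphore:
            return await worker(query)

    # gather() returns results in the order the awaitables were passed in.
    return list(await asyncio.gather(*(guarded(q) for q in queries)))


results = asyncio.run(gather_limited(["a", "b", "c"], classify, max_concurrency=2))
```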