ractogateway
RactoGateway - unified AI SDK with anti-hallucination prompting via RACTO.
The package surface stays import-friendly while deferring heavy imports until a specific symbol is accessed.
- class ractogateway.MCPAgent(kit, registry, *, max_turns=10)
Bases: object
Agentic tool-execution loop compatible with all three developer kits.
Runs the LLM → tool-call → execute → continue loop automatically, returning the final LLMResponse once the LLM produces a non-tool response or max_turns is reached.
- Parameters:
kit (Any) – Any developer kit with chat()/achat() methods: OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit.
registry (ToolRegistry) – Tool registry containing callables for each tool the LLM can call. Typically populated via RactoMCPClient.to_registry() or MCPMultiClient.to_registry().
max_turns (int) – Maximum number of tool-call rounds before the loop stops and returns the last response. Prevents an endless tool-call loop.
Example
```python
from ractogateway.openai_developer_kit import OpenAIDeveloperKit
from ractogateway.mcp import MCPAgent, MCPClientConfig
from ractogateway._models.chat import ChatConfig

cfg = MCPClientConfig(transport="stdio", command="python", args=["-m", "my_server"])
kit = OpenAIDeveloperKit(model="gpt-4o")

# list_tools_sync() returns list[ToolSchema], not a ToolRegistry, so let
# from_mcp() fetch the tools and build the registry for the agent.
agent = MCPAgent.from_mcp(kit, [cfg], max_turns=6)

# The prompt must be set on the ChatConfig or on the kit (see run()).
result = agent.run(ChatConfig(user_message="Search for recent AI papers"))
print(result.content)
```
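The control flow that run() and arun() automate can be sketched without the library. The sketch below is an assumption about that loop, not RactoGateway's code. ToolCall, Response, run_agent_loop, and scripted_chat are hypothetical stand-ins for the kit, LLMResponse, and ToolRegistry, and the fake model replays a fixed script instead of calling an API.
```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToolCall:
    """Hypothetical stand-in for one tool call requested by the LLM."""
    name: str
    arguments: dict[str, Any]


@dataclass
class Response:
    """Hypothetical stand-in for LLMResponse: text plus any tool calls."""
    content: str
    tool_calls: list[ToolCall] = field(default_factory=list)


def run_agent_loop(
    chat: Callable[[list[dict[str, Any]]], Response],
    tools: dict[str, Callable[..., Any]],
    user_message: str,
    max_turns: int = 10,
) -> Response:
    """Call the model, execute requested tools, and repeat until it answers in text."""
    messages: list[dict[str, Any]] = [{"role": "user", "content": user_message}]
    response = chat(messages)
    turns = 0
    # Each iteration is one tool-call round; max_turns bounds the number of rounds.
    while response.tool_calls and turns < max_turns:
        for call in response.tool_calls:
            result = tools[call.name](**call.arguments)
            messages.append({"role": "tool", "name": call.name, "content": str(result)})
        turns += 1
        response = chat(messages)
    # Either a non-tool response, or the last response once max_turns is reached.
    return response


def scripted_chat(messages: list[dict[str, Any]]) -> Response:
    """Fake model: request the add tool once, then answer using its output."""
    tool_outputs = [m["content"] for m in messages if m["role"] == "tool"]
    if not tool_outputs:
        return Response(content="", tool_calls=[ToolCall("add", {"a": 2, "b": 3})])
    return Response(content=f"The sum is {tool_outputs[-1]}.")


final = run_agent_loop(scripted_chat, {"add": lambda a, b: a + b}, "What is 2 + 3?")
print(final.content)  # The sum is 5.
```
Counting rounds rather than messages keeps the bound independent of how many tools the model requests per turn. With max_turns=3 and a model that always asks for a tool, the sketch makes one initial call plus three follow-ups.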
- classmethod from_mcp(kit, configs, *, max_turns=10)
Build an agent by fetching tools from one or more MCP servers.
Opens a one-shot connection per config, fetches the tools, closes the connection, and constructs the agent with the merged registry.
Note
This is a sync classmethod. It uses asyncio.run() and therefore cannot be called from within a running event loop. In async code, build the registry yourself:
```python
async with MCPMultiClient(configs) as multi:
    registry = await multi.to_registry()
agent = MCPAgent(kit, registry)
```
- Parameters:
kit (Any) – Any RactoGateway developer kit.
configs (list[MCPClientConfig]) – MCP server connection configs.
max_turns (int) – Maximum tool-call rounds.
- Returns:
MCPAgent – Ready to call run() or arun().
- run(config)
Run the agentic loop synchronously.
Injects this agent's tool registry into config (overriding config.tools if already set).
- Parameters:
config (ChatConfig) – Initial chat config. prompt must be set here or on the kit.
- Returns:
LLMResponse – Final response after tool calls are resolved.
- async arun(config)
Run the agentic loop asynchronously.
Supports async tool callables (async def). Sync callables are called directly.
- Parameters:
config (ChatConfig) – Initial chat config.
- Returns:
LLMResponse – Final response after tool calls are resolved.
- property registry: ToolRegistry
The ToolRegistry used by this agent.
- property max_turns: int
Maximum number of tool-call rounds per run() call.
- class ractogateway.MCPClientConfig(**data)
Bases: BaseModel
Configuration for connecting to an MCP server.
- Parameters:
transport (Literal['stdio', 'sse', 'streamable-http']) – MCP transport to use. "stdio" is the standard for subprocess-based servers (e.g. Claude Desktop). "sse" and "streamable-http" are for HTTP-based servers.
command (str | None) – Executable that launches the server process. Required for stdio.
args (list[str]) – Command-line arguments passed to command (stdio only).
env (dict[str, str]) – Extra environment variables injected into the server process (stdio only). Merged on top of the inherited environment.
url (str | None) – Server URL. Required for sse / streamable-http. Example: "http://localhost:8000/sse".
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- transport: Literal['stdio', 'sse', 'streamable-http']
- model_config: ClassVar[ConfigDict] = {'frozen': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.MCPMultiClient(configs)
Bases: object
Connect to multiple MCP servers and present them as a single tool surface.
Tools from all servers are merged into one flat namespace. If two servers advertise the same tool name, the later server's definition wins, and a warning noting the override is embedded in the tool description.
Routing is O(1): an internal dict[tool_name → server_index] maps each tool back to its origin server for call_tool dispatch.
- Parameters:
configs (list[MCPClientConfig]) – One MCPClientConfig per server. At least one config is required.
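A minimal sketch of the merge-and-route idea follows. It assumes each server's tools can be reduced to a {name: description} map. merge_tool_surfaces and the wording of the override note are hypothetical. Only three behaviours come from the description above: the last-server-wins rule, the name-sorted output, and the tool-name-to-server-index dictionary.
```python
def merge_tool_surfaces(
    servers: list[dict[str, str]],
) -> tuple[dict[str, str], dict[str, int]]:
    """Merge per-server {tool_name: description} maps with last-server-wins."""
    merged: dict[str, str] = {}
    route: dict[str, int] = {}  # tool_name -> index of the server that owns it
    for index, tools in enumerate(servers):
        for name, description in tools.items():
            if name in route:
                # Hypothetical wording for the embedded override warning.
                description = f"{description} [overrides server {route[name]}]"
            merged[name] = description
            route[name] = index
    # Return tools sorted by name, as list_tools() documents.
    return dict(sorted(merged.items())), route


servers = [
    {"search": "Web search", "add": "Add two numbers"},
    {"search": "Paper search"},
]
tools, route = merge_tool_surfaces(servers)
print(list(tools))      # ['add', 'search']
print(route["search"])  # 1
```
Because route is a plain dict, dispatching a call is a single lookup. An unknown name raises KeyError, which matches the KeyError documented for call_tool().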
- async list_tools()
Return the merged list of tool schemas from all servers.
- Returns:
list[ToolSchema] – Deduplicated (last-server-wins) tool schemas, sorted by name.
- async call_tool(name, arguments=None)
Call a tool on the server that owns it in the merged namespace (the last server to advertise it).
Routing is O(1) via the internal tool_name → server_index map.
- Returns:
MCPToolResult – Tool output.
- Raises:
KeyError – If name is not in the merged tool namespace.
RuntimeError – If called outside an async with block.
- async to_registry()
Return a merged ToolRegistry with remote callables.
Each callable in the registry makes a fresh one-shot connection to the correct origin server when invoked. This keeps the registry self-contained and usable outside an async with block.
- Returns:
ToolRegistry – Merged registry compatible with all three developer kits.
- list_tools_sync()
Synchronous wrapper: connect all, list merged tools, disconnect all.
- Raises:
RuntimeError – If called from within a running event loop.
- call_tool_sync(name, arguments=None)
Synchronous wrapper: connect all, call tool, disconnect all.
- Raises:
RuntimeError – If called from within a running event loop.
- property server_count: int
Number of configured MCP servers.
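Every *_sync() wrapper in this module is documented to raise RuntimeError inside a running event loop. The reason is that asyncio.run() cannot start a loop while another one is running in the same thread. The guard can be sketched as follows. run_sync and fake_list_tools are hypothetical, and this is not the library's implementation.
```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, T]]) -> T:
    """Run a coroutine to completion, refusing to nest inside a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(make_coro())
    # Taking a factory means no coroutine object is created (and leaked) here.
    raise RuntimeError("called from a running event loop; use the async API instead")


async def fake_list_tools() -> list[str]:
    await asyncio.sleep(0)
    return ["add", "search"]


print(run_sync(fake_list_tools))  # ['add', 'search']


async def inside_loop() -> str:
    try:
        run_sync(fake_list_tools)
    except RuntimeError as exc:
        return str(exc)
    return "no error"


print(asyncio.run(inside_loop()))
```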
- class ractogateway.MCPServerConfig(**data)
Bases: BaseModel
Configuration for a RactoMCPServer instance.
- name: str
- description: str
- version: str
- model_config: ClassVar[ConfigDict] = {'frozen': True}
- class ractogateway.MCPToolResult(**data)
Bases: BaseModel
Result returned from calling a remote MCP tool.
- content: str
- is_error: bool
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.RactoMCPClient(config)
Bases: object
Connect to an MCP server and consume its tools as ToolSchema objects.
This is an async context manager. Keep it alive to reuse the underlying connection for multiple tool calls (O(1) per call after connection setup). For single calls from synchronous code, use the *_sync() convenience methods.
- Parameters:
config (MCPClientConfig) – Connection configuration (transport, command / URL, env, …).
Example (async, recommended)
```python
import asyncio

from ractogateway.mcp import MCPClientConfig, RactoMCPClient

config = MCPClientConfig(transport="stdio", command="python", args=["-m", "my_server"])

async def main() -> None:
    async with RactoMCPClient(config) as client:
        # Reuse this connection for all calls.
        tools = await client.list_tools()
        result = await client.call_tool("search", {"query": "AI"})
        registry = await client.to_registry()

asyncio.run(main())
```
Example (sync, one-shot)
```python
client = RactoMCPClient(config)
tools = client.list_tools_sync()
```
- async list_tools()
List all tools exposed by the MCP server.
- Returns:
list[ToolSchema] – Provider-agnostic tool schemas, ready to be registered in any ToolRegistry or passed directly to a developer kit via ChatConfig(tools=…).
- Raises:
RuntimeError – If called outside an async with block.
- async call_tool(name, arguments=None)
Call a remote MCP tool.
- Returns:
MCPToolResult – content contains all text blocks joined by "\n". is_error is True when the server signals a tool error.
- Raises:
RuntimeError – If called outside an async with block.
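The content / is_error contract of MCPToolResult can be pictured with a small converter. The block shape {"type": "text", "text": ...} is the text content type from the MCP specification. ToolResultSketch and to_tool_result are hypothetical. Discarding non-text blocks is an assumption about how other content types are handled.
```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolResultSketch:
    """Hypothetical stand-in for MCPToolResult."""
    content: str
    is_error: bool


def to_tool_result(blocks: list[dict[str, Any]], is_error: bool = False) -> ToolResultSketch:
    """Keep only MCP text blocks and join their text with newlines."""
    texts = [block["text"] for block in blocks if block.get("type") == "text"]
    return ToolResultSketch(content="\n".join(texts), is_error=is_error)


blocks = [
    {"type": "text", "text": "Found 2 papers."},
    {"type": "image", "data": "iVBORw0...", "mimeType": "image/png"},
    {"type": "text", "text": "1. Attention Is All You Need"},
]
result = to_tool_result(blocks)
print(result.content)
```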
- async to_registry()
Return a ToolRegistry populated with all server tools.
Each callable in the registry makes a fresh one-shot MCP connection when invoked. This keeps the returned registry self-contained and usable outside an async with block. For high-throughput usage, keep the RactoMCPClient context manager alive and call call_tool() directly.
- Returns:
ToolRegistry – Registry compatible with all three developer kits via ChatConfig(tools=registry).
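The self-contained registry described above amounts to one closure per tool, each of which opens its own connection on every call. In the sketch below, FakeConnection replaces a real MCP session and make_remote_callable is a hypothetical helper. A factory function is used so that each closure captures its own tool name rather than the last value of a loop variable.
```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


class FakeConnection:
    """Hypothetical one-shot connection; counts how often it is opened."""
    opened = 0

    def __init__(self, server: str) -> None:
        self.server = server

    async def __aenter__(self) -> "FakeConnection":
        FakeConnection.opened += 1
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> str:
        return f"{self.server}:{name}({arguments})"


def make_remote_callable(server: str, tool_name: str) -> Callable[..., Awaitable[str]]:
    """Bind server and tool name now, so each registry entry is self-contained."""
    async def remote(**arguments: Any) -> str:
        # A fresh connection per invocation: no shared client has to stay open.
        async with FakeConnection(server) as conn:
            return await conn.call_tool(tool_name, arguments)
    return remote


registry = {name: make_remote_callable("srv", name) for name in ["search", "add"]}
print(asyncio.run(registry["add"](a=1, b=2)))  # srv:add({'a': 1, 'b': 2})
```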
- list_tools_sync()
Synchronous wrapper: connect, list tools, disconnect.
- Returns:
list[ToolSchema] – All tool schemas exposed by the server.
- Raises:
RuntimeError – If called from within a running event loop.
- call_tool_sync(name, arguments=None)
Synchronous wrapper: connect, call tool, disconnect.
- Returns:
MCPToolResult – Tool output.
- Raises:
RuntimeError – If called from within a running event loop.
- class ractogateway.RactoMCPServer(name, *, description='', version='0.1.0')
Bases: object
Expose a ToolRegistry (or individual functions) as a Model Context Protocol server.
Supported transports:
stdio (default): standard I/O; ideal for Claude Desktop and any subprocess-based MCP client.
sse: HTTP Server-Sent Events; for remote / browser-based clients. Requires pip install ractogateway[mcp-sse].
- Parameters:
name (str) – Server name visible to MCP clients.
description (str) – Optional human-readable description.
version (str) – Server version string.
Example
```python
from ractogateway import ToolRegistry
from ractogateway.mcp import RactoMCPServer

registry = ToolRegistry()

@registry.register
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

server = RactoMCPServer.from_registry(registry, name="math-tools")
server.run()  # stdio, blocking
```
- classmethod from_registry(registry, *, name='ractogateway-server', description='RactoGateway MCP Server', version='0.1.0')
Build a server from a populated ToolRegistry.
Every registered callable becomes an MCP tool. Tools registered only as Pydantic models (no backing callable) are silently skipped.
- Parameters:
registry (ToolRegistry) – A ToolRegistry with one or more registered tools.
name (str) – MCP server name shown to clients.
description (str) – Optional server description.
version (str) – Server version string.
- Returns:
RactoMCPServer – Ready-to-run server instance.
- add_tool(fn, *, name=None, description=None)
Register a single function as an MCP tool.
The function’s type annotations drive the JSON Schema; its docstring provides the description (overridable via description).
- run(transport='stdio', *, host='0.0.0.0', port=8000)
Start the MCP server (blocking).
- get_asgi_app()
Return a Starlette ASGI app for the SSE transport.
Use this to mount the MCP server into an existing web application rather than starting a standalone server with run(). Requires pip install ractogateway[mcp-sse].
Example
```python
import uvicorn

from ractogateway.mcp import RactoMCPServer

# `registry` is a populated ToolRegistry (see from_registry()).
server = RactoMCPServer.from_registry(registry, name="tools")
app = server.get_asgi_app()
uvicorn.run(app, host="0.0.0.0", port=8000)
```
- exception ractogateway.RactoGatewayError
Bases: Exception
Base class for every RactoGateway runtime error.
- exception ractogateway.RactoGatewayTimeoutError
Bases: RactoGatewayError
The upstream provider did not respond within the allowed time.
- exception ractogateway.RactoGatewayAPIError(message, *, status_code=None)
Bases: RactoGatewayError
The upstream provider returned an error response.
- status_code – HTTP status code returned by the provider, when available.
- exception ractogateway.RactoGatewayAuthError
Bases: RactoGatewayError
API key missing, invalid, or not authorised for the requested resource.
- exception ractogateway.ResponseModelValidationError(message, *, attempts, last_error, raw_response=None)
Bases: RactoGatewayError
Raised when response_model validation cannot be satisfied.
The LLM returned structurally valid JSON, but Pydantic rejected it (e.g. a field value was out of range) and all automatic retry attempts were exhausted.
- attempts – Total number of API calls made (1 initial + N retries).
- last_error – The final pydantic.ValidationError that triggered this exception.
- raw_response – The LLM's raw text from the last attempt, available for inspection or manual recovery.
Example
```python
from ractogateway.exceptions import ResponseModelValidationError

try:
    response = kit.chat(config)
except ResponseModelValidationError as e:
    print(e.last_error)    # Pydantic ValidationError details
    print(e.raw_response)  # raw JSON string from the LLM
    print(e.attempts)      # how many times we tried
```
- class ractogateway.AnthropicFineTuner(api_key=None)
Bases: object
Fine-tune Anthropic Claude models using the fine-tuning API.
- Parameters:
api_key (str | None) – Anthropic API key. Falls back to the ANTHROPIC_API_KEY environment variable when not supplied.
Examples
End-to-end pipeline:
```python
from ractogateway.finetune import RactoDataset, AnthropicFineTuner

ds = RactoDataset.from_pairs(
    [("Summarise this: …", "The text discusses…")],
    system="You are a concise summariser.",
)
tuner = AnthropicFineTuner()
model = tuner.run_pipeline(ds, model="claude-3-haiku-20240307")
print(model)  # "claude-3-haiku-20240307:ft:org-xxx:suffix:abc123"
```
- upload_dataset(dataset)
Upload dataset as an Anthropic training file.
- Parameters:
dataset (RactoDataset) – The training examples to upload.
- Returns:
str – The Anthropic file ID used in create_job().
- create_job(training_file, model='claude-3-haiku-20240307', *, validation_file=None, suffix=None, hyperparameters=None)
Submit a fine-tuning job.
- Returns:
str – The fine-tuning job ID.
- get_status(job_id)
Retrieve the current status of a fine-tuning job.
- list_jobs(limit=10)
Return the most recent fine-tuning jobs (newest first).
- wait_for_completion(job_id, *, poll_interval=60, verbose=True)
Block until a fine-tuning job finishes.
- Returns:
str – Fine-tuned model name; pass it directly to AnthropicDeveloperKit(model=...).
- Raises:
RuntimeError – If the job ends in the "failed" or "cancelled" state.
- run_pipeline(dataset, model='claude-3-haiku-20240307', *, validation_dataset=None, suffix=None, hyperparameters=None, poll_interval=60, verbose=True)
Validate → upload → train → wait in a single call.
- Parameters:
dataset (RactoDataset) – Training examples.
model (str) – Base Claude model to fine-tune.
validation_dataset (RactoDataset | None) – Optional held-out validation set.
suffix (str | None) – Short label appended to the fine-tuned model name.
hyperparameters (dict[str, Any] | None) – Optional overrides, e.g. {"n_epochs": 3}.
poll_interval (int) – Seconds between status polls.
verbose (bool) – Print progress to stdout.
- Returns:
str – Fine-tuned model identifier; pass it directly to AnthropicDeveloperKit(model=...).
- Raises:
ValueError – If dataset validation fails.
RuntimeError – If the fine-tuning job fails remotely.
- class ractogateway.GeminiFineTuner(api_key=None)
Bases: object
Fine-tune Google Gemini models using the Generative AI tuning API.
- Parameters:
api_key (str | None) – Google AI API key. Falls back to the GEMINI_API_KEY environment variable when not supplied.
Examples
End-to-end pipeline:
```python
from ractogateway.finetune import RactoDataset, GeminiFineTuner

ds = RactoDataset.from_pairs(
    [("capital of France?", "Paris"), ("capital of Japan?", "Tokyo")],
)
tuner = GeminiFineTuner()
model_name = tuner.run_pipeline(
    ds,
    base_model="models/gemini-1.5-flash-001-tuning",
    display_name="geography-tutor",
)
print(model_name)  # "tunedModels/geography-tutor-abc123"
```
- create_job(dataset, base_model='models/gemini-1.5-flash-001-tuning', *, display_name='', epoch_count=5, batch_size=4, learning_rate=None)
Start a Gemini supervised fine-tuning job.
- Parameters:
dataset (RactoDataset) – Training examples. Each example must be a single-turn text pair (text_input / output). Examples with attachments or multi-turn conversations are not supported by this adapter; use Vertex AI for those.
base_model (str) – Tuning-enabled Gemini model identifier.
display_name (str) – Human-readable label for the tuned model.
epoch_count (int) – Number of training epochs.
batch_size (int) – Training batch size.
learning_rate (float | None) – Learning rate. None uses the provider default.
- Returns:
google.generativeai.types.TunedModel (operation-like object) – Pass to wait_for_completion().
- Raises:
ValueError – If the dataset fails validation, or if any examples are multimodal / multi-turn (unsupported by this adapter).
- get_model(tuned_model_name)
Retrieve metadata for a tuned model.
- delete_model(tuned_model_name)
Permanently delete a tuned model from your project.
- wait_for_completion(operation, *, poll_interval=60, verbose=True)
Block until a tuning operation finishes.
- Returns:
str – Tuned model name (e.g. "tunedModels/my-model-abc123"). Pass it directly to GoogleDeveloperKit(model=...).
- Raises:
RuntimeError – If the tuning job ends in a failed state.
- run_pipeline(dataset, base_model='models/gemini-1.5-flash-001-tuning', *, display_name='', epoch_count=5, batch_size=4, learning_rate=None, poll_interval=60, verbose=True)
Validate → create → wait in a single call.
- Parameters:
dataset (RactoDataset) – Text-pair training examples.
base_model (str) – Tuning-enabled Gemini model.
display_name (str) – Human-readable label for the tuned model.
epoch_count (int) – Training epochs.
batch_size (int) – Training batch size.
learning_rate (float | None) – Learning rate. None uses the provider default.
poll_interval (int) – Seconds between status polls.
verbose (bool) – Print progress to stdout.
- Returns:
str – Tuned model name; pass it to GoogleDeveloperKit(model=...).
- class ractogateway.OpenAIFineTuner(api_key=None, *, base_url=None)
Bases: object
Fine-tune OpenAI models using the fine-tuning API.
Examples
End-to-end pipeline (simplest usage):
```python
from ractogateway.finetune import RactoDataset, OpenAIFineTuner

ds = RactoDataset.from_pairs(
    [("What is Python?", "A high-level programming language.")],
    system="You are a Python tutor.",
)
tuner = OpenAIFineTuner()
model = tuner.run_pipeline(ds, model="gpt-4o-mini-2024-07-18")
print(model)  # "ft:gpt-4o-mini-2024-07-18:org::abc123"
```
- upload_dataset(dataset)
Upload dataset as an OpenAI training file.
- Parameters:
dataset (RactoDataset) – The training examples to upload.
- Returns:
str – The OpenAI file ID (e.g. "file-abc123").
- create_job(training_file, model='gpt-4o-mini-2024-07-18', *, validation_file=None, n_epochs='auto', batch_size='auto', learning_rate_multiplier='auto', suffix=None)
Submit a fine-tuning job.
- Parameters:
training_file (str) – File ID returned by upload_dataset().
model (str) – Base model to fine-tune.
validation_file (str | None) – Optional validation file ID (also produced by upload_dataset()).
n_epochs (int | str) – Number of training epochs. Pass "auto" to let OpenAI decide.
batch_size (int | str) – Training batch size. Pass "auto" to let OpenAI decide.
learning_rate_multiplier (float | str) – Scales the default learning rate.
suffix (str | None) – Custom label appended to the fine-tuned model name.
- Returns:
str – The fine-tuning job ID (e.g. "ftjob-abc123").
- get_status(job_id)
Retrieve the current status of a fine-tuning job.
- list_jobs(limit=10)
Return the most recent fine-tuning jobs (newest first).
- list_events(job_id, limit=20)
Return recent training log events for a job.
- wait_for_completion(job_id, *, poll_interval=30, verbose=True)
Block until a fine-tuning job finishes.
- Returns:
str – The fine-tuned model name, ready for use in OpenAILLMKit.
- Raises:
RuntimeError – If the job ends in the "failed" or "cancelled" state.
- run_pipeline(dataset, model='gpt-4o-mini-2024-07-18', *, validation_dataset=None, n_epochs='auto', batch_size='auto', learning_rate_multiplier='auto', suffix=None, poll_interval=30, verbose=True)
Validate → upload → train → wait in a single call.
This is the recommended entry point for most use cases.
- Parameters:
dataset (RactoDataset) – Training examples.
model (str) – Base model to fine-tune.
validation_dataset (RactoDataset | None) – Optional held-out validation set (uploaded separately).
n_epochs (int | str) – Number of training epochs. Pass "auto" to let OpenAI decide.
batch_size (int | str) – Training batch size. Pass "auto" to let OpenAI decide.
learning_rate_multiplier (float | str) – Scales the default learning rate. Pass "auto" to let OpenAI decide.
suffix (str | None) – Short label appended to the fine-tuned model name.
poll_interval (int) – Seconds between status polls while waiting.
verbose (bool) – Print progress to stdout.
- Returns:
str – Fine-tuned model identifier; pass it directly to OpenAIDeveloperKit(model=...):
```python
from ractogateway.openai_developer_kit import OpenAIDeveloperKit

kit = OpenAIDeveloperKit(model=fine_tuned_model)
```
- Raises:
ValueError – If dataset validation fails.
RuntimeError – If the fine-tuning job fails remotely.
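The wait_for_completion() methods above poll until the job reaches a terminal state. The following is a minimal polling sketch. It assumes OpenAI-style status names ("succeeded", "failed", "cancelled") and a fine_tuned_model field on the job record. wait_for_job and its injectable sleep parameter are hypothetical; the parameter is there so the loop can be tested without actually waiting.
```python
import time
from collections.abc import Callable
from typing import Any

# Assumed terminal failure states, following OpenAI's job status names.
FAILED_STATES = {"failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[str], dict[str, Any]],
    job_id: str,
    *,
    poll_interval: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the job succeeds (return the model name) or fails (raise)."""
    while True:
        job = get_status(job_id)
        status = job["status"]
        if status == "succeeded":
            return job["fine_tuned_model"]
        if status in FAILED_STATES:
            raise RuntimeError(f"Fine-tuning job {job_id} ended in {status!r} state")
        sleep(poll_interval)


statuses = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "succeeded", "fine_tuned_model": "ft:gpt-4o-mini-2024-07-18:org::abc123"},
])
naps: list[float] = []
model = wait_for_job(lambda _id: next(statuses), "ftjob-abc123", sleep=naps.append)
print(model, naps)  # ft:gpt-4o-mini-2024-07-18:org::abc123 [30, 30]
```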
- class ractogateway.RactoDataset(examples=None)
Bases: object
An ordered collection of RactoTrainingExample objects.
This is the central data container for building, validating, splitting, and exporting fine-tuning datasets for any supported LLM provider.
- Parameters:
examples (list[RactoTrainingExample] | None) – Initial examples. An empty dataset is created when omitted.
Examples
Build from (user, assistant) pairs:
```python
from ractogateway import RactoDataset, RactoFile, RactoTrainingExample

ds = RactoDataset.from_pairs(
    [
        ("What is Python?", "Python is a high-level programming language."),
        ("What is a list?", "A list is a mutable ordered sequence."),
    ],
    system="You are a Python tutor.",
)
```
Add multimodal examples manually:
```python
ds.add(
    RactoTrainingExample.from_pair(
        user="Describe this image.",
        assistant="The image shows a flowchart with three decision nodes.",
        user_attachments=[RactoFile.from_path("diagram.png")],
    )
)
```
Export to JSONL for fine-tuning:
```python
train_ds, val_ds = ds.split(0.8, seed=42)
train_ds.export_jsonl("train.jsonl", provider="openai")
val_ds.export_jsonl("val.jsonl", provider="openai")
```
- classmethod from_pairs(pairs, *, system='')
Build a text-only dataset from (user, assistant) pairs.
- classmethod from_jsonl(path, provider='openai')
Load a JSONL dataset previously exported for provider.
Supports text-only OpenAI, Anthropic, and Gemini formats.
- shuffle(seed=None)
Return a new dataset with examples in random order.
- split(train_ratio=0.8, *, seed=None)
Split into train and validation datasets.
- Returns:
tuple[RactoDataset, RactoDataset] – (train_dataset, validation_dataset).
- validate(provider='openai')
Check examples for common formatting errors.
- to_jsonl_string(provider='openai')
Serialize all examples to a JSONL string for provider.
- export_jsonl(path, provider='openai', *, overwrite=False)
Write the dataset to a .jsonl file on disk.
- class ractogateway.RactoTrainingExample(messages)
Bases: object
A complete conversation used as one training record.
- Parameters:
messages (list[RactoTrainingMessage]) – Ordered turns. Typical shapes:
Single-turn: [user, assistant]
With system: [system, user, assistant]
Multi-turn: [system, user, assistant, user, assistant, …]
Examples
```python
>>> ex = RactoTrainingExample.from_pair(
...     user="What is 2 + 2?",
...     assistant="4",
...     system="You are a maths tutor.",
... )
>>> # Multimodal example (image + question)
>>> ex = RactoTrainingExample.from_pair(
...     user="Describe this chart.",
...     assistant="The chart shows monthly revenue for Q4 2024.",
...     user_attachments=[RactoFile.from_path("chart.png")],
... )
```
- classmethod from_pair(user, assistant, *, system='', user_attachments=None)
Create a single-turn (prompt → completion) training example.
- classmethod from_conversation(turns)
Build from a list of (role, content) tuples.
- to_openai_dict()
Serialize to an OpenAI fine-tuning JSONL record.
Output format:
{"messages": [{"role": "system", "content": "…"}, …]}
- to_anthropic_dict()
Serialize to an Anthropic fine-tuning JSONL record.
Output format:
{"system": "…", "messages": [{"role": "user", …}, …]}
The system key is only present when a system message exists.
- to_gemini_dict()
Serialize to a Gemini tuning record.
For text-only single-turn examples (the most common case) the output is:
{"text_input": "…", "output": "…"}
For multimodal or multi-turn examples the Vertex AI contents format is used:
{"contents": [{"role": "user", "parts": […]}, …]}
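The three record shapes above can be produced from plain (role, content) tuples. This sketch covers text-only turns, and the function names are hypothetical. Two details are assumptions: several system turns are joined with a newline, and system text is dropped from Gemini records. The "model" role in the fallback follows the Gemini API's name for assistant turns.
```python
import json
from typing import Any

Turn = tuple[str, str]  # (role, content); roles: "system" | "user" | "assistant"


def to_openai_record(turns: list[Turn]) -> dict[str, Any]:
    return {"messages": [{"role": r, "content": c} for r, c in turns]}


def to_anthropic_record(turns: list[Turn]) -> dict[str, Any]:
    # Lift system text to a top-level key, present only when a system turn exists.
    record: dict[str, Any] = {}
    system = [c for r, c in turns if r == "system"]
    if system:
        record["system"] = "\n".join(system)
    record["messages"] = [{"role": r, "content": c} for r, c in turns if r != "system"]
    return record


def to_gemini_record(turns: list[Turn]) -> dict[str, Any]:
    chat = [(r, c) for r, c in turns if r != "system"]  # system text dropped here
    if len(chat) == 2 and chat[0][0] == "user" and chat[1][0] == "assistant":
        return {"text_input": chat[0][1], "output": chat[1][1]}
    # Multi-turn fallback in the contents/parts shape.
    return {"contents": [{"role": "user" if r == "user" else "model", "parts": [{"text": c}]}
                         for r, c in chat]}


def to_jsonl(records: list[dict[str, Any]]) -> str:
    return "".join(json.dumps(record) + "\n" for record in records)


example = [("system", "You are a maths tutor."), ("user", "What is 2 + 2?"), ("assistant", "4")]
print(to_jsonl([to_anthropic_record(example)]), end="")
```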
- class ractogateway.RactoTrainingMessage(role, content, attachments=<factory>)
Bases: object
One conversational turn inside a training example.
- role: Literal['system', 'user', 'assistant']
- content: str
- attachments: list[RactoFile]
- to_openai()
Return an OpenAI-compatible message dict.
Text-only messages produce {"role": ..., "content": str}. Messages with attachments produce a content-block list: {"role": ..., "content": [image_url_block, ..., text_block]}.
- to_anthropic()
Return an Anthropic-compatible message dict.
System messages should be lifted to the top-level system field; RactoTrainingExample.to_anthropic_dict() handles this automatically.
- class ractogateway.Gateway(adapter, *, tools=None, default_prompt=None)
Bases: object
Unified entry point that wraps any BaseLLMAdapter.
- Parameters:
adapter (BaseLLMAdapter) – A concrete adapter instance (OpenAILLMKit, GoogleLLMKit, AnthropicLLMKit).
tools (ToolRegistry | None) – An optional ToolRegistry containing registered tools that the LLM is allowed to call.
default_prompt (RactoPrompt | None) – An optional RactoPrompt to use when run() is called without an explicit prompt.
Usage:
```python
from ractogateway import Gateway, RactoPrompt
from ractogateway.adapters import OpenAILLMKit

adapter = OpenAILLMKit(model="gpt-4o", api_key="sk-…")
prompt = RactoPrompt(...)  # fill in the RACTO fields (role, aim, constraints, tone, output_format)
gw = Gateway(adapter=adapter)

response = gw.run(prompt, "Analyse this code for bugs.")
print(response.parsed)  # auto-parsed JSON dict
```
- run(prompt=None, user_message='', *, tools=None, temperature=0.0, max_tokens=4096, response_model=None, **kwargs)
Execute a request and return a standardised LLMResponse.
- Parameters:
prompt (RactoPrompt | None) – The RACTO prompt. Falls back to default_prompt.
user_message (str) – The end user's query.
tools (ToolRegistry | None) – Override the gateway-level tool registry for this call.
temperature (float) – Sampling temperature.
max_tokens (int) – Maximum tokens in the response.
response_model (type[BaseModel] | None) – Optional Pydantic model to validate parsed output against. If provided and the LLM returns valid JSON, the JSON is validated through this model and attached to response.parsed.
**kwargs (Any) – Passed through to the adapter.
- Return type: LLMResponse
- async arun(prompt=None, user_message='', *, tools=None, temperature=0.0, max_tokens=4096, response_model=None, **kwargs)
Async variant of run().
- Return type: LLMResponse
- class ractogateway.LLMResponse(**data)
Bases: BaseModel
Unified, provider-agnostic response envelope.
Every adapter's run() method returns one of these, regardless of whether the underlying provider is OpenAI, Gemini, or Anthropic.
- tool_calls: list[ToolCallResult]
- finish_reason: FinishReason
- raw: Any
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
- class ractogateway.RactoFile(data, mime_type, name='')
Bases: object
A file attachment that can be passed to RactoPrompt.to_messages().
Create from a file path (MIME type is auto-detected) or directly from raw bytes with an explicit MIME type.
Examples
```python
>>> # From a file path
>>> img = RactoFile.from_path("/tmp/photo.jpg")
>>> # From bytes (Path.read_bytes() closes the file for you)
>>> from pathlib import Path
>>> img = RactoFile.from_bytes(Path("photo.jpg").read_bytes(), "image/jpeg")
```
- classmethod from_path(path)
Load a file from path and auto-detect its MIME type.
- Parameters:
path (str | Path) – Absolute or relative path to the file on disk.
- Raises:
FileNotFoundError – If path does not exist.
- Return type: RactoFile
- classmethod from_bytes(data, mime_type, name='')
Create a RactoFile directly from data bytes.
- property base64_data: str
Return file bytes encoded as a base-64 ASCII string.
- property is_image: bool
True when the MIME type is a supported image type.
- property is_pdf: bool
- property is_text: bool
- class ractogateway.RactoPrompt(**data)
Bases: BaseModel
A strictly validated RACTO prompt definition.
- Parameters:
role (str) – A sentence (or short paragraph) describing who the LLM is.
aim (str) – A clear statement of the task objective.
constraints (list[str]) – Hard rules the model must obey. At least one is required.
tone (str) – The desired communication style (e.g. "Professional and concise").
output_format (str | type[BaseModel]) – Either a format keyword ("json", "text", "markdown"), a free-form format description, or a Pydantic model class whose JSON Schema will be embedded in the prompt.
context (str | None) – Optional extra context paragraph injected between AIM and CONSTRAINTS. Useful for passing domain-specific background knowledge that the model needs to reason about.
examples (list[dict[str, str]] | None) – Optional list of example input/output pairs that are included in the prompt to steer the model via few-shot learning.
anti_hallucination (bool) – When True (the default), the compiler appends explicit anti-hallucination directives at the end of the prompt.
Notes
- Legacy compatibility: older RactoGateway versions accepted instructions without the full RACTO field set. That shape is still accepted and mapped to aim, with sensible defaults for the missing RACTO fields.
- role: str
- aim: str
- tone: str
- anti_hallucination: bool
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
- compile()
Compile the RACTO fields into an optimized system prompt string.
The resulting prompt is structured into clearly delimited sections so that the LLM can parse each instruction block unambiguously.
- Returns:
str – A ready-to-use system prompt.
- to_messages(user_message, *, attachments=None, provider='generic')[source]
Return a ready-to-send message list for a given LLM provider.
- Parameters:
user_message (str) – The end-user’s query or input.
attachments (list[RactoFile] | None) – Optional list of RactoFile objects to send alongside the text message. Accepted inputs per file:
File path: use RactoFile.from_path(), e.g. RactoFile.from_path("/tmp/diagram.png")
Raw bytes: use RactoFile.from_bytes(), e.g. RactoFile.from_bytes(img_bytes, "image/png")
Each file is re-encoded into the content-block schema expected by the target provider (image_url for OpenAI, image/document for Anthropic, inline_data for Google, images list for Ollama).
provider (str) – One of "openai", "anthropic", "google", "ollama", or "generic". Controls the system-role key name and the content-block format used for attachments.
- Return type:
- Returns:
list[dict[str, Any]] – A list of message dicts suitable for the provider’s API.
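For orientation, the sketch below builds a message list in the OpenAI Chat Completions shape with one inline image (an image_url content block carrying a base64 data URL), which is the kind of structure provider="openai" targets. The helper openai_style_messages is hypothetical and its output is an assumption, not the exact return value of to_messages().

```python
from __future__ import annotations

import base64
from typing import Any


def openai_style_messages(system_prompt: str, user_message: str,
                          image_bytes: bytes | None = None,
                          mime_type: str = "image/png") -> list[dict[str, Any]]:
    """Hypothetical helper: system + user messages in the OpenAI chat format."""
    messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}]
    if image_bytes is None:
        messages.append({"role": "user", "content": user_message})
        return messages
    # With attachments, the user content becomes a list of typed content blocks.
    data_url = f"data:{mime_type};base64," + base64.b64encode(image_bytes).decode("ascii")
    messages.append({
        "role": "user",
        "content": [
            {"type": "text", "text": user_message},
            {"type": "image_url", "image_url": {"url": data_url}},
        ],
    })
    return messages
```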
- class ractogateway.ToolRegistry[source]
Bases: object
A registry that collects tools and exposes them as canonical schemas.
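Usage:
from pydantic import BaseModel
from ractogateway import ToolRegistry

registry = ToolRegistry()

@registry.register
def get_weather(city: str, unit: str = "celsius") -> str:
    '''Get the current weather for a city.'''
    ...

# Or register a Pydantic model:
class WeatherRequest(BaseModel):
    '''Request the current weather for a city.'''
    city: str
    unit: str = "celsius"

registry.register(WeatherRequest)

# Iterate schemas:
for schema in registry.schemas:
    print(schema.name)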
- register(target=None, *, name=None, description=None)[source]
Register a function or Pydantic model as a tool.
Works as a decorator (@registry.register) or as a direct call (registry.register(MyModel)).
- property schemas: list[ToolSchema]
Return all registered tool schemas.
- property tools: dict[str, ToolSchema]
Backward-compatible mapping of tool name -> schema.
- get_schema(name)[source]
Look up a single tool schema by name.
- Return type:
- ractogateway.tool(fn=None, *, name=None, description=None, registry=None)[source]
Decorator that marks a function as an LLM-callable tool.
Can be used bare (@tool) or with overrides (@tool(name="my_tool", description="…")). You can also bind registration directly: @tool(registry) or @tool(registry=registry).
The decorated function gains a _tool_schema attribute containing the canonical ToolSchema.
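A decorator that works both bare and with keyword overrides is a common Python pattern: when called with the function it decorates immediately, otherwise it returns the real decorator. The sketch below shows that pattern with a plain dict standing in for ToolSchema; the dict's keys and the inspect-based extraction of required parameters are assumptions, and registry binding (@tool(registry)) is not modelled.

```python
from __future__ import annotations

import inspect
from collections.abc import Callable
from typing import Any


def tool(fn: Callable[..., Any] | None = None, *, name: str | None = None,
         description: str | None = None) -> Any:
    """Sketch of a decorator usable as @tool or @tool(name=..., description=...)."""

    def attach(func: Callable[..., Any]) -> Callable[..., Any]:
        params = inspect.signature(func).parameters
        # Hypothetical schema: a plain dict stands in for the library's ToolSchema.
        func._tool_schema = {
            "name": name or func.__name__,
            "description": description or inspect.getdoc(func) or "",
            "required": [p for p, v in params.items() if v.default is inspect.Parameter.empty],
        }
        return func

    # Bare use passes the function directly; parameterized use returns the decorator.
    return attach(fn) if fn is not None else attach


@tool
def get_weather(city: str, unit: str = "celsius") -> str:
    """Get the current weather for a city."""
    return f"Sunny in {city} ({unit})"


@tool(name="lookup", description="Look up a term.")
def search(term: str) -> str:
    return term
```

The decorated functions stay directly callable; the schema rides along as an attribute.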
- class ractogateway.RactoRAG(vector_store=None, embedder=None, *, store=None, chunker=None, processors=None, llm_kit=None, context_template=None, reader_registry=None, default_prompt=None)[source]
Bases: object
Production-grade RAG pipeline for RactoGateway.
- Parameters:
vector_store (BaseVectorStore | None) – Any BaseVectorStore instance.
embedder (BaseEmbedder | None) – Any BaseEmbedder instance.
chunker (BaseChunker | None) – How to split documents. Defaults to RecursiveChunker with chunk_size=512, overlap=50.
processors (list[BaseProcessor] | None) – List of text processors applied to each chunk before embedding. Defaults to [TextCleaner()].
llm_kit (Any | None) – Any developer kit (OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit). Required for query() / aquery().
context_template (str | None) – Template string for injecting retrieved context into the LLM prompt. Must contain {context} and {question} placeholders.
reader_registry (FileReaderRegistry | None) – Custom FileReaderRegistry. Defaults to a registry with all built-in readers.
default_prompt (RactoPrompt | None) – RACTO prompt used for generation. Falls back to a built-in RAG prompt.
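The context_template contract (a string containing {context} and {question}) can be illustrated with str.format. In this sketch the template text, the "---" separator between chunks, and the helper render_prompt are illustrative assumptions; they are not the built-in RAG prompt.

```python
from __future__ import annotations

# Hypothetical template; any string with both placeholders satisfies the contract.
TEMPLATE = (
    "Answer using only the context below.\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}"
)


def render_prompt(template: str, chunks: list[str], question: str) -> str:
    """Join retrieved chunk texts and fill the two required placeholders."""
    for placeholder in ("{context}", "{question}"):
        if placeholder not in template:
            raise ValueError(f"template is missing {placeholder}")
    context = "\n---\n".join(chunks)
    return template.format(context=context, question=question)
```

Note that str.format treats every other brace pair as a field too, so a custom template that contains literal braces would need them doubled ({{ and }}) under this approach.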
- ingest(path, **metadata)[source]
Read, chunk, embed, and store a single file.
- ingest_dir(directory, pattern='**/*', **metadata)[source]
Recursively ingest all supported files in a directory.
- ingest_text(text, source='manual', **metadata)[source]
Ingest a raw text string directly (no file needed).
- async aingest_dir(directory, pattern='**/*', **metadata)[source]
Async variant of ingest_dir().
- async aingest_text(text, source='manual', **metadata)[source]
Async variant of ingest_text().
- retrieve(query, top_k=5, filters=None)[source]
Embed query and retrieve the top-k most relevant chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked results (rank 1 = most relevant).
- async aretrieve(query, top_k=5, filters=None)[source]
Async variant of retrieve().
- Return type:
- query(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]
Retrieve relevant chunks and generate an answer.
- Parameters:
question (str) – The user’s question.
top_k (int) – Number of context chunks to retrieve.
filters (dict[str, Any] | None) – Optional metadata filters for retrieval.
prompt (RactoPrompt | None) – Override the default RACTO prompt for generation.
temperature (float) – LLM temperature (default 0.0 for factual answers).
max_tokens (int) – Maximum tokens in the generated answer.
- Return type:
- Returns:
RAGResponse – Contains the generated answer plus the retrieved source chunks.
- Raises:
RuntimeError – If no llm_kit was provided.
- async aquery(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]
Async variant of query().
- Return type:
- property store: BaseVectorStore
The underlying vector store.
- property embedder: BaseEmbedder
The underlying embedder.
- class ractogateway.FixedChunker(chunk_size=512, overlap=50)[source]
Bases: BaseChunker
Split text into fixed-size character windows with overlap.
- Parameters:
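A fixed-size chunker with overlap slides a chunk_size-character window forward by chunk_size - overlap characters each step. This minimal sketch reuses the constructor's parameter names; how FixedChunker treats the final short window is an assumption here (it stops once a window reaches the end of the text).

```python
def fixed_chunks(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Slide a chunk_size-character window forward by chunk_size - overlap each step."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks
```

With chunk_size=4 and overlap=1, "abcdefghij" becomes "abcd", "defg", "ghij": each window repeats the last character of the previous one, so text cut at a boundary still appears intact in at least one chunk when it is no longer than the overlap.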
- class ractogateway.RecursiveChunker(chunk_size=512, overlap=50, separators=None)[source]
Bases: BaseChunker
Split text recursively using a priority list of separators.
- Parameters:
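Recursive splitting usually works like this: split on the highest-priority separator, greedily pack the pieces into chunks of at most chunk_size, and recurse with the next separator into any piece that is still too large, falling back to hard character windows. The sketch below assumes the separator list shown in DEFAULT_SEPARATORS (the library's default is not documented here) and leaves out overlap for brevity.

```python
from __future__ import annotations

DEFAULT_SEPARATORS = ["\n\n", "\n", ". ", " "]  # assumed priority order


def recursive_chunks(text: str, chunk_size: int = 512,
                     separators: list[str] | None = None) -> list[str]:
    """Split on the highest-priority separator, recursing into oversized pieces."""
    seps = DEFAULT_SEPARATORS if separators is None else separators
    if len(text) <= chunk_size:
        return [text] if text else []
    if not seps:
        # No separators left: fall back to hard character windows.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, rest = seps[0], seps[1:]
    if sep not in text:
        return recursive_chunks(text, chunk_size, rest)
    chunks: list[str] = []
    current = ""
    for piece in text.split(sep):
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate  # greedily pack pieces into the current chunk
            continue
        if current:
            chunks.append(current)
        if len(piece) <= chunk_size:
            current = piece
        else:
            # Piece alone is too big: split it with lower-priority separators.
            chunks.extend(recursive_chunks(piece, chunk_size, rest))
            current = ""
    if current:
        chunks.append(current)
    return chunks
```

Preferring paragraph breaks over line breaks over sentence ends keeps related text together as long as it fits.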
- class ractogateway.SemanticChunker(embedder, threshold=0.5, min_chunk_size=2, language='english')[source]
Bases: BaseChunker
Split documents where the semantic similarity between adjacent sentences drops below a threshold.
- Parameters:
embedder (BaseEmbedder) – Any BaseEmbedder instance.
threshold (float) – Cosine similarity below which a split is inserted (default: 0.5).
min_chunk_size (int) – Minimum number of sentences per chunk (prevents ultra-fine splits).
language (str) – NLTK sentence tokenizer language.
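The splitting rule above can be sketched on pre-split sentences: embed every sentence, compare each adjacent pair, and start a new chunk where cosine similarity falls below threshold. The embed callable stands in for BaseEmbedder.embed(); applying min_chunk_size by suppressing splits until the current chunk is large enough is an assumption about how the library enforces it.

```python
from __future__ import annotations

import math
from collections.abc import Callable


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_chunks(sentences: list[str],
                    embed: Callable[[list[str]], list[list[float]]],
                    threshold: float = 0.5, min_chunk_size: int = 2) -> list[list[str]]:
    """Start a new chunk where adjacent-sentence similarity falls below threshold."""
    if not sentences:
        return []
    vectors = embed(sentences)
    chunks: list[list[str]] = [[sentences[0]]]
    for i in range(1, len(sentences)):
        similar = cosine(vectors[i - 1], vectors[i]) >= threshold
        # Only split once the current chunk holds at least min_chunk_size sentences.
        if not similar and len(chunks[-1]) >= min_chunk_size:
            chunks.append([])
        chunks[-1].append(sentences[i])
    return chunks
```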
- class ractogateway.SentenceChunker(sentences_per_chunk=5, overlap_sentences=1, language='english')[source]
Bases: BaseChunker
Split text into groups of sentences using NLTK.
- Parameters:
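Sentence windows with overlap follow the same sliding-window idea as the fixed chunker, counted in sentences instead of characters. In this sketch a naive regex splitter stands in for NLTK's tokenizer (an assumption made so the example has no dependencies); it will mis-split abbreviations such as "e.g." that NLTK handles.

```python
import re


def sentence_chunks(text: str, sentences_per_chunk: int = 5,
                    overlap_sentences: int = 1) -> list[str]:
    """Group sentences into windows that share overlap_sentences with the previous one."""
    if not 0 <= overlap_sentences < sentences_per_chunk:
        raise ValueError("need 0 <= overlap_sentences < sentences_per_chunk")
    # Naive stand-in for NLTK: break after '.', '!' or '?' followed by whitespace.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    step = sentences_per_chunk - overlap_sentences
    chunks = []
    for start in range(0, len(sentences), step):
        chunks.append(" ".join(sentences[start:start + sentences_per_chunk]))
        if start + sentences_per_chunk >= len(sentences):
            break  # the last window already covers the final sentence
    return chunks
```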
- class ractogateway.GoogleEmbedder(model='text-embedding-004', *, api_key=None, task_type=None, batch_size=100)[source]
Bases: BaseEmbedder
Embed texts using the Google Gemini Embeddings API.
- Parameters:
model (str) – Gemini embedding model (default "text-embedding-004").
api_key (str | None) – Gemini API key. Falls back to GEMINI_API_KEY env var.
task_type (str | None) – Gemini task type hint (e.g. "RETRIEVAL_DOCUMENT", "RETRIEVAL_QUERY"). None lets the API decide.
batch_size (int) – Maximum number of texts per API call.
- property dimension: int
Dimensionality of the embedding vectors.
Returns -1 if not known until after the first call.
- embed(texts)[source]
Embed texts synchronously.
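All three embedders take a batch_size (100 for Google, 256 for OpenAI, 128 for Voyage by default). A minimal sketch of how such a limit is typically applied: slice the input into consecutive batches, make one call per batch, and concatenate results so output order matches input order. The call_api argument is a hypothetical stand-in for the provider request; that embed() batches exactly this way is an assumption.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator


def batched(texts: list[str], batch_size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most batch_size texts (one API call each)."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(texts), batch_size):
        yield texts[start:start + batch_size]


def embed_all(texts: list[str], batch_size: int,
              call_api: Callable[[list[str]], list[list[float]]]) -> list[list[float]]:
    """Concatenate per-batch results so output order matches input order."""
    vectors: list[list[float]] = []
    for batch in batched(texts, batch_size):
        vectors.extend(call_api(batch))  # call_api: hypothetical provider request
    return vectors
```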
- class ractogateway.OpenAIEmbedder(model='text-embedding-3-small', *, api_key=None, base_url=None, dimensions=None, batch_size=256)[source]
Bases: BaseEmbedder
Embed texts using the OpenAI Embeddings API.
- Parameters:
model (str) – OpenAI embedding model (default "text-embedding-3-small").
api_key (str | None) – OpenAI API key. Falls back to OPENAI_API_KEY env var.
base_url (str | None) – Custom base URL (Azure OpenAI or proxy).
dimensions (int | None) – Override output dimensionality (supported for text-embedding-3-*).
batch_size (int) – Maximum number of texts per API call.
- property dimension: int
Dimensionality of the embedding vectors.
Returns -1 if not known until after the first call.
- embed(texts)[source]
Embed texts synchronously.
- class ractogateway.VoyageEmbedder(model='voyage-3', *, api_key=None, input_type='document', batch_size=128)[source]
Bases: BaseEmbedder
Embed texts using the Voyage AI API.
Voyage AI embeddings are optimised for Anthropic Claude RAG pipelines and are the recommended choice when using Claude as the generation LLM.
- Parameters:
model (str) – Voyage model name (default "voyage-3").
api_key (str | None) – Voyage API key. Falls back to VOYAGE_API_KEY env var.
input_type (str | None) – "query" for queries, "document" for documents to index. Using the correct type improves retrieval quality.
batch_size (int) – Maximum texts per API call.
- property dimension: int
Dimensionality of the embedding vectors.
Returns -1 if not known until after the first call.
- embed(texts)[source]
Embed texts synchronously.
- class ractogateway.Lemmatizer(use_pos_tagging=True)[source]
Bases: BaseProcessor
Reduce words to their base (lemma) form using NLTK WordNet.
- Parameters:
use_pos_tagging (bool) – If True, use POS tagging to improve lemmatization accuracy. Slightly slower but produces better results.
- class ractogateway.ProcessingPipeline(processors)[source]
Bases: BaseProcessor
Apply a sequence of BaseProcessor objects to text.
Example:
pipeline = ProcessingPipeline([TextCleaner(), Lemmatizer()])
processed = pipeline.process(" Hello, worlds! ")
- Parameters:
processors (list[BaseProcessor]) – Ordered list of processors to apply. Each processor receives the output of the previous one.
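The "each processor receives the output of the previous one" contract is a left fold over the processor list. This sketch uses functools.reduce with two hypothetical processors (Strip, Lower) and a Processor protocol standing in for BaseProcessor; only the process(text) -> str method is assumed.

```python
from __future__ import annotations

from functools import reduce
from typing import Protocol


class Processor(Protocol):  # hypothetical stand-in for BaseProcessor
    def process(self, text: str) -> str: ...


class Strip:
    def process(self, text: str) -> str:
        return text.strip()


class Lower:
    def process(self, text: str) -> str:
        return text.lower()


class Pipeline:
    """Feed each processor the previous processor's output, left to right."""

    def __init__(self, processors: list[Processor]) -> None:
        self.processors = list(processors)

    def process(self, text: str) -> str:
        return reduce(lambda acc, p: p.process(acc), self.processors, text)
```

Because the pipeline itself exposes process(), pipelines can be nested inside other pipelines, mirroring ProcessingPipeline subclassing BaseProcessor.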
- class ractogateway.TextCleaner(normalize_unicode=True, strip_html=True, strip_control_chars=True, collapse_whitespace=True, collapse_blank_lines=True)[source]
Bases: BaseProcessor
Normalise text for embedding and retrieval.
Steps applied (all optional via constructor flags):
1. Unicode normalisation (NFC)
2. Strip residual HTML tags
3. Remove control characters
4. Collapse multiple spaces to one
5. Collapse runs of blank lines to at most two newlines
6. Strip leading/trailing whitespace
- Parameters:
normalize_unicode (bool) – Apply unicodedata.normalize("NFC", text).
strip_html (bool) – Remove <tag> patterns.
strip_control_chars (bool) – Remove non-printable control characters.
collapse_whitespace (bool) – Collapse sequences of spaces/tabs to a single space.
collapse_blank_lines (bool) – Collapse 3+ consecutive newlines to 2.
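The six steps can be sketched with unicodedata and re, all flags enabled. The regular expressions are assumptions: the library may define "control characters" differently (this sketch keeps tab, newline and carriage return) and may replace tags with a space rather than deleting them.

```python
import re
import unicodedata

_TAG = re.compile(r"<[^>]+>")
# C0 controls and DEL, excluding tab (\x09), newline (\x0a) and carriage return (\x0d).
_CONTROL = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")


def clean_text(text: str) -> str:
    """Apply the documented steps in order, with every flag enabled."""
    text = unicodedata.normalize("NFC", text)  # 1. compose e + U+0301 into é
    text = _TAG.sub("", text)                  # 2. drop <tag> patterns
    text = _CONTROL.sub("", text)              # 3. remove control characters
    text = re.sub(r"[ \t]+", " ", text)        # 4. spaces/tabs -> one space
    text = re.sub(r"\n{3,}", "\n\n", text)     # 5. 3+ newlines -> 2
    return text.strip()                        # 6. trim the ends
```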
- class ractogateway.FileReaderRegistry(readers=None)[source]
Bases: object
Registry that maps file extensions to BaseReader instances.
By default all built-in readers are registered. You can add custom readers with register().
Example:
registry = FileReaderRegistry()
doc = registry.read("report.pdf")
- register(reader)[source]
Add reader to the registry for all its supported extensions.
- Return type:
- get_reader(path)[source]
Return the reader for path’s extension.
- Raises:
ValueError – If no reader supports the file’s extension.
- Return type:
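The extension lookup behind get_reader() can be sketched as a dict keyed by lower-cased suffix that raises ValueError for unknown extensions. In the real registry, readers are BaseReader instances that declare their own extensions; here ReaderRegistry, Reader, and the explicit extension list passed to register() are hypothetical simplifications.

```python
from __future__ import annotations

from collections.abc import Callable
from pathlib import Path

Reader = Callable[[Path], str]  # hypothetical: a reader turns a path into text


class ReaderRegistry:
    """Map lower-cased file extensions to readers; unknown extensions raise."""

    def __init__(self) -> None:
        self._by_ext: dict[str, Reader] = {}

    def register(self, extensions: list[str], reader: Reader) -> None:
        for ext in extensions:
            self._by_ext[ext.lower()] = reader  # later registrations win

    def get_reader(self, path: str | Path) -> Reader:
        ext = Path(path).suffix.lower()
        try:
            return self._by_ext[ext]
        except KeyError:
            raise ValueError(f"no reader supports {ext or 'files without an extension'!r}") from None
```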
- class ractogateway.ChromaStore(collection='ractogateway', *, path=None, host=None, port=8000, distance_function='cosine')[source]
Bases: BaseVectorStore
Vector store backed by ChromaDB.
Supports both in-process mode (path, or None for ephemeral) and HTTP-client mode (host + port).
- Parameters:
collection (str) – Name of the ChromaDB collection.
path (str | None) – Persist directory for a local persistent client. None = ephemeral.
host (str | None) – ChromaDB server host (enables HTTP client mode).
port (int) – ChromaDB server port (default 8000).
distance_function (str) – "cosine", "l2", or "ip" (inner product).
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
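Several stores on this page choose between cosine, L2, and inner-product scoring (ChromaStore's distance_function, FAISSStore's "flat_ip", MilvusStore's "IP" described as inner product / cosine). The sketch below shows the math those options share: after L2-normalizing vectors, inner product equals cosine similarity and squared L2 distance equals 2 - 2·cos, so all three rank neighbours identically. How each store converts these into reported distances is store-specific and not shown.

```python
import math


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_distance(a: list[float], b: list[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def normalize(v: list[float]) -> list[float]:
    """Scale v to unit length so inner product and cosine coincide."""
    n = math.sqrt(dot(v, v))
    return [x / n for x in v]
```

This is why an inner-product index only behaves like cosine search when embeddings are normalized before insertion and at query time.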
- class ractogateway.FAISSStore(dimension=None, index_type='flat_ip')[source]
Bases: BaseVectorStore
Vector store backed by Facebook AI Similarity Search (FAISS).
Stores embeddings in a flat L2 or cosine (inner product) index. All data is in-memory; call save() / load() to persist.
- Parameters:
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- save(path)[source]
Persist the FAISS index to path.index and chunks to path.chunks.
- Return type:
- class ractogateway.InMemoryVectorStore(similarity='cosine')[source]
Bases: BaseVectorStore
Pure-Python brute-force vector store with no extra dependencies.
This store keeps all chunks and their embeddings in memory. It is not suitable for production-scale corpora but requires no installation.
- Parameters:
similarity (str) – Similarity function to use. Currently only "cosine" is supported.
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
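A brute-force store scores every stored embedding against the query and returns the best top_k, ranked from 1. This sketch follows the documented add()/search() contract (reject chunks without embeddings, rank 1 = most similar) but uses a simplified Chunk dataclass and returns (rank, score, chunk) tuples instead of RetrievalResult; metadata filters are omitted.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass
class Chunk:  # simplified stand-in for ractogateway's Chunk
    text: str
    embedding: list[float] | None = None


def _cosine(a: list[float], b: list[float]) -> float:
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return sum(x * y for x, y in zip(a, b)) / norm if norm else 0.0


class BruteForceStore:
    def __init__(self) -> None:
        self._chunks: list[Chunk] = []

    def add(self, chunks: list[Chunk]) -> None:
        # Validate the whole batch first so a bad chunk leaves the store unchanged.
        if any(c.embedding is None for c in chunks):
            raise ValueError("every chunk needs an embedding")
        self._chunks.extend(chunks)

    def search(self, embedding: list[float], top_k: int = 5) -> list[tuple[int, float, Chunk]]:
        """Score every chunk (O(n*d)); return (rank, score, chunk), rank 1 = most similar."""
        scored = sorted(((_cosine(embedding, c.embedding), c) for c in self._chunks),
                        key=lambda pair: pair[0], reverse=True)
        return [(rank, score, chunk) for rank, (score, chunk) in enumerate(scored[:top_k], start=1)]
```

The linear scan is exactly why such a store suits small corpora and tests rather than production-scale collections.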
- class ractogateway.MilvusStore(collection='ractogateway', *, host='localhost', port=19530, uri=None, token=None, dimension=None, metric_type='IP', batch_size=100)[source]
Bases: BaseVectorStore
Vector store backed by Milvus or Zilliz Cloud.
- Parameters:
collection (str) – Milvus collection name.
host (str) – Milvus server host (default "localhost").
port (int) – Milvus server port (default 19530).
uri (str | None) – Zilliz Cloud URI (overrides host/port when set).
dimension (int | None) – Embedding dimension. Inferred on first add.
metric_type (str) – "IP" (inner product / cosine) or "L2".
batch_size (int) – Vectors per insert batch.
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- class ractogateway.PGVectorStore(dsn, *, table='rag_chunks', dimension=None, distance='cosine', batch_size=100)[source]
Bases: BaseVectorStore
Vector store backed by PostgreSQL with the pgvector extension.
- Parameters:
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- class ractogateway.PineconeStore(index_name, *, api_key=None, namespace='', batch_size=100)[source]
Bases: BaseVectorStore
Vector store backed by Pinecone cloud.
- Parameters:
index_name (str) – Name of the Pinecone index (must already exist).
api_key (str | None) – Pinecone API key. Falls back to PINECONE_API_KEY env var.
namespace (str) – Pinecone namespace for logical data isolation.
environment – Deprecated Pinecone environment string (for legacy pod-based indexes).
batch_size (int) – Number of vectors per upsert batch.
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- class ractogateway.QdrantStore(collection='ractogateway', *, url=None, api_key=None, distance='cosine', dimension=None, batch_size=100)[source]
Bases: BaseVectorStore
Vector store backed by Qdrant.
- Parameters:
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- class ractogateway.WeaviateStore(class_name='RactoChunk', *, url=None, api_key=None, additional_headers=None, distance_metric='cosine', batch_size=100)[source]
Bases: BaseVectorStore
Vector store backed by Weaviate.
Supports embedded (local, no server needed), local server, and Weaviate Cloud (WCS) connections.
- Parameters:
class_name (str) – Weaviate class (collection) name.
url (str | None) – Weaviate server URL. None = use embedded Weaviate.
additional_headers (dict[str, str] | None) – Extra HTTP headers (e.g. for OpenAI API key pass-through to Weaviate).
distance_metric (str) – "cosine" or "l2-squared".
batch_size (int) – Objects per batch import.
- add(chunks)[source]
Add chunks (with embeddings) to the store.
- Parameters:
chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.
- Raises:
ValueError – If any chunk has embedding=None.
- Return type:
- search(embedding, top_k=5, filters=None)[source]
Search for the top_k most similar chunks.
- Parameters:
- Return type:
- Returns:
list[RetrievalResult] – Ranked list of results (rank 1 = most similar).
- class ractogateway.CacheConfig(**data)[source]
Bases: BaseModel
Configuration for cache instances.
- Parameters:
- max_size: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.CacheStats(**data)[source]
Bases: BaseModel
Snapshot of cache performance counters.
- hits: int
- misses: int
- size: int
- property total: int
Total requests seen by the cache.
- property hit_rate: float
Fraction of requests that were cache hits (0.0-1.0).
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
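The two derived counters can be sketched as properties on a frozen dataclass. That total is hits + misses, and that hit_rate returns 0.0 before any request has been seen, are assumptions consistent with the descriptions above rather than confirmed behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Stats:  # hypothetical stand-in for CacheStats
    hits: int = 0
    misses: int = 0
    size: int = 0

    @property
    def total(self) -> int:
        return self.hits + self.misses

    @property
    def hit_rate(self) -> float:
        # Guard against division by zero before any request has been seen.
        return self.hits / self.total if self.total else 0.0
```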
- class ractogateway.ExactMatchCache(max_size=1024, ttl_seconds=None)[source]
Bases: object
Ultra-low-latency key-value cache for identical LLM requests.
- Parameters:
max_size (int) – LRU capacity. 0 = unlimited (no eviction).
ttl_seconds (float | None) – Entries older than ttl_seconds are treated as misses and transparently evicted. None disables expiry.
Example:
from ractogateway.cache import ExactMatchCache
from ractogateway.openai_developer_kit import OpenAIDeveloperKit

cache = ExactMatchCache(max_size=512, ttl_seconds=3600)

# Wire into a kit:
kit = OpenAIDeveloperKit(model="gpt-4o", exact_cache=cache)
- get(user_message, system_prompt, model, temperature, max_tokens)[source]
Return a cached response or None on a miss.
O(1): dictionary lookup plus an optional move-to-end.
- Return type:
- put(user_message, system_prompt, model, temperature, max_tokens, response)[source]
Store a response. Evicts LRU entry when at capacity.
O(1) amortised — dictionary insert + optional popitem(last=False).
- Return type:
- invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]
Remove a specific entry. Returns True if it was present.
- Return type:
- property stats: CacheStats
Return a snapshot of hit/miss/size counters.
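The O(1) costs quoted for get() and put() match an OrderedDict-based LRU: move_to_end() on a hit, popitem(last=False) to evict the oldest entry. This sketch adds per-entry TTL with expiry checked on read. Keying on the tuple of the five request fields mirrors the method signatures; the injectable clock parameter is an addition for testability, and the library's internal key format is not documented here.

```python
from __future__ import annotations

import time
from collections import OrderedDict
from collections.abc import Callable
from typing import Any


class LRUTTLCache:
    def __init__(self, max_size: int = 1024, ttl_seconds: float | None = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size, self.ttl, self.clock = max_size, ttl_seconds, clock
        self._data: OrderedDict[tuple, tuple[float, Any]] = OrderedDict()

    @staticmethod
    def key(user_message: str, system_prompt: str, model: str,
            temperature: float, max_tokens: int) -> tuple:
        return (user_message, system_prompt, model, temperature, max_tokens)

    def get(self, key: tuple) -> Any | None:
        item = self._data.get(key)
        if item is None:
            return None
        stored_at, value = item
        if self.ttl is not None and self.clock() - stored_at > self.ttl:
            del self._data[key]  # expired: evict and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: tuple, value: Any) -> None:
        self._data[key] = (self.clock(), value)
        self._data.move_to_end(key)
        if self.max_size and len(self._data) > self.max_size:  # max_size=0: unlimited
            self._data.popitem(last=False)  # drop the least recently used entry
```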
- class ractogateway.SemanticCache(embed_fn, similarity_threshold=0.95, max_size=512, ttl_seconds=None)[source]
Bases: object
Vector-similarity cache: returns cached answers for semantically similar queries, so a hit costs no LLM API call.
- Parameters:
embed_fn (Callable[[str], list[float]]) – Any callable (text: str) -> list[float]. Called once per new query (cache miss) and once at put() time.
similarity_threshold (float) – Minimum cosine similarity to declare a hit. The default 0.95 is intentionally strict to avoid incorrect responses.
max_size (int) – Maximum number of entries (LRU eviction). 0 = unlimited.
ttl_seconds (float | None) – Optional per-entry TTL. None disables expiry.
Examples
import openai
import ractogateway.openai_developer_kit as gpt
from ractogateway import SemanticCache

kit = gpt.OpenAIDeveloperKit(model="gpt-4o")
client = openai.OpenAI()  # create the client once, not on every embed call

def embed(text: str) -> list[float]:
    r = client.embeddings.create(model="text-embedding-3-small", input=text)
    return r.data[0].embedding

cache = SemanticCache(embed_fn=embed, similarity_threshold=0.95)
- get(query)[source]
Embed query and return a cached response if cosine-sim ≥ threshold.
Returns None on a cache miss (the caller should make the real API call and then invoke put()).
Complexity: O(n·d), where n = number of entries and d = embedding dimension.
- Return type:
- put(query, response)[source]
Embed query and store response for future similar queries.
Evicts LRU entry when at capacity.
- Return type:
- property stats: CacheStats
Return a snapshot of hit/miss/size counters.
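The lookup described above (embed the query, scan all entries, return the best match if its cosine similarity reaches the threshold) can be sketched in a few lines; the linear scan is where the O(n·d) cost comes from. TinySemanticCache is a hypothetical name, TTL is omitted, and the toy vectors in the usage stand in for a real embedding model. Note that get() still calls embed_fn, so a hit saves the LLM call, not the embedding call.

```python
from __future__ import annotations

import math
from collections import OrderedDict
from collections.abc import Callable
from typing import Any


def _cos(a: list[float], b: list[float]) -> float:
    norm = math.hypot(*a) * math.hypot(*b)
    return sum(x * y for x, y in zip(a, b)) / norm if norm else 0.0


class TinySemanticCache:
    def __init__(self, embed_fn: Callable[[str], list[float]],
                 similarity_threshold: float = 0.95, max_size: int = 512) -> None:
        self.embed_fn, self.threshold, self.max_size = embed_fn, similarity_threshold, max_size
        self._entries: OrderedDict[str, tuple[list[float], Any]] = OrderedDict()

    def get(self, query: str) -> Any | None:
        vec = self.embed_fn(query)
        best_key, best_sim = None, -1.0
        for key, (stored, _) in self._entries.items():  # linear scan: O(n*d)
            sim = _cos(vec, stored)
            if sim > best_sim:
                best_key, best_sim = key, sim
        if best_key is None or best_sim < self.threshold:
            return None  # miss: caller makes the real call, then put()
        self._entries.move_to_end(best_key)
        return self._entries[best_key][1]

    def put(self, query: str, response: Any) -> None:
        self._entries[query] = (self.embed_fn(query), response)
        self._entries.move_to_end(query)
        if self.max_size and len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # LRU eviction
```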
- class ractogateway.ChatMemoryConfig(**data)[source]
Bases: BaseModel
Configuration for RedisChatMemory.
- Parameters:
max_turns (int) – Maximum number of conversation turns to retain. Each turn consists of one user message and one assistant message, so up to max_turns * 2 raw messages are stored per conversation.
ttl_seconds (float | None) – Optional TTL. Every append() call refreshes the expiry on the underlying Redis list. None disables expiry.
key_prefix (str) – Redis key namespace. Each conversation is stored at "{key_prefix}:{conversation_id}".
- max_turns: int
- key_prefix: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.RateLimitConfig(**data)[source]
Bases: BaseModel
Configuration for RedisRateLimiter.
- Parameters:
- max_tokens_per_minute: int
- key_prefix: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.RedisChatMemory(*, url='redis://localhost:6379/0', client=None, config=None)[source]
Bases: object
Shared, bounded conversation history backed by Redis.
- Parameters:
url (str) – Redis connection URL. Ignored when client is provided.
config (ChatMemoryConfig | None) – ChatMemoryConfig controlling turn limit, TTL, and key namespace. Defaults are applied when None.
- append(conversation_id, role, content)[source]
Append a message to the conversation history.
After appending, the list is trimmed to the last config.max_turns * 2 messages (oldest dropped first). If a TTL is configured, it is refreshed on every append so the window slides with activity.
- get_history(conversation_id)[source]
Return all stored messages as a list of {"role", "content"} dicts.
The list is ordered oldest-first, matching the ChatConfig.history convention.
Returns an empty list when the conversation does not exist or has expired.
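The trimming rule (keep only the last max_turns * 2 messages, oldest-first) corresponds in Redis to RPUSH followed by LTRIM key -N -1. This in-process sketch reproduces that behaviour with plain lists; InMemoryChatHistory is a hypothetical name, TTL refresh is omitted, and the real class's Redis commands are not documented here.

```python
from __future__ import annotations

from collections import defaultdict


class InMemoryChatHistory:
    """In-process stand-in for RedisChatMemory's bounded, oldest-first history."""

    def __init__(self, max_turns: int) -> None:
        if max_turns < 1:
            raise ValueError("max_turns must be >= 1")
        self.max_messages = max_turns * 2  # one user + one assistant message per turn
        self._store: dict[str, list[dict[str, str]]] = defaultdict(list)

    def append(self, conversation_id: str, role: str, content: str) -> None:
        history = self._store[conversation_id]
        history.append({"role": role, "content": content})
        # Same effect as Redis RPUSH followed by LTRIM key -max_messages -1.
        del history[:-self.max_messages]

    def get_history(self, conversation_id: str) -> list[dict[str, str]]:
        # .get() avoids creating an empty entry for unknown conversations.
        return list(self._store.get(conversation_id, []))
```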
- class ractogateway.RedisExactCache(*, url='redis://localhost:6379/0', client=None, ttl_seconds=None, key_prefix='ractogateway:exact')[source]
Bases: object
Distributed exact-match LRU cache backed by Redis.
- Parameters:
url (str) – Redis connection URL (e.g. "redis://localhost:6379/0"). Ignored when client is provided.
client (Any | None) – Pre-built redis.Redis (or compatible) client. Useful when you manage the connection pool yourself or use a mock in tests.
ttl_seconds (float | None) – Optional TTL for each entry. Passed directly to Redis SET EX. None means entries never expire (Redis default).
key_prefix (str) – Namespace for all Redis keys managed by this instance.
- get(user_message, system_prompt, model, temperature, max_tokens)[source]
Return a cached response or None on a miss.
O(1) Redis GET.
- Return type:
- put(user_message, system_prompt, model, temperature, max_tokens, response)[source]
Store a response in Redis.
O(1) Redis SET [EX ttl].
- Return type:
- invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]
Remove a specific entry. Returns True if it was present.
- Return type:
- clear()[source]
Delete all entries matching this instance’s key prefix.
Uses SCAN to iterate safely (no KEYS * in production). Also resets in-memory stats counters.
- Return type:
- property stats: CacheStats
Return a snapshot of hit/miss counters plus current Redis key count.
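An exact-match cache in Redis needs a deterministic key per request. One common scheme, sketched below under that assumption, hashes a canonical JSON encoding of all five request fields and prepends key_prefix: keys stay fixed-length, any field change (even temperature) produces a miss, and clear() can find every entry by scanning for "{key_prefix}:*". The function cache_key and this exact scheme are hypothetical; RedisExactCache's real key format is not documented here.

```python
import hashlib
import json


def cache_key(prefix: str, user_message: str, system_prompt: str, model: str,
              temperature: float, max_tokens: int) -> str:
    """Hypothetical key scheme: SHA-256 of a canonical JSON list of the request fields."""
    payload = json.dumps(
        [user_message, system_prompt, model, temperature, max_tokens],
        ensure_ascii=False, separators=(",", ":"),
    )
    return f"{prefix}:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"
```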
- class ractogateway.RedisRateLimiter(*, url='redis://localhost:6379/0', client=None, config=None)[source]
Bases: object
Fleet-wide token-budget rate limiter backed by a shared Redis instance.
- Parameters:
url (str) – Redis connection URL. Ignored when client is provided.
client (Any | None) – Pre-built redis.Redis client. Useful for connection-pool sharing or unit-test mocking.
config (RateLimitConfig | None) – RateLimitConfig controlling the token budget and Redis key namespace. Defaults are applied when None.
- check_and_consume(user_id, tokens=1)[source]
Attempt to consume tokens from user_id’s budget.
Returns True if the request is within budget (tokens are consumed), or False if the rate limit would be exceeded (no tokens consumed).
The check-and-increment is done in a single Redis pipeline, making it safe against concurrent requests from the same user.
- get_remaining(user_id)[source]
Return the remaining token budget for the current minute.
Returns max_tokens_per_minute if the user has not made any requests in the current window.
- Return type:
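The budget semantics above (consume only when the request fits, report the remaining budget for the current minute) can be sketched as a fixed one-minute window per user. This is a single-process, in-memory illustration with an injectable clock; it assumes fixed wall-clock windows, which the description suggests but does not confirm, and unlike Redis keys with an expiry it never discards old windows.

```python
from __future__ import annotations

import time
from collections.abc import Callable


class FixedWindowLimiter:
    """In-memory, single-process sketch of a per-user, per-minute token budget."""

    def __init__(self, max_tokens_per_minute: int,
                 clock: Callable[[], float] = time.time) -> None:
        self.budget = max_tokens_per_minute
        self.clock = clock
        self._used: dict[tuple[str, int], int] = {}  # (user, minute) -> tokens used

    def _window(self) -> int:
        return int(self.clock() // 60)  # index of the current wall-clock minute

    def check_and_consume(self, user_id: str, tokens: int = 1) -> bool:
        key = (user_id, self._window())
        used = self._used.get(key, 0)
        if used + tokens > self.budget:
            return False  # over budget: nothing is consumed
        self._used[key] = used + tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self.budget - self._used.get((user_id, self._window()), 0)
```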
- class ractogateway.KafkaAuditEvent(**data)[source]
Bases: BaseModel
Immutable audit envelope for prompt/response traces.
- user_id: str
- model: str
- prompt: str
- response: str
- timestamp_utc: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.KafkaAuditLogger(producer, *, topic='ractogateway.audit')[source]
Bases: object
Asynchronous audit logger for regulated AI workloads.
Writes immutable prompt/response records to a dedicated Kafka topic so audit traces survive web-process crashes and can be replayed later.
- property topic: str
- class ractogateway.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)[source]
Bases: object
Typed facade over kafka.KafkaConsumer.
- Parameters:
config (KafkaConsumerConfig) – Consumer connection and polling options.
consumer (Any | None) – Pre-built consumer object; useful for tests and dependency injection.
key_deserializer (Callable[[Any], Any] | None) – Optional override for key decoding.
value_deserializer (Callable[[Any], Any] | None) – Optional override for value decoding.
- poll(*, timeout_ms=None, max_records=None)[source]
Poll records and return normalized KafkaMessage entries.
- Return type:
- class ractogateway.KafkaConsumerConfig(**data)[source]
Bases: BaseModel
Configuration for KafkaConsumerClient.
- Parameters:
group_id (str | None) – Consumer group ID. None disables group coordination.
auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.
enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.
max_poll_records (int) – Upper bound on records returned per poll call.
poll_timeout_ms (int) – Default poll timeout in milliseconds.
session_timeout_ms (int) – Group session timeout.
client_id (str) – Consumer client ID.
security_protocol (str) – Kafka transport protocol.
sasl_mechanism (str | None) – SASL mechanism when applicable.
sasl_plain_username (str | None) – Username for SASL auth.
sasl_plain_password (str | None) – Password for SASL auth.
extra (dict[str, Any]) – Additional kwargs forwarded to kafka.KafkaConsumer.
- auto_offset_reset: Literal['earliest', 'latest', 'none']
- enable_auto_commit: bool
- max_poll_records: int
- poll_timeout_ms: int
- session_timeout_ms: int
- client_id: str
- security_protocol: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.KafkaMessage(**data)[source]
Bases: BaseModel
Normalized Kafka record returned by KafkaConsumerClient.
- topic: str
- partition: int
- offset: int
- value: Any
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.KafkaProduceResult(**data)[source]
Bases: BaseModel
Broker acknowledgement metadata returned by publish operations.
- topic: str
- partition: int
- offset: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)[source]
Bases: object
Typed facade over kafka.KafkaProducer.
- Parameters:
config (KafkaProducerConfig | None) – Producer connection and reliability settings. Defaults are applied when omitted.
producer (Any | None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime provides the producer lifecycle externally.
key_serializer (Callable[[Any], bytes] | None) – Optional override for key serialization.
value_serializer (Callable[[Any], bytes] | None) – Optional override for value serialization.
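A value_serializer only has to satisfy Callable[[Any], bytes]. The sketch below is one possible override, not the client's default (which is not documented here): compact UTF-8 JSON with sorted keys, so identical payloads always produce identical bytes. It would be passed as KafkaProducerClient(value_serializer=json_value_serializer).

```python
import json
from typing import Any


def json_value_serializer(value: Any) -> bytes:
    """Callable[[Any], bytes]: compact UTF-8 JSON with stable key order."""
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"),
                      sort_keys=True).encode("utf-8")
```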
- publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]
Publish one event and optionally wait for broker acknowledgement.
- Return type:
- publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]
Alias for publish() when value is a JSON payload.
- Return type:
- class ractogateway.KafkaProducerConfig(**data)[source]
Bases: BaseModel
Configuration for KafkaProducerClient.
- Parameters:
bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).
client_id (str) – Producer client ID reported to the broker.
acks (Literal['all', '0', '1'] | int) – Acknowledgement policy; "all" is safest for durability.
linger_ms (int) – Small batching delay to improve throughput.
compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").
retries (int) – Retry attempts for transient broker/network failures.
request_timeout_ms (int) – Broker request timeout.
security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).
sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.
sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.
sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.
extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.
- client_id: str
- linger_ms: int
- retries: int
- request_timeout_ms: int
- security_protocol: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.KafkaPublishRequest(**data)[source]
Bases:
BaseModel
Explicit publish request used by stream-routing handlers.
- topic: str
- value: Any
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.KafkaStreamConfig(**data)[source]
Bases:
BaseModel
Batch-processing configuration for KafkaStreamProcessor.
- Parameters:
batch_size (int) – Maximum number of messages to process per handler invocation.
max_wait_ms (int) – Maximum wall-clock wait, in milliseconds, to collect one batch.
poll_timeout_ms (int) – Per-poll timeout, in milliseconds, while collecting a batch.
commit_on_success (bool) – Commit offsets after a successful handler + publish cycle.
wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.
- batch_size: int
- max_wait_ms: int
- poll_timeout_ms: int
- commit_on_success: bool
- wait_for_publish: bool
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.KafkaStreamProcessor(*, consumer, producer=None, config=None)[source]
Bases:
object
Real-time stream-loop helper for high-throughput event processing.
- Parameters:
consumer (KafkaConsumerClient) – Source consumer used to read input events.
producer (KafkaProducerClient | None) – Optional producer used to publish handler outputs.
config (KafkaStreamConfig | None) – Batch and commit controls. Defaults are applied when omitted.
- collect_batch()[source]
Collect up to batch_size messages, bounded by max_wait_ms.
- Return type:
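The batching rule above (stop at batch_size messages or once max_wait_ms has elapsed, whichever comes first) can be sketched in plain Python. This is a minimal sketch, not the library's implementation. It uses a standard-library queue.Queue as a stand-in for the Kafka consumer's poll, and the name collect_batch_sketch is hypothetical.

```python
import queue
import time
from typing import Any, List


def collect_batch_sketch(source: queue.Queue, batch_size: int,
                         max_wait_ms: int, poll_timeout_ms: int) -> List[Any]:
    """Collect up to batch_size items, giving up once max_wait_ms has elapsed.

    Hypothetical helper: `source` stands in for a Kafka consumer's poll().
    """
    batch: List[Any] = []
    deadline = time.monotonic() + max_wait_ms / 1000.0
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # wall-clock budget for this batch is used up
        # Never block for longer than the time left in the batch window.
        timeout = min(poll_timeout_ms / 1000.0, remaining)
        try:
            batch.append(source.get(timeout=timeout))
        except queue.Empty:
            continue  # nothing arrived during this poll; retry until the deadline
    return batch
```

The per-poll timeout is capped by the time remaining, so a slow trickle of messages cannot stretch a batch far past max_wait_ms.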
- process_once(handler, *, output_topic=None)[source]
Run one batch cycle and return the number of consumed messages.
- Return type:
int
Handler return options
- None: consume-only; no output is published.
- Sequence[KafkaPublishRequest]: explicit per-message publish plans.
- Sequence[Any]: raw values published to output_topic.
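The three handler return shapes listed above could be normalized into (topic, value) pairs before publishing, roughly as follows. This is an illustration only. PublishRequestSketch and plan_outputs are hypothetical names. Dispatching per item, rather than checking the whole sequence, is a simplification. Raising ValueError when a raw value has no output_topic is an assumption about error handling.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Sequence, Tuple


@dataclass
class PublishRequestSketch:
    """Hypothetical stand-in for KafkaPublishRequest (topic + value only)."""
    topic: str
    value: Any


def plan_outputs(handler_result: Optional[Sequence[Any]],
                 output_topic: Optional[str]) -> List[Tuple[str, Any]]:
    """Turn a handler's return value into (topic, value) pairs to publish."""
    if handler_result is None:
        return []  # consume-only: nothing to publish
    plans: List[Tuple[str, Any]] = []
    for item in handler_result:
        if isinstance(item, PublishRequestSketch):
            plans.append((item.topic, item.value))  # explicit routing wins
        elif output_topic is not None:
            plans.append((output_topic, item))  # raw value goes to the default topic
        else:
            # Assumption: a raw value with no output_topic is a configuration error.
            raise ValueError("raw handler outputs require output_topic")
    return plans
```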
- class ractogateway.CostAwareRouter(tiers)[source]
Bases:
object
Routes LLM requests to the appropriate model tier based on message complexity, without making any extra API calls.
- Parameters:
tiers (list[RoutingTier]) – Ordered list of RoutingTier objects, sorted ascending by max_score (cheapest first). The last tier’s max_score should be 100 so that it acts as the fallback.
- Raises:
ValueError – If tiers is empty or not sorted ascending by max_score.
Example: 3-tier OpenAI ladder:
from ractogateway.routing import CostAwareRouter, RoutingTier

router = CostAwareRouter([
    RoutingTier(model="gpt-4o-mini", max_score=30),
    RoutingTier(model="gpt-4o", max_score=70),
    RoutingTier(model="o3-mini", max_score=100),
])
model = router.route("What is 2+2?")  # → "gpt-4o-mini"
model = router.route("Analyze the trade-offs between Redis Cluster and "
                     "Cassandra for a write-heavy time-series workload …")  # → "o3-mini"
Example: binary routing (2 tiers):
router = CostAwareRouter([
    RoutingTier(model="claude-haiku-4-5-20251001", max_score=40),
    RoutingTier(model="claude-opus-4-6", max_score=100),
])
- score(text)[source]
Compute a complexity score in [0, 100] for text.
A higher score means a more complex task.
- Return type:
Algorithm
token_pts = min(len(text) // 4, SAT) * (MAX_TP / SAT)
kw_pts = min(matches * PPK, MAX_KP)
score = clamp(token_pts + kw_pts, 0, 100)
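Here is the formula with placeholder constants. SAT, MAX_TP, PPK, MAX_KP and the keyword list are invented for illustration, with the two caps chosen to sum to 100. This reference does not give the library's actual values or keyword set. Counting each distinct keyword once is also an assumption.

```python
# All constants below are hypothetical; the real values are not documented here.
SAT = 500        # token count at which token_pts saturates
MAX_TP = 60.0    # maximum points contributed by length
PPK = 10.0       # points per matched keyword
MAX_KP = 40.0    # maximum points contributed by keywords
KEYWORDS = ("analyze", "trade-off", "architecture", "optimize", "prove", "compare")


def clamp(value: float, low: float, high: float) -> float:
    return max(low, min(value, high))


def complexity_score(text: str) -> float:
    """Score text in [0, 100]; higher means more complex."""
    approx_tokens = len(text) // 4  # rough chars-per-token heuristic
    token_pts = min(approx_tokens, SAT) * (MAX_TP / SAT)
    lowered = text.lower()
    matches = sum(1 for kw in KEYWORDS if kw in lowered)
    kw_pts = min(matches * PPK, MAX_KP)
    return clamp(token_pts + kw_pts, 0.0, 100.0)
```

Because both components are capped, a very long prompt with no keywords can never exceed MAX_TP points on length alone.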
- route(text)[source]
Return the model identifier for text.
Walks the tiers (cheapest first) and returns the first model whose max_score ≥ complexity_score. Always returns a model because the last tier has max_score == 100 (validated at construction).
Complexity: O(k), where k is the number of tiers.
- Return type:
str
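The tier walk and the construction-time checks can be sketched as follows. Tier, validate_tiers and route are hypothetical names. The check that the last tier's max_score is exactly 100 follows the "validated at construction" note above, but the error messages are invented.

```python
from typing import NamedTuple, Sequence


class Tier(NamedTuple):
    """Hypothetical stand-in for RoutingTier."""
    model: str
    max_score: float


def validate_tiers(tiers: Sequence[Tier]) -> None:
    if not tiers:
        raise ValueError("tiers must not be empty")
    scores = [t.max_score for t in tiers]
    if scores != sorted(scores):
        raise ValueError("tiers must be sorted ascending by max_score")
    if scores[-1] != 100:
        raise ValueError("last tier must have max_score == 100")


def route(tiers: Sequence[Tier], score: float) -> str:
    """Return the first (cheapest) model whose max_score covers score, in O(k)."""
    for tier in tiers:
        if score <= tier.max_score:  # inclusive upper bound
            return tier.model
    return tiers[-1].model  # unreachable for score <= 100 once validated
```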
- property tiers: tuple[RoutingTier, ...]
Immutable view of the configured tiers.
- class ractogateway.RoutingTier(**data)[source]
Bases:
BaseModel
One tier in the cost-aware routing ladder.
The router evaluates a complexity score (0-100) for each incoming message and selects the first tier whose max_score is >= that score. The last tier in the list always acts as the catch-all fallback.
- Parameters:
model (str) – The LLM model identifier to use for requests that fall in this tier (e.g. "gpt-4o-mini", "gemini-2.0-flash", "claude-haiku-4-5-20251001").
max_score (float) – Inclusive upper bound on the complexity score that routes to this model. Range: 0-100. Set to 100 for the last (most powerful) tier so it catches everything.
Examples
tiers = [
    RoutingTier(model="gpt-4o-mini", max_score=30),
    RoutingTier(model="gpt-4o", max_score=70),
    RoutingTier(model="o3-mini", max_score=100),
]
- model: str
- max_score: float
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.TokenTruncator(config=None)[source]
Bases:
object
Smart conversation-history trimmer.
- Parameters:
config (TruncationConfig | None) – TruncationConfig instance. If omitted, a default config is used (approximate counter, 8k limit).
Examples
from ractogateway.openai_developer_kit import OpenAIDeveloperKit
from ractogateway.truncation import TokenTruncator, TruncationConfig
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")
truncator = TokenTruncator(
    TruncationConfig(
        token_counter=lambda t: len(enc.encode(t)),
        keep_first_n=2,
        keep_last_n=8,
    )
)
kit = OpenAIDeveloperKit(model="gpt-4o", truncator=truncator)
- truncate(chat_config, model)[source]
Return a copy of chat_config with trimmed history if necessary.
If the total estimated token count (system prompt + history + user_message) fits within the model’s context limit, the original ChatConfig is returned unchanged.
- Parameters:
chat_config (ChatConfig) – The chat configuration to potentially truncate.
model (str) – The resolved model name, used to look up the context-window limit.
- Return type:
ChatConfig
- Returns:
ChatConfig – The original instance when no trimming is needed; otherwise a new ChatConfig instance with a shorter history. The user_message and all other fields are preserved verbatim.
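Below is one plausible trimming strategy consistent with keep_first_n, keep_last_n and safety_margin. The anchored head and tail are always kept, and middle messages are dropped oldest first until the estimate fits. This is an assumption about the approach, not TokenTruncator's actual code. trim_history is a hypothetical name, and history entries are assumed to be {"role", "content"} dicts.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def approx_tokens(text: str) -> int:
    return len(text) // 4  # mirrors the documented default counter


def trim_history(history: List[Message], *, system_prompt: str, user_message: str,
                 limit: int, keep_first_n: int = 2, keep_last_n: int = 6,
                 safety_margin: int = 512,
                 count: Callable[[str], int] = approx_tokens) -> List[Message]:
    """Drop middle messages (oldest first) until the history fits the budget."""
    budget = limit - count(system_prompt) - count(user_message) - safety_margin

    def cost(msgs: List[Message]) -> int:
        return sum(count(m["content"]) for m in msgs)

    if cost(history) <= budget or len(history) <= keep_first_n + keep_last_n:
        return list(history)  # fits already, or nothing is droppable
    head = history[:keep_first_n]
    tail = history[len(history) - keep_last_n:]
    middle = history[keep_first_n:len(history) - keep_last_n]
    while middle and cost(head) + cost(middle) + cost(tail) > budget:
        middle = middle[1:]  # drop the oldest droppable message
    return head + middle + tail
```

With this policy the result can still exceed the budget when the anchored messages alone are too large, because anchors are never dropped.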
- class ractogateway.TruncationConfig(**data)[source]
Bases:
BaseModel
Configuration for TokenTruncator.
- Parameters:
max_context_tokens (int | None) – Hard cap on total prompt tokens before calling the API. When None, the truncator looks up the model in MODEL_CONTEXT_LIMITS (falling back to 8 192).
keep_first_n (int) – Number of history messages always preserved from the start of the conversation (anchors context). Defaults to 2.
keep_last_n (int) – Number of history messages always preserved from the most recent end of the conversation. Defaults to 6.
token_counter (Callable[[str], int]) – Callable (text: str) -> int. Defaults to the built-in approximate counter (len // 4). Swap in tiktoken for exact OpenAI token counts:
import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")
config = TruncationConfig(token_counter=lambda t: len(enc.encode(t)))
safety_margin (int) – Extra token budget reserved beyond the system prompt and user message. Defaults to 512.
- keep_first_n: int
- keep_last_n: int
- safety_margin: int
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- resolve_limit(model)[source]
Return the effective token limit for model.
Priority: max_context_tokens → MODEL_CONTEXT_LIMITS lookup → _DEFAULT_CONTEXT.
- Return type:
int
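The three-step priority is a simple fallback chain. In this sketch the table entries and model names are hypothetical, and only the 8 192 fallback comes from the documentation above. The reference does not say whether the real lookup matches model names exactly or by prefix, so the sketch uses an exact dict lookup.

```python
from typing import Dict, Optional

# Hypothetical table; the real MODEL_CONTEXT_LIMITS contents are not listed here.
MODEL_CONTEXT_LIMITS: Dict[str, int] = {"example-large": 128_000, "example-small": 16_000}
DEFAULT_CONTEXT = 8_192  # fallback documented for TruncationConfig


def resolve_limit(model: str, max_context_tokens: Optional[int] = None) -> int:
    """Explicit cap first, then the per-model table, then the default."""
    if max_context_tokens is not None:
        return max_context_tokens
    return MODEL_CONTEXT_LIMITS.get(model, DEFAULT_CONTEXT)
```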
- class ractogateway.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)[source]
Bases:
object
Celery-backed task-queue wrapper for RactoGateway developer kits.
- Parameters:
app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself, so you retain full control over serializers, routing, concurrency, etc.
kit (Any) – Any RactoGateway developer kit: OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit’s default_prompt is used by generation tasks (prompts are not serialisable over the broker).
rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().
retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.
- generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)[source]
Enqueue an LLM generation task and return immediately.
The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).
- Parameters:
user_message (str) – The prompt text for this generation.
temperature (float) – Sampling temperature passed to the LLM.
max_tokens (int) – Maximum completion tokens.
history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.
extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).
- Return type:
AsyncResult
- Returns:
celery.result.AsyncResult – Read .id for the task UUID. Use wait() or get_result() to retrieve the outcome.
Note
The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit’s default_prompt at construction time.
- ingest_document(path, **metadata)[source]
Enqueue a background RAG document-ingestion task.
The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.
- Parameters:
- Return type:
AsyncResult
- Returns:
celery.result.AsyncResult
- Raises:
RuntimeError – If rag was not provided to RactoCeleryWorker.
- parallel_batch(items, *, temperature=0.0, max_tokens=4096)[source]
Fan a list of items out to the worker pool in parallel.
Uses Celery group(), so all tasks are submitted at once and run concurrently across the available workers.
- Parameters:
- Return type:
GroupResult
- Returns:
celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate over .results for the individual AsyncResult objects.
- get_result(task_id)[source]
Return the current state of a task without blocking.
- Parameters:
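task_id (str) – The UUID returned by generate() or ingest_document(), or the .id attribute of an AsyncResult.
- Return type:
TaskResult
- Returns:
TaskResult – status is PENDING if the task has not started yet. Celery also reports PENDING for task IDs it does not recognise, so PENDING alone does not prove that the task exists.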
- wait(task_id, *, timeout_s=None)[source]
Block until a task completes (or times out) and return its result.
- Parameters:
- Return type:
TaskResult
- Returns:
TaskResult – result.ok is True on success; result.error is set on failure or timeout.
- wait_parallel(group_result, *, timeout_s=None)[source]
Block until all tasks from parallel_batch() complete.
- Parameters:
- Return type:
list[TaskResult]
- Returns:
list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.
- class ractogateway.RetryConfig(**data)[source]
Bases:
BaseModel
Exponential-backoff retry policy for RactoCeleryWorker tasks.
The backoff delay for attempt n (0-based) is:
delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)
With the defaults this gives 2 s → 4 s → 8 s (then stops at max_retries=3).
- Parameters:
max_retries (int) – Maximum number of retry attempts after the first failure. 0 disables retries entirely.
initial_delay_s (float) – Countdown, in seconds, before the first retry.
backoff_factor (float) – Multiplier applied on each successive retry. Must be > 1.
max_delay_s (float) – Upper bound on the countdown, in seconds. Prevents extremely long waits after many retries.
- max_retries: int
- initial_delay_s: float
- backoff_factor: float
- max_delay_s: float
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
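Evaluating the delay formula for each attempt reproduces the documented 2 s → 4 s → 8 s schedule. The sketch assumes initial_delay_s=2.0 and backoff_factor=2.0, which that schedule implies. max_delay_s is a placeholder because its real default is not shown here. retry_delays is a hypothetical helper. In a Celery task, each value would typically be passed as the countdown argument of the task's retry() call.

```python
from typing import List


def retry_delays(max_retries: int, initial_delay_s: float,
                 backoff_factor: float, max_delay_s: float) -> List[float]:
    """Countdown before each retry: min(initial * factor ** n, cap), n = 0, 1, ..."""
    return [min(initial_delay_s * backoff_factor ** n, max_delay_s)
            for n in range(max_retries)]
```

The cap only starts to matter once initial_delay_s * backoff_factor ** n exceeds max_delay_s, after which every further retry waits exactly max_delay_s.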
- class ractogateway.TaskResult(**data)[source]
Bases:
BaseModel
Unified result returned by wait() and get_result().
- Parameters:
task_id (str) – The Celery task UUID.
status (TaskStatus) – Current TaskStatus.
result (Any | None) – Deserialised task output on success:
For generate(): a dict matching LLMResponse.model_dump().
For ingest_document(): a list of Chunk.model_dump() dicts.
error (str | None) – Exception message string on failure; None on success.
- task_id: str
- status: TaskStatus
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- property ok: bool
True when the task succeeded and produced a result.
- class ractogateway.TaskStatus(*values)[source]
Lifecycle state of a Celery task.
Maps directly to Celery’s native task states so you can compare them without importing Celery’s own constants.
- PENDING = 'pending'
- STARTED = 'started'
- SUCCESS = 'success'
- FAILURE = 'failure'
- RETRY = 'retry'
- REVOKED = 'revoked'
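Celery reports task states as uppercase strings (PENDING, STARTED, SUCCESS, FAILURE, RETRY, REVOKED), and the enum values above are their lowercase forms, so a translation can be a one-line lookup. The str-based enum and the lowercasing step are assumptions about how the mapping works. TaskStatusSketch and from_celery_state are hypothetical names.

```python
from enum import Enum


class TaskStatusSketch(str, Enum):
    """Hypothetical mirror of TaskStatus (same lowercase values)."""
    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"
    REVOKED = "revoked"


def from_celery_state(state: str) -> TaskStatusSketch:
    """Map Celery's uppercase state names (e.g. 'SUCCESS') onto the enum."""
    return TaskStatusSketch(state.lower())
```

Mixing in str lets members compare equal to plain strings, so callers can check a status without importing either Celery's constants or the enum.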
- class ractogateway.AnthropicBatchProcessor(model='claude-haiku-4-5-20251001', *, api_key=None, default_prompt=None)[source]
Bases:
object
Submit thousands of Claude requests to Anthropic’s Message Batches API at ~50% of standard API cost.
- Parameters:
model (str) – Claude model for all items (e.g. "claude-haiku-4-5-20251001").
api_key (str | None) – Anthropic API key. Falls back to the ANTHROPIC_API_KEY environment variable.
default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.
- submit_batch / asubmit_batch:
Create a batch job → returns BatchJobInfo.
- poll_status / apoll_status:
Fetch the current job state → returns an updated BatchJobInfo.
- get_results / aget_results:
Stream and parse completed job results → list[BatchResult].
- submit_and_wait / asubmit_and_wait:
Convenience: submit + poll until done + return results.
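Each entry in an Anthropic Message Batches request pairs a custom_id with a params object shaped like an ordinary Messages API call. The sketch below builds such entries from BatchItem-like objects. Item and to_anthropic_requests are hypothetical, the BatchItem.extra pass-through is omitted, and the processor's rendering of default_prompt into the system string is not shown. In current versions of the Anthropic Python SDK, the resulting list is what you would pass as requests= to messages.batches.create().

```python
from typing import Any, Dict, List, NamedTuple, Optional


class Item(NamedTuple):
    """Hypothetical stand-in for BatchItem."""
    custom_id: str
    user_message: str
    temperature: float = 0.0
    max_tokens: int = 4096


def to_anthropic_requests(items: List[Item], model: str,
                          system_prompt: Optional[str] = None) -> List[Dict[str, Any]]:
    """Build {"custom_id", "params"} entries for a Message Batches request."""
    requests = []
    for item in items:
        params: Dict[str, Any] = {
            "model": model,
            "max_tokens": item.max_tokens,
            "temperature": item.temperature,
            "messages": [{"role": "user", "content": item.user_message}],
        }
        if system_prompt:
            params["system"] = system_prompt  # e.g. a rendered RACTO prompt
        requests.append({"custom_id": item.custom_id, "params": params})
    return requests
```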
- provider: str = 'anthropic'
- submit_batch(items, *, prompt=None)[source]
Create an Anthropic Message Batch job.
Returns immediately with a BatchJobInfo (status = IN_PROGRESS).
- Return type:
BatchJobInfo
- poll_status(job_id)[source]
Fetch the current status of batch job job_id.
- Return type:
BatchJobInfo
- get_results(job_id)[source]
Stream and parse results for a completed batch job.
- Raises:
RuntimeError – If the job is not yet completed.
- Return type:
list[BatchResult]
- submit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]
Submit a batch and block until it completes, then return results.
- Parameters:
- Return type:
list[BatchResult]
- async asubmit_batch(items, *, prompt=None)[source]
Async variant of submit_batch().
- Return type:
BatchJobInfo
- async apoll_status(job_id)[source]
Async variant of poll_status().
- Return type:
BatchJobInfo
- async aget_results(job_id)[source]
Async variant of get_results().
- Return type:
list[BatchResult]
- async asubmit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]
Async variant of submit_and_wait().
- Return type:
list[BatchResult]
- class ractogateway.BatchItem(**data)[source]
Bases:
BaseModel
A single request within a batch job.
- Parameters:
custom_id (str) – User-supplied identifier used to correlate results. Must be unique within a batch.
user_message (str) – The end user’s query string (equivalent to ChatConfig.user_message).
temperature (float) – Sampling temperature. Defaults to 0.0.
max_tokens (int) – Maximum tokens for the completion. Defaults to 4096.
extra (dict[str, Any]) – Provider-specific pass-through kwargs.
- custom_id: str
- user_message: str
- temperature: float
- max_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.BatchJobInfo(**data)[source]
Bases:
BaseModel
Metadata about a submitted batch job.
Returned by submit_batch() and poll_status().
- job_id: str
- provider: str
- status: BatchStatus
- created_at: float
- request_count: int
- raw: Any
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.BatchResult(**data)[source]
Bases:
BaseModel
The outcome of a single BatchItem.
A result is always present in the list returned by get_results(); check error to detect failures.
- custom_id: str
- response: LLMResponse | None
- raw: Any
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- property ok: bool
True when the request succeeded (no error, response present).
- class ractogateway.BatchStatus(*values)[source]
Processing state of a batch job.
Maps to the union of OpenAI and Anthropic batch status strings.
- PENDING = 'pending'
- IN_PROGRESS = 'in_progress'
- FINALIZING = 'finalizing'
- COMPLETED = 'completed'
- FAILED = 'failed'
- EXPIRED = 'expired'
- CANCELLING = 'cancelling'
- CANCELLED = 'cancelled'
- class ractogateway.OpenAIBatchProcessor(model='gpt-4o-mini', *, api_key=None, base_url=None, default_prompt=None)[source]
Bases:
object
Submit thousands of chat-completion requests to OpenAI’s Batch API at ~50% of standard API cost.
- Parameters:
model (str) – Chat model to use for all items in a batch (e.g. "gpt-4o-mini").
api_key (str | None) – OpenAI API key. Falls back to the OPENAI_API_KEY environment variable.
base_url (str | None) – Custom base URL (Azure OpenAI / proxy).
default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.
- submit_batch / asubmit_batch:
Upload JSONL and create a batch job → returns BatchJobInfo.
- poll_status / apoll_status:
Fetch the current job state → returns an updated BatchJobInfo.
- get_results / aget_results:
Download and parse completed job results → list[BatchResult].
- submit_and_wait / asubmit_and_wait:
Convenience: submit + poll until done + return results.
- provider: str = 'openai'
- submit_batch(items, *, prompt=None, completion_window='24h')[source]
Upload items as a JSONL file and create an OpenAI batch job.
Returns immediately with a BatchJobInfo (status = IN_PROGRESS).
- Return type:
BatchJobInfo
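OpenAI's Batch API takes a JSONL file in which every line is one request object with custom_id, method, url and body fields. The sketch shows what that file could look like for a list of BatchItem-like objects. Item and to_openai_jsonl are hypothetical, and extra pass-through fields are omitted. The duplicate-ID check enforces the uniqueness rule stated for BatchItem.custom_id. The processor's real serialization may differ.

```python
import json
from typing import Any, Dict, List, NamedTuple, Optional


class Item(NamedTuple):
    """Hypothetical stand-in for BatchItem."""
    custom_id: str
    user_message: str
    temperature: float = 0.0
    max_tokens: int = 4096


def to_openai_jsonl(items: List[Item], model: str,
                    system_prompt: Optional[str] = None) -> str:
    """Serialize items into Batch API input: one JSON request per line."""
    seen = set()
    lines = []
    for item in items:
        if item.custom_id in seen:
            raise ValueError(f"duplicate custom_id: {item.custom_id!r}")
        seen.add(item.custom_id)
        messages: List[Dict[str, Any]] = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": item.user_message})
        lines.append(json.dumps({
            "custom_id": item.custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {"model": model, "messages": messages,
                     "temperature": item.temperature, "max_tokens": item.max_tokens},
        }))
    return "\n".join(lines) + "\n"
```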
- poll_status(job_id)[source]
Fetch the current status of batch job job_id.
- Return type:
BatchJobInfo
- get_results(job_id)[source]
Download and parse results for a completed batch job.
- Raises:
RuntimeError – If the job is not yet completed.
- Return type:
list[BatchResult]
- submit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]
Submit a batch and block until it completes, then return results.
- Parameters:
- Raises:
TimeoutError – If the batch does not complete within max_wait_s.
RuntimeError – If the batch job fails or is cancelled.
- Return type:
list[BatchResult]
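The submit_and_wait() behaviour (poll every poll_interval_s, give up after max_wait_s) can be sketched as a loop over the BatchStatus values. This is an illustration, not the library's code. wait_for_batch is hypothetical, and poll stands in for reading the status from poll_status(job_id). Treating expired like failed and cancelled (raising RuntimeError) is an assumption. The injectable sleep and clock exist only so the loop can be tested without real waiting.

```python
import time
from typing import Callable

# Unsuccessful terminal values from BatchStatus; "finalizing" and "cancelling" are still in flight.
FAILED_STATES = {"failed", "expired", "cancelled"}


def wait_for_batch(poll: Callable[[], str], *, poll_interval_s: float = 60.0,
                   max_wait_s: float = 86400.0,
                   sleep: Callable[[float], None] = time.sleep,
                   clock: Callable[[], float] = time.monotonic) -> str:
    """Poll until the job completes; raise on failure or when max_wait_s runs out."""
    deadline = clock() + max_wait_s
    while True:
        status = poll()
        if status == "completed":
            return status
        if status in FAILED_STATES:
            raise RuntimeError(f"batch ended with status {status!r}")
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"batch not finished after {max_wait_s} s")
        sleep(min(poll_interval_s, remaining))  # never overshoot the deadline
```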
- async asubmit_batch(items, *, prompt=None, completion_window='24h')[source]
Async variant of submit_batch().
- Return type:
BatchJobInfo
- async apoll_status(job_id)[source]
Async variant of poll_status().
- Return type:
BatchJobInfo
- async aget_results(job_id)[source]
Async variant of get_results().
- Return type:
list[BatchResult]
- async asubmit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]
Async variant of submit_and_wait().
- Return type:
list[BatchResult]
- class ractogateway.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases:
SQLAnalystPipeline
Async-first variant of SQLAnalystPipeline. run() is a coroutine; use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.
All constructor parameters and run() parameters are identical to SQLAnalystPipeline.
Example:
from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
- async run(user_query, **kwargs)[source]
Async run(); delegates to SQLAnalystPipeline.arun().
- Return type:
SQLAnalystResult
- class ractogateway.ChartSpec(**data)[source]
Bases:
BaseModel
Specification for a Plotly chart.
Pass it to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame’s column dtypes with no extra LLM call.
Supported chart types
bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel
Example:
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

result = pipeline.run(
    user_query="Top 5 customers by revenue?",
    ...,
    chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue",
                    title="Top 5 Customers"),
)
result.plotly_figure.show()
- chart_type: str
- title: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
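The chart argument accepts "auto", a ChartSpec, a plain dict, or None, which suggests a small normalization step before any figure is built. The sketch is hypothetical. ChartSpecSketch models only chart_type, x, y and title, with x and y taken from the dict example above. Rejecting unknown chart types with ValueError is an assumption about validation rather than documented behaviour.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

SUPPORTED_CHARTS = {"bar", "line", "scatter", "pie", "histogram",
                    "box", "area", "heatmap", "violin", "funnel"}


@dataclass
class ChartSpecSketch:
    """Hypothetical stand-in for ChartSpec with a few representative fields."""
    chart_type: str
    x: Optional[str] = None
    y: Optional[str] = None
    title: str = ""


def normalize_chart(chart: Union[str, ChartSpecSketch, Dict[str, Any], None]
                    ) -> Union[str, ChartSpecSketch, None]:
    """Accept "auto", a spec, a plain dict, or None, as the chart parameter does."""
    if chart is None or chart == "auto":
        return chart
    spec = ChartSpecSketch(**chart) if isinstance(chart, dict) else chart
    if not isinstance(spec, ChartSpecSketch):
        raise TypeError(f"unsupported chart argument: {chart!r}")
    if spec.chart_type not in SUPPORTED_CHARTS:
        raise ValueError(f"unsupported chart_type: {spec.chart_type!r}")
    return spec
```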
- class ractogateway.PipelineUsage(**data)[source]
Bases:
BaseModel
Aggregated token usage across all LLM calls in the pipeline.
Tracks each step (SQL generation, pandas code generation, Markdown answer generation) separately, so you can see exactly where tokens are consumed.
Properties
- total_input_tokens:
Sum of all prompt tokens across every LLM step.
- total_output_tokens:
Sum of all completion tokens across every LLM step.
- total_tokens:
Grand total of every token consumed by the pipeline.
- sql_input_tokens: int
- sql_output_tokens: int
- pandas_input_tokens: int
- pandas_output_tokens: int
- answer_input_tokens: int
- answer_output_tokens: int
- property total_input_tokens: int
- property total_output_tokens: int
- property total_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- exception ractogateway.RateLimitExceededError[source]
Bases:
RuntimeError
Raised when the rate limiter denies a request for a given user.
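SQLAnalystPipeline accepts any rate_limiter exposing check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int. The in-memory, fixed-window sketch below satisfies that duck-typed interface for local testing. The windowing policy, the class and helper names, and the local RateLimitExceededError stand-in (which mirrors the RuntimeError base documented above) are all assumptions. It does not replace a shared limiter such as RedisRateLimiter across processes.

```python
import time
from typing import Callable, Dict, Tuple


class RateLimitExceededError(RuntimeError):
    """Local stand-in with the same RuntimeError base as the documented exception."""


class InMemoryRateLimiter:
    """Hypothetical fixed-window limiter with the interface SQLAnalystPipeline expects."""

    def __init__(self, tokens_per_window: int, window_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._limit = tokens_per_window
        self._window_s = window_s
        self._clock = clock
        self._usage: Dict[str, Tuple[float, int]] = {}  # user_id -> (window start, used)

    def _current(self, user_id: str) -> Tuple[float, int]:
        now = self._clock()
        start, used = self._usage.get(user_id, (now, 0))
        if now - start >= self._window_s:
            start, used = now, 0  # window expired: start a fresh one
        self._usage[user_id] = (start, used)
        return start, used

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        start, used = self._current(user_id)
        if used + tokens > self._limit:
            return False  # deny without consuming any budget
        self._usage[user_id] = (start, used + tokens)
        return True

    def get_remaining(self, user_id: str) -> int:
        _, used = self._current(user_id)
        return self._limit - used


def enforce(limiter: InMemoryRateLimiter, user_id: str, tokens: int) -> None:
    """Raise when the limiter denies the request (hypothetical helper)."""
    if not limiter.check_and_consume(user_id, tokens):
        raise RateLimitExceededError(f"rate limit exceeded for user {user_id!r}")
```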
- exception ractogateway.ReadOnlyViolationError[source]
Bases:
ValueError
Raised when generated SQL contains a write operation while force_read_only is enabled.
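The sketch below is a rough illustration of what a force_read_only check has to catch. It strips comments and string literals, rejects statements that do not start with SELECT or WITH, and rejects write keywords or stacked statements. It is a naive keyword filter, not a SQL parser and not the library's implementation. The keyword list, the assert_read_only name and the local ReadOnlyViolationError stand-in are assumptions. A database role with read-only privileges remains the more reliable safeguard.

```python
import re

_WRITE_RE = re.compile(
    r"\b(INSERT|UPDATE|DELETE|MERGE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\b",
    re.IGNORECASE,
)


class ReadOnlyViolationError(ValueError):
    """Local stand-in mirroring the documented ValueError base class."""


def _strip_noise(sql: str) -> str:
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL)  # block comments
    sql = re.sub(r"--[^\n]*", " ", sql)                     # line comments
    return re.sub(r"'(?:[^']|'')*'", "''", sql)              # string literals


def assert_read_only(sql: str) -> None:
    cleaned = _strip_noise(sql).strip()
    if not re.match(r"(SELECT|WITH)\b", cleaned, re.IGNORECASE):
        raise ReadOnlyViolationError("only SELECT/WITH queries are allowed")
    if re.search(r";\s*\S", cleaned):
        raise ReadOnlyViolationError("multiple statements are not allowed")
    match = _WRITE_RE.search(cleaned)
    if match:
        raise ReadOnlyViolationError(f"write keyword found: {match.group(0)}")
```

The WITH check matters because some databases (PostgreSQL, for example) allow data-modifying statements inside a CTE, which the keyword scan then catches.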
- class ractogateway.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases:
object
Natural-language to SQL + pandas + Markdown answer + chart pipeline.
Converts a plain-English question into:
- A read-only SQL query (LLM step: sql_kit).
- Pandas analysis code executed against the SQL result (LLM step: pandas_kit).
- A rich Markdown answer with a table and insights (LLM step: answer_kit).
- An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls: pure dtype heuristics or a user-provided spec).
Two variants
- SQLAnalystPipeline: run() is sync, arun() is async.
- AsyncSQLAnalystPipeline: run() is async (same as arun()).
- Parameters:
kit – Default LLM kit used for any step that doesn’t have its own kit.
sql_kit – Override kit for SQL generation. Falls back to kit.
pandas_kit – Override kit for pandas code generation. Falls back to kit.
answer_kit – Override kit for Markdown answer generation. Falls back to kit.
sql_prompt / pandas_prompt / answer_prompt – Override the default system prompt for each step.
sql_temperature / sql_max_tokens – LLM settings for the SQL step (defaults: 0.0 / 1024).
pandas_temperature / pandas_max_tokens – LLM settings for the pandas step (defaults: 0.0 / 2048).
answer_temperature / answer_max_tokens – LLM settings for the answer step (defaults: 0.3 / 2048).
run_pandas – Run the pandas analysis step by default. Default: True.
run_answer – Run the Markdown answer step by default. Default: True.
chart – Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".
force_read_only – Block any non-SELECT SQL. Default: True.
tracer – Optional RactoTracer instance.
metrics – Optional GatewayMetricsMiddleware instance.
engine – Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, the connection_string / host / port / etc. parameters of run() are ignored.
max_sql_retries – Number of times to retry SQL generation when a database execution error occurs. Each retry re-sends the original question plus the error message to the LLM so it can self-correct. Default: 2.
max_rows – Safety cap on returned rows: auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.
schema_cache_ttl – Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).
allowed_tables – Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.
blocked_columns – Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns such as ssn or credit_card_number.
mask_columns – Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.
table_docs – {table_name: description}, appended as inline schema comments so the LLM understands each table’s business meaning.
column_docs – {table_name: {column_name: description}}, added as per-column inline comments.
safe_mode – When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.
memory – Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
rate_limiter – Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
user_id – Default user identifier used for rate limiting and audit. Can be overridden per call in run() / arun().
run()/arun().- param Example:::
from ractogateway.openai_developer_kit import Chat from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec
- pipeline = SQLAnalystPipeline(
kit=Chat(model=”gpt-4o”), pandas_kit=Chat(model=”gpt-3.5-turbo”), # cheaper for pandas max_rows=5_000, allowed_tables=[“orders”, “customers”, “products”], mask_columns=[“email”, “phone”], safe_mode=True,
) result = pipeline.run(
user_query=”Top 5 products by quantity sold?”, connection_string=”postgresql://user:pass@localhost/shop”,
) if result.error:
print(“Pipeline error:”, result.error)
- else:
print(result.answer) result.plotly_figure.show() print(result.usage.total_tokens) result.to_csv(“output.csv”)
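The max_rows and mask_columns options can be illustrated with plain string and dict handling. The sketch appends LIMIT n only when no LIMIT clause appears anywhere in the statement, so a LIMIT inside a subquery also suppresses injection. It assumes a dialect with LIMIT syntax, such as PostgreSQL, MySQL or SQLite. It also matches mask_columns case-insensitively, although the documentation states case-insensitivity only for blocked_columns. inject_limit and mask_rows are hypothetical names.

```python
import re
from typing import Any, Dict, Iterable, List

_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)
MASK = "***MASKED***"


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless disabled (0) or a LIMIT is already present."""
    if max_rows <= 0:
        return sql
    body = sql.strip().rstrip(";").rstrip()
    if _LIMIT_RE.search(body):
        return body
    return f"{body} LIMIT {max_rows}"


def mask_rows(rows: Iterable[Dict[str, Any]],
              mask_columns: Iterable[str]) -> List[Dict[str, Any]]:
    """Replace values of masked columns before rows leave the pipeline."""
    masked = {c.lower() for c in mask_columns}
    return [{k: (MASK if k.lower() in masked else v) for k, v in row.items()}
            for row in rows]
```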
- run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Run the full pipeline synchronously.
- Parameters:
user_query (str) – Plain-English question to answer from the database.
connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.
host / port / database / username / password / driver – Individual connection parameters, used when both connection_string and engine are omitted.
engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection parameters.
schema (str | None) – Pre-computed schema string. When None, the schema is fetched automatically (with optional caching).
run_pandas / run_answer / chart / force_read_only – Override the corresponding pipeline-level defaults for this call.
sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens – Per-call LLM setting overrides.
max_rows (int | None) – Per-call row limit. Overrides the pipeline-level max_rows.
user_id (str | None) – Per-call user ID for rate limiting and audit.
session_id (str | None) – Conversation session ID used to fetch and save memory context.
- Return type:
SQLAnalystResult
- Returns:
SQLAnalystResult
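The connection precedence described for run() (a per-call engine beats everything, connection_string is ignored when an engine is supplied, and the individual host/port/database/username/password/driver parameters are used only when both are omitted) can be sketched as a small resolver. This is a hypothetical helper, not part of RactoGateway: it uses None where the real signature uses a sentinel default for engine, and it assembles the standard driver://user:password@host:port/database URI by hand. In real code, SQLAlchemy's sqlalchemy.engine.URL.create() builds the same URL and handles escaping.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import quote


def resolve_connection(
    *,
    engine: Any = None,
    connection_string: str | None = None,
    host: str = "localhost",
    port: int = 5432,
    database: str | None = None,
    username: str | None = None,
    password: str | None = None,
    driver: str = "postgresql",
) -> Any:
    """Return the engine if given, else a URI string (hypothetical precedence helper)."""
    if engine is not None:
        return engine  # a pre-built engine overrides every connection parameter
    if connection_string is not None:
        return connection_string  # a full URI overrides the individual parameters
    # Percent-encode credentials so characters such as '@' or ':' cannot break the URI.
    auth = ""
    if username:
        auth = quote(username, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    return f"{driver}://{auth}{host}:{port}/{database or ''}"


if __name__ == "__main__":
    print(resolve_connection(database="shop", username="user", password="p@ss"))
```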
- async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Async variant of run(), with identical parameters. Blocking SQLAlchemy calls run in a thread executor; LLM calls use each kit's async achat() method.
- Return type:
SQLAnalystResult
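arun() offloads blocking SQLAlchemy calls to a thread executor. The general asyncio pattern is sketched below; blocking_query is a hypothetical stand-in for a synchronous database call, and the pipeline's actual executor configuration is not documented here. asyncio.to_thread() (Python 3.9+) is an equivalent shortcut.

```python
from __future__ import annotations

import asyncio
import time


def blocking_query(sql: str) -> list[dict]:
    """Hypothetical stand-in for a synchronous SQLAlchemy call."""
    time.sleep(0.05)  # simulate waiting on the database
    return [{"sql": sql}]


async def run_query_async(sql: str) -> list[dict]:
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; the event loop stays free meanwhile.
    return await loop.run_in_executor(None, blocking_query, sql)


async def main() -> list[list[dict]]:
    # Both blocking calls overlap because each runs in its own worker thread.
    return await asyncio.gather(run_query_async("SELECT 1"), run_query_async("SELECT 2"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```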
- class ractogateway.SQLAnalystResult(**data)[source]
Bases:
BaseModel
Result returned by SQLAnalystPipeline.
All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.
Fields
- user_query:
The original natural-language question.
- schema_used:
The database schema string that was passed to (or fetched for) the LLM.
- sql_query:
The generated (and possibly LIMIT-injected) SQL SELECT statement.
- columns:
Column names returned by the SQL query.
- row_count:
Number of rows in raw_rows.
- raw_rows:
All rows from the SQL result as a list of dicts.
- pandas_code:
The LLM-generated pandas analysis code (None if run_pandas=False).
- pandas_result:
Output of executing pandas_code: a DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.
- answer:
Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.
- chart_spec:
The ChartSpec dict used to build the Plotly figure. None if no chart was requested.
- plotly_figure:
A plotly.graph_objects.Figure object ready for .show() or .to_html(). None if no chart was requested or plotly is not installed.
- usage:
Aggregated token counts for all LLM steps in the pipeline.
- error:
Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- user_query: str
- schema_used: str
- sql_query: str
- row_count: int
- usage: PipelineUsage
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- to_csv(path=None)[source]
Export the raw SQL result rows to CSV.
Does not require pandas; uses the standard-library csv module.
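Exporting dict rows with only the standard library is straightforward with csv.DictWriter. A minimal sketch; rows_to_csv is a hypothetical helper, and returning the CSV text when path is None is an assumption of this sketch, not documented behaviour of to_csv().

```python
from __future__ import annotations

import csv
import io
from typing import Any


def rows_to_csv(rows: list[dict[str, Any]], columns: list[str], path: str | None = None) -> str:
    """Serialise dict rows to CSV text, optionally also writing it to path."""
    buffer = io.StringIO()
    # fieldnames=columns keeps the SQL column order instead of dict insertion order.
    writer = csv.DictWriter(buffer, fieldnames=columns)
    writer.writeheader()
    writer.writerows(rows)
    text = buffer.getvalue()
    if path is not None:
        # newline="" stops the csv module's \r\n terminators being translated again.
        with open(path, "w", newline="", encoding="utf-8") as handle:
            handle.write(text)
    return text


if __name__ == "__main__":
    print(rows_to_csv([{"id": 1, "name": "Widget"}], ["id", "name"]), end="")
```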
- to_json(path=None, *, indent=2)[source]
Export the raw SQL result rows to JSON.
- to_excel(path, *, sheet_name='Results')[source]
Export the raw SQL result rows to an Excel file.
Requires pandas and openpyxl (quote the extras specifier so shells such as zsh do not treat the brackets as a glob):
pip install "ractogateway[pipelines-sql]" openpyxl
- class ractogateway.AsyncListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]
Bases:
ListClassifierPipeline
Async-first variant of ListClassifierPipeline: run() is a coroutine, so you await pipeline.run(...) directly. Designed for FastAPI, aiohttp, Starlette, and other async frameworks.
The constructor and all run() parameters are identical to ListClassifierPipeline.
Example
    from fastapi import FastAPI
    from ractogateway import AsyncListClassifierPipeline

    app = FastAPI()
    pipeline = AsyncListClassifierPipeline.from_provider(
        "openai",
        "gpt-4o-mini",
        options=["Billing", "Support", "Sales"],
        safe_mode=True,
    )

    # FastAPI handler:
    @app.post("/classify")
    async def classify(query: str):
        result = await pipeline.run(query)
        return result.as_dict()
- class ractogateway.AuditEntry(**data)[source]
Bases:
BaseModel
Immutable audit record emitted to the audit_logger after every call.
It is emitted regardless of whether the call was served from cache, hit an error, or made a live LLM classification, giving a complete picture of every request for compliance, debugging, and analytics.
Fields
- timestamp:
ISO 8601 UTC timestamp of when the call was made (e.g. "2026-02-26T14:23:01.456789Z").
- user_query:
Original natural-language query.
- options_provided:
Full candidate list shown to the LLM (including uncertain_label if one was configured).
- selected:
Option(s) chosen by the LLM, or empty on error.
- confidences:
Per-selection confidence scores, or None.
- all_scores:
Score for every option (when score_all=True), or None.
- reasoning:
LLM explanation (when include_reasoning=True), or None.
- fuzzy_corrected:
True when the LLM returned a near-miss that was fuzzy-matched.
- uncertain:
True when the LLM selected the uncertain_label option.
- cache_hit:
"exact" or "semantic" when the result was served from cache; None when a live LLM call was made.
- user_id:
User identifier passed to the pipeline (for rate limiting / audit).
- session_id:
Conversation session identifier (for memory context).
- latency_ms:
Wall-clock latency of the entire pipeline call in milliseconds (near-zero for cache hits).
- usage:
Token usage dict with keys input_tokens, output_tokens, total_tokens, and retry_count. All zero on cache hits.
- error:
Non-None when safe_mode=True and an exception occurred.
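The timestamp field uses an ISO 8601 UTC string with a "Z" suffix, and latency_ms is wall-clock time for the whole call. A sketch of producing both with the standard library; utc_timestamp and timed are hypothetical helpers, not the pipeline's internals.

```python
from __future__ import annotations

import time
from datetime import datetime, timezone
from typing import Any, Callable


def utc_timestamp() -> str:
    # isoformat() renders UTC as "+00:00"; swap in the "Z" suffix used in the example above.
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def timed(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> tuple[Any, float]:
    """Call fn and return (result, latency_ms) measured with a monotonic clock."""
    start = time.perf_counter()  # unaffected by system clock adjustments
    result = fn(*args, **kwargs)
    return result, (time.perf_counter() - start) * 1000.0


if __name__ == "__main__":
    value, latency_ms = timed(sum, [1, 2, 3])
    print(utc_timestamp(), value, f"{latency_ms:.3f} ms")
```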
- timestamp: str
- user_query: str
- fuzzy_corrected: bool
- uncertain: bool
- latency_ms: float
- model_config: ClassVar[ConfigDict] = {}
- exception ractogateway.ClassifierRateLimitExceededError[source]
Bases:
RuntimeError
Raised when the rate limiter denies a request for a given user.
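This error is tied to the duck-typed rate_limiter interface: any object with check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int can be plugged in. Below is a hypothetical in-memory limiter with a fixed per-user budget and no time window (both assumptions; RedisRateLimiter is the documented production option), plus a stand-in exception showing how a denial maps to a RuntimeError subclass.

```python
from __future__ import annotations

import threading


class RateLimitExceeded(RuntimeError):
    """Hypothetical stand-in for ClassifierRateLimitExceededError (also a RuntimeError)."""


class InMemoryRateLimiter:
    """Hypothetical fixed token budget per user, satisfying the duck-typed interface."""

    def __init__(self, budget: int) -> None:
        self._budget = budget
        self._used: dict[str, int] = {}
        self._lock = threading.Lock()  # check-and-consume must be atomic across threads

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        with self._lock:
            used = self._used.get(user_id, 0)
            if used + tokens > self._budget:
                return False  # deny without consuming anything
            self._used[user_id] = used + tokens
            return True

    def get_remaining(self, user_id: str) -> int:
        with self._lock:
            return self._budget - self._used.get(user_id, 0)


def guard(limiter: InMemoryRateLimiter, user_id: str, tokens: int) -> None:
    """Raise when the limiter denies the request, as the pipeline's error suggests."""
    if not limiter.check_and_consume(user_id, tokens):
        raise RateLimitExceeded(f"rate limit exceeded for user {user_id!r}")
```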
- class ractogateway.ClassifierResult(**data)[source]
Bases:
BaseModel
Result returned by ListClassifierPipeline.
All fields except user_query and options_provided have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs mid-pipeline.
Fields
- user_query:
The original natural-language query passed to the pipeline.
- options_provided:
The full list of candidate strings presented to the LLM (including the injected uncertain_label option if one was configured).
- selected:
The option(s) chosen by the LLM, ordered by descending confidence. Empty when an error occurred and safe_mode=True.
- confidences:
Per-selection confidence scores in [0.0, 1.0], aligned with selected. None when include_confidence=False.
- all_scores:
Confidence score for every option in the list, keyed by option string. None when score_all=False (the default).
- reasoning:
Brief natural-language explanation produced by the LLM. None when include_reasoning=False.
- fuzzy_corrected:
True when the LLM returned a near-miss that the built-in fuzzy matcher corrected without consuming a retry.
- uncertain:
True when the LLM selected the uncertain_label option, indicating that no real option matched the query well enough.
- cache_hit:
"exact" or "semantic" when served from cache; None for a live LLM call.
- usage:
Aggregated token counts and retry statistics for this call.
- error:
Non-None only when safe_mode=True and an exception occurred. When error is set, selected is empty.
Examples
>>> result.first                        # "Billing"
>>> result.top_confidence               # 0.95
>>> result.as_string()                  # "Billing, Account"
>>> result.as_dict()                    # {"selected": ["Billing"], ...}
>>> result.as_enum()["Billing"].value   # "Billing"
>>> result.uncertain                    # False
>>> result.cache_hit                    # "exact" | "semantic" | None
- user_query: str
- fuzzy_corrected: bool
- uncertain: bool
- usage: ClassifierUsage
- property is_empty: bool
True when no options were selected (including error cases).
- as_string(separator=', ')[source]
Return selected options as a single joined string.
- as_dict()[source]
Return a plain dict with selected options and optional metadata.
Always contains "selected"; "confidences", "all_scores", and "reasoning" are included only when they are not None.
- as_enum(name='SelectedOptions')[source]
Return a dynamic Python enum.Enum of the selected options.
- Parameters:
name (str) – Class name for the generated Enum. Default: "SelectedOptions".
- Return type:
type[Enum]
- Returns:
An Enum whose members have the option string as both name and value.
Example
>>> E = result.as_enum()
>>> E["Billing"].value  # "Billing"
- top_n(n)[source]
Return the top-n selected options.
- score_for(option)[source]
Return the confidence score for a specific option, or None.
Searches all_scores first (all options, when score_all=True), then confidences for selected items.
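The lookup order of score_for() and the ordering guarantee behind top_n() can be expressed over plain lists. These are hypothetical free functions, not the ClassifierResult methods themselves; they rely only on the documented facts that selected is sorted by descending confidence, confidences is aligned with selected, and all_scores is consulted first.

```python
from __future__ import annotations


def top_n(selected: list[str], n: int) -> list[str]:
    # selected is already ordered by descending confidence, so a slice is enough.
    return selected[:n]


def score_for(
    option: str,
    selected: list[str],
    confidences: list[float] | None,
    all_scores: dict[str, float] | None,
) -> float | None:
    if all_scores is not None and option in all_scores:
        return all_scores[option]  # per-option scores (score_all=True) take priority
    if confidences is not None and option in selected:
        return confidences[selected.index(option)]  # confidences are aligned with selected
    return None  # option not scored anywhere


if __name__ == "__main__":
    print(top_n(["Billing", "Account", "Sales"], 2))
    print(score_for("Sales", ["Billing"], [0.95], {"Billing": 0.95, "Sales": 0.03}))
```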
- to_audit_entry(*, timestamp, user_id=None, session_id=None, latency_ms=0.0)[source]
Build an AuditEntry from this result.
Called automatically by the pipeline; exposed so that users can reconstruct audit entries from stored results if needed.
- Return type:
AuditEntry
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.ClassifierUsage(**data)[source]
Bases:
BaseModel
Token usage and retry statistics for a single pipeline call.
Properties
- total_tokens:
Sum of input_tokens and output_tokens across all LLM attempts, including automatic retries triggered by invalid LLM responses. Zero on a cache hit (no LLM call was made).
- input_tokens: int
- output_tokens: int
- retry_count: int
- property total_tokens: int
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.ListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]
Bases:
object
Map a natural-language query to one or more items from a candidate list.
Supports every RactoGateway provider via the kit parameter or the from_provider() class factory. Internally builds a dynamic Python enum.Enum from the options list and validates every LLM response against it: hallucinated and paraphrased answers are caught, fuzzy-corrected if close enough, and retried otherwise.
Two variants
ListClassifierPipeline: run() is sync, arun() is async.
AsyncListClassifierPipeline: run() is async only.
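The validate, fuzzy-correct, or retry flow can be sketched with the standard library's difflib (the fuzzy_fallback parameter names difflib as the matcher). For brevity this hypothetical validate_choice matches against the plain option strings rather than a generated Enum; the 0.8 similarity cutoff and the whitespace stripping are assumptions, not documented values.

```python
from __future__ import annotations

import difflib


def validate_choice(
    raw: str,
    options: list[str],
    *,
    case_sensitive: bool = False,
    fuzzy_fallback: bool = True,
    cutoff: float = 0.8,
) -> str | None:
    """Map a raw LLM answer onto one of options, or return None to signal a retry."""

    def norm(text: str) -> str:
        text = text.strip()  # assumption: surrounding whitespace is never meaningful
        return text if case_sensitive else text.casefold()

    lookup = {norm(opt): opt for opt in options}  # normalised form -> canonical option
    key = norm(raw)
    if key in lookup:
        return lookup[key]  # exact match (ignoring case unless case_sensitive)
    if fuzzy_fallback:
        close = difflib.get_close_matches(key, list(lookup), n=1, cutoff=cutoff)
        if close:
            return lookup[close[0]]  # near-miss corrected without spending a retry
    return None  # unknown option: the caller would re-prompt the LLM


if __name__ == "__main__":
    opts = ["Billing", "Technical Support", "Sales"]
    for answer in ["billing", "Technical Suport", "Weather"]:
        print(answer, "->", validate_choice(answer, opts))
```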
- type kit:
- param kit:
Any RactoGateway developer kit (OpenAI, Anthropic, Google, Ollama, HuggingFace). Must expose .chat(ChatConfig) and .achat(ChatConfig) methods. Use from_provider() instead of constructing kits manually when you only need a provider and model.
- type options:
- param options:
Default candidate strings. Can be overridden per-call. Must be non-empty and duplicate-free when provided.
- type selection_mode:
Literal['single', 'multiple']
- param selection_mode:
"single" (default): exactly one option. "multiple": one or more options. Overridable per call.
- type output_format:
Literal['string', 'dict', 'pydantic']
- param output_format:
"pydantic" (default): ClassifierResult. "string": comma-joined string. "dict": plain dict. Overridable per call.
- type prompt:
- param prompt:
Custom RactoPrompt to replace the built-in system prompt.
- type temperature:
- param temperature:
LLM temperature. Default 0.0 for near-deterministic output.
- type max_tokens:
- param max_tokens:
Response token budget. Default 512.
- type max_retries:
- param max_retries:
Retry attempts when the LLM returns invalid JSON or an unknown option. Default 2.
- type include_confidence:
- param include_confidence:
Ask the LLM for per-selection confidence scores in [0.0, 1.0]. Default True.
- type include_reasoning:
- param include_reasoning:
Ask the LLM for a one-sentence explanation. Default False.
- type score_all:
- param score_all:
Ask the LLM for a score for every option (not just the selected ones). Stored in result.all_scores. Default False.
- type option_descriptions:
- param option_descriptions:
{option: description}: shown inline next to each option in the prompt to help the LLM distinguish similar categories.
- type fuzzy_fallback:
- param fuzzy_fallback:
Use the stdlib difflib module to correct near-miss LLM responses before consuming a retry. Default True.
- type uncertain_label:
- param uncertain_label:
When set, this string is appended as an extra option that the LLM can pick when nothing matches (e.g. "Other / None of the above"). result.uncertain is True when this label is selected.
- type confidence_threshold:
- param confidence_threshold:
Drop selections scoring below this threshold; the highest-confidence match is kept as a fallback. Default None (no filtering).
- type case_sensitive:
- param case_sensitive:
Whether option matching is case-sensitive. Default False.
- type safe_mode:
- param safe_mode:
Return ClassifierResult(error=...) instead of raising. Default False.
- type tracer:
- param tracer:
Optional RactoTracer.
- type metrics:
- param metrics:
Optional GatewayMetricsMiddleware.
- type rate_limiter:
- param rate_limiter:
Duck-typed: must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
- type memory:
- param memory:
Duck-typed: must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
- type user_id:
- param user_id:
Default user ID for rate limiting. Overridable per-call.
Example
    # Via kit directly
    from ractogateway.openai_developer_kit import Chat
    from ractogateway.pipelines import ListClassifierPipeline

    pipeline = ListClassifierPipeline(
        kit=Chat(model="gpt-4o-mini"),
        options=["Billing", "Technical Support", "Sales"],
        include_confidence=True,
        include_reasoning=True,
    )
    result = pipeline.run("My invoice is wrong")
    print(result.first, result.top_confidence)

    # Via from_provider(): no manual kit import needed
    pipeline = ListClassifierPipeline.from_provider(
        "anthropic",
        "claude-haiku-4-5-20251001",
        options=["Billing", "Technical Support", "Sales"],
    )
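Because memory is duck-typed, any object with get_history(session_id) -> list[dict] and append(session_id, role, content) can stand in for a Redis-backed store during tests or local development. A hypothetical process-local sketch; the {"role", "content"} dict shape and the trimming policy are assumptions, since what the pipeline stores in memory is not documented here.

```python
from __future__ import annotations


class InMemoryChatMemory:
    """Hypothetical process-local memory matching the duck-typed interface."""

    def __init__(self, max_messages: int = 20) -> None:
        if max_messages < 1:
            raise ValueError("max_messages must be at least 1")
        self._max = max_messages
        self._store: dict[str, list[dict[str, str]]] = {}

    def get_history(self, session_id: str) -> list[dict[str, str]]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store.get(session_id, []))

    def append(self, session_id: str, role: str, content: str) -> None:
        history = self._store.setdefault(session_id, [])
        history.append({"role": role, "content": content})
        del history[: -self._max]  # keep only the most recent max_messages entries
```

It would then be passed as memory=InMemoryChatMemory() to the constructor, with session_id supplied on each run() call. It is not shared across processes, so it suits tests rather than multi-worker deployments.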
- classmethod from_provider(provider, model, *, api_key=None, base_url=None, options=None, **kwargs)[source]
Create a pipeline by specifying a provider and model; no kit import needed.
- Parameters:
provider (str) – One of "openai", "anthropic", "google", "ollama", "huggingface".
model (str) – Model identifier string, e.g.:
OpenAI: "gpt-4o-mini", "gpt-4o"
Anthropic: "claude-haiku-4-5-20251001", "claude-sonnet-4-6"
Google: "gemini-2.0-flash", "gemini-1.5-pro"
Ollama: "llama3.2", "mistral"
HuggingFace: "meta-llama/Llama-3.2-3B-Instruct"
api_key (str | None) – Provider API key. Falls back to the standard environment variable for each provider (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY).
base_url (str | None) – Custom endpoint, used for Ollama (http://localhost:11434) or OpenAI-compatible proxies.
options (list[str] | None) – Default candidate options list.
**kwargs (Any) – Any other ListClassifierPipeline constructor parameters (selection_mode, include_confidence, safe_mode, etc.).
- Return type:
ListClassifierPipeline
- Returns:
ListClassifierPipeline
Example
    pipeline = ListClassifierPipeline.from_provider(
        "anthropic",
        "claude-haiku-4-5-20251001",
        options=["Billing", "Support", "Sales"],
        include_reasoning=True,
        safe_mode=True,
    )
- static make_enum(options, name='OptionsEnum')[source]
Build a standalone dynamic enum.Enum from an options list.
Useful when you want enum-typed values outside the pipeline.
- Return type:
type[Enum]
Example
    E = ListClassifierPipeline.make_enum(["Red", "Green", "Blue"])
    E["Red"].value  # "Red"
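Python's functional Enum API can build such a class at runtime with each option string as both member name and value. A minimal sketch; whether make_enum() uses exactly this approach is an assumption, and the duplicate check mirrors the constructor's duplicate-free requirement. Member names need not be valid identifiers, so item lookup (E["Dark Red"]) works where attribute access would not.

```python
from __future__ import annotations

from enum import Enum


def make_enum(options: list[str], name: str = "OptionsEnum") -> type[Enum]:
    """Hypothetical equivalent of make_enum(): option string = member name = value."""
    if len(set(options)) != len(options):
        raise ValueError("options must be duplicate-free")
    # Functional API: an iterable of (member_name, value) pairs.
    return Enum(name, [(opt, opt) for opt in options])


if __name__ == "__main__":
    Colors = make_enum(["Red", "Green", "Blue", "Dark Red"])
    print(Colors["Dark Red"].value)  # lookup by name, even with a space in it
    print(Colors("Green").name)      # lookup by value
```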
- get_options()[source]
Return the pipeline-level options list, or None if not set.
- set_options(options)[source]
Replace the entire pipeline-level options list.
Thread-safe — safe to call while the pipeline is in use.
- add_option(option, description=None)[source]
Append a new option to the pipeline-level list.
- remove_option(option)[source]
Remove an option from the pipeline-level list.
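Only set_options() is documented as thread-safe, and how the pipeline achieves that is not stated. One common approach is to guard the list with a lock and hand out copies, sketched here with a hypothetical OptionsStore class. Ignoring duplicate adds and missing removes is a choice of this sketch; the real add_option() and remove_option() may raise instead.

```python
from __future__ import annotations

import threading


class OptionsStore:
    """Hypothetical lock-guarded holder mirroring get/set/add/remove_option."""

    def __init__(self, options: list[str] | None = None) -> None:
        self._lock = threading.Lock()
        self._options = list(options) if options is not None else None
        self._descriptions: dict[str, str] = {}

    def get_options(self) -> list[str] | None:
        with self._lock:
            # Defensive copy: callers can't mutate the shared list behind the lock.
            return None if self._options is None else list(self._options)

    def set_options(self, options: list[str]) -> None:
        with self._lock:
            self._options = list(options)  # swap in a fresh list in one step

    def add_option(self, option: str, description: str | None = None) -> None:
        with self._lock:
            if self._options is None:
                self._options = []
            if option not in self._options:  # sketch choice: ignore duplicates
                self._options.append(option)
            if description is not None:
                self._descriptions[option] = description

    def remove_option(self, option: str) -> None:
        with self._lock:
            if self._options is not None and option in self._options:
                self._options.remove(option)  # sketch choice: missing option is a no-op
            self._descriptions.pop(option, None)
```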
- run(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Classify user_query synchronously.
- Parameters:
user_query (str) – Natural-language query to classify.
options (list[str] | None) – Per-call override for the candidate list. Omit to use the pipeline-level list. Passing [] raises ValueError.
selection_mode (Literal['single', 'multiple'] | None) – Per-call override: "single" or "multiple".
output_format (Literal['string', 'dict', 'pydantic'] | None) – Per-call override: "pydantic", "string", or "dict".
confidence_threshold (float | None) – Per-call override. Pass None explicitly to disable filtering for this call even if a pipeline-level threshold is set.
session_id (str | None) – Conversation session ID for memory retrieval and storage.
user_id (str | None) – Per-call user ID for rate limiting and audit.
- Return type:
ClassifierResult | str | dict
- Returns:
The type depends on output_format.
- batch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Classify multiple queries synchronously, one after another.
Shares all per-call overrides across every query in the batch. Use abatch_run() to run them concurrently in async contexts.
- async arun(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Async variant of run(), with identical parameters.
- async abatch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None, max_concurrency=None)[source]
Classify multiple queries concurrently with asyncio.gather.
- Return type:
list
- Returns:
Results in the same order as queries.
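abatch_run() is documented as using asyncio.gather and returning results in input order; the exact meaning of max_concurrency is not spelled out here, so this sketch assumes it caps the number of in-flight calls, which an asyncio.Semaphore provides. gather_limited and fake_classify are hypothetical names; fake_classify stands in for one LLM classification.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_limited(
    items: list[T],
    fn: Callable[[T], Awaitable[R]],
    max_concurrency: int | None = None,
) -> list[R]:
    """Run fn over items concurrently; gather keeps results in input order."""
    if max_concurrency is None:
        return list(await asyncio.gather(*(fn(item) for item in items)))  # no cap

    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: T) -> R:
        async with semaphore:  # at most max_concurrency calls run at once
            return await fn(item)

    return list(await asyncio.gather(*(guarded(item) for item in items)))


async def fake_classify(query: str) -> str:
    """Hypothetical stand-in for one LLM classification call."""
    await asyncio.sleep(0.01)
    return "Billing" if "invoice" in query else "Sales"


if __name__ == "__main__":
    queries = ["My invoice is wrong", "I want to buy more seats"]
    print(asyncio.run(gather_limited(queries, fake_classify, max_concurrency=2)))
```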