ractogateway.pipelines

RactoGateway Prebuilt Pipelines.

Prebuilt end-to-end pipelines for real-world LLM workflows. Each pipeline is a self-contained class with run() / arun() methods, full observability hooks, per-step model control, and optional per-call overrides.

Available pipelines

SQLAnalystPipeline / AsyncSQLAnalystPipeline

NL → SQL → pandas → Markdown answer + optional Plotly chart.
Requires: pip install ractogateway[pipelines-sql]
Charts: pip install ractogateway[pipelines-sql-viz]

ListClassifierPipeline / AsyncListClassifierPipeline

NL query → best-matching item(s) from a list[str]. Uses dynamic Enum + Pydantic validation; supports single/multi selection, confidence scores, reasoning, retries, memory, rate limiting, and telemetry. No extra dependencies.
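The dynamic-Enum idea can be sketched with the standard library alone. This is a hypothetical illustration, not ListClassifierPipeline's internal code. `build_option_enum` and `resolve_choice` are invented names. `difflib` stands in for whatever near-miss matching the pipeline's `fuzzy_fallback` option actually uses, and the 0.8 cutoff is an arbitrary assumption.

```python
from difflib import get_close_matches
from enum import Enum


def build_option_enum(options: list[str]) -> type[Enum]:
    """Hypothetical helper: an Enum whose values are exactly the allowed options."""
    # Synthetic member names keep the Enum valid even when options contain spaces.
    members = {f"OPT_{i}": option for i, option in enumerate(options)}
    return Enum("Option", members)


def resolve_choice(raw: str, option_enum: type[Enum], fuzzy: bool = True) -> Enum:
    """Hypothetical helper: map raw LLM output onto a member, fuzzy-correcting near misses."""
    try:
        return option_enum(raw)  # exact value lookup; raises ValueError if absent
    except ValueError:
        if not fuzzy:
            raise
        values = [member.value for member in option_enum]
        matches = get_close_matches(raw, values, n=1, cutoff=0.8)  # assumed cutoff
        if not matches:
            raise  # nothing close enough: re-raise the original ValueError
        return option_enum(matches[0])


Option = build_option_enum(["Billing", "Technical Support", "Sales", "Account Management"])
print(resolve_choice("Account Management", Option).value)  # exact match
print(resolve_choice("Acount Managment", Option).value)    # misspelled, fuzzy-corrected
```

Validating against an Enum means an invented label fails loudly instead of flowing downstream as a plausible-looking string.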

VideoProcessorPipeline / AsyncVideoProcessorPipeline

Process tutorial/lecture videos: frame dedup, audio transcription, vision-LLM analysis (whiteboard/screen), comprehensive summary, and optional RAG storage. Accepts local paths, URLs, YouTube links, raw bytes, or pre-extracted frame images.
Requires: pip install ractogateway[pipelines-video]
Audio: pip install ractogateway[pipelines-video-whisper]
YouTube: pip install ractogateway[pipelines-video-yt]

AgentPipeline / AsyncAgentPipeline

Autonomous ReAct agent: reason, call tools, observe, repeat. Supports RAG search, SQL query, HTTP fetch, memory, and any Python callable. Stops when finish() is called, when max_steps is reached, or when repeated tool errors trip the circuit breaker.
HTTP tool: pip install ractogateway[pipelines-agent-http]

Usage:

from ractogateway.pipelines import SQLAnalystPipeline, ListClassifierPipeline
from ractogateway.openai_developer_kit import Chat

# SQL Analyst
pipeline = SQLAnalystPipeline(kit=Chat(model="gpt-4o"))
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/db",
)
print(result.answer)

# List Classifier
classifier = ListClassifierPipeline(
    kit=Chat(model="gpt-4o-mini"),
    options=["Billing", "Technical Support", "Sales", "Account Management"],
    selection_mode="single",
    include_confidence=True,
    include_reasoning=True,
)
result = classifier.run("I can't log into my account")
print(result.first)           # "Account Management"
print(result.top_confidence)  # 0.94
print(result.as_dict())       # {"selected": [...], "confidences": [...], ...}

# Video Processor
from ractogateway.pipelines import VideoProcessorPipeline, TranscriberBackend
vp = VideoProcessorPipeline(
    kit=Chat(model="gpt-4o"),
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    generate_summary=True,
)
res = vp.run("lecture.mp4")       # or YouTube URL / bytes / pre-extracted frames
print(res.summary)
res.to_markdown("report.md")

# Agent
from ractogateway.pipelines import AgentPipeline

def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"

agent = AgentPipeline(
    kit=Chat(model="gpt-4o"),
    tools=[get_weather],
    max_steps=6,
    safe_mode=True,
)
result = agent.run("What is the weather in Paris?")
print(result.final_answer)
print(result.to_markdown())

class ractogateway.pipelines.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')

Bases: object

Autonomous ReAct agent with sync run() and async arun().
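The loop that run() and arun() drive can be pictured as a toy, fully synchronous program. This is a hypothetical sketch, not AgentPipeline's code:

- The LLM is replaced by a scripted iterator of (tool_name, tool_input) decisions.
- `run_loop` is an invented name.
- StopReason is re-declared locally, mirroring the enum documented further down, so the snippet runs without RactoGateway.
- ERROR is produced when the decision source itself fails. This loosely mirrors what safe_mode surfaces in result.error.

```python
from __future__ import annotations

from enum import Enum
from typing import Any, Callable, Iterator


class StopReason(str, Enum):  # local mirror of ractogateway.pipelines.StopReason
    FINISHED = "finished"
    MAX_STEPS = "max_steps"
    ERROR = "error"
    CIRCUIT_BREAK = "circuit_break"


def run_loop(
    decisions: Iterator[tuple[str, dict[str, Any]]],
    tools: dict[str, Callable[..., Any]],
    max_steps: int = 10,
    max_consecutive_errors: int = 3,
) -> tuple[StopReason, str | None, list[str]]:
    """Hypothetical ReAct loop returning (stop_reason, final_answer, observations)."""
    observations: list[str] = []
    consecutive_errors = 0
    for _ in range(max_steps):
        try:
            tool_name, tool_input = next(decisions)  # stands in for one LLM call
        except Exception:  # includes StopIteration if the script runs out
            return StopReason.ERROR, None, observations
        if tool_name == "finish":  # the always-present termination tool
            return StopReason.FINISHED, tool_input["answer"], observations
        try:
            observations.append(str(tools[tool_name](**tool_input)))
            consecutive_errors = 0  # any success resets the circuit breaker
        except Exception as exc:  # tool failures are fed back as observations
            observations.append(f"ERROR: {exc}")
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                return StopReason.CIRCUIT_BREAK, None, observations
    return StopReason.MAX_STEPS, None, observations


def get_weather(city: str) -> str:
    return f"Sunny, 22 C in {city}"


script = iter([
    ("get_weather", {"city": "Paris"}),
    ("finish", {"answer": "It is sunny in Paris."}),
])
reason, answer, obs = run_loop(script, {"get_weather": get_weather})
```

Each termination path in the sketch maps to one StopReason value. Only the finish() path produces a final answer.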

Parameters:
  • kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).

  • tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).

  • rag_pipeline (Any) – Optional RactoRAG instance — auto-registers a rag_search tool.

  • sql_pipeline (Any) – Optional SQLAnalystPipeline — auto-registers a sql_query tool.

  • enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).

  • agent_memory (Any) – A plain dict, or any object with get(key) and set(key, value) methods. Auto-registers memory_read and memory_write tools.

  • max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.

  • max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.

  • tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).

  • max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.

  • max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.

  • system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).

  • extra_rules (str) – Append an extra numbered rule to the default system prompt.

  • safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.
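Because rate_limiter is duck-typed, any object exposing check_and_consume(user_id, tokens) and get_remaining(user_id) can be passed. The class below is a minimal in-process sketch with three stated assumptions:

- check_and_consume returns True when the request is allowed. The SQLAnalystPipeline docs give its return type as bool.
- The budget is a fixed allowance of whatever token count the pipeline passes, per user, refilled every window_s seconds.
- Single-threaded use. It is not thread-safe and is not RedisRateLimiter.

`FixedWindowRateLimiter` is a hypothetical name.

```python
import time


class FixedWindowRateLimiter:
    """Hypothetical in-memory limiter: `limit` tokens per user per `window_s` seconds."""

    def __init__(self, limit: int, window_s: float = 60.0) -> None:
        self.limit = limit
        self.window_s = window_s
        self._windows: dict[str, tuple[float, int]] = {}  # user_id -> (window start, tokens used)

    def _current(self, user_id: str) -> tuple[float, int]:
        now = time.monotonic()
        start, used = self._windows.get(user_id, (now, 0))
        if now - start >= self.window_s:  # window expired: start a fresh one
            start, used = now, 0
        return start, used

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        start, used = self._current(user_id)
        if used + tokens > self.limit:
            self._windows[user_id] = (start, used)
            return False  # deny without consuming anything
        self._windows[user_id] = (start, used + tokens)
        return True

    def get_remaining(self, user_id: str) -> int:
        _, used = self._current(user_id)
        return max(self.limit - used, 0)


limiter = FixedWindowRateLimiter(limit=1_000, window_s=60.0)
# agent = AgentPipeline(kit=..., rate_limiter=limiter, user_id="alice")
```

A fixed window is the simplest policy that fits the two-method interface. A sliding window or token bucket would smooth bursts at window edges.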

run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)

Run the agent synchronously until it finishes or hits max_steps.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps.

  • user_id (Any) – Per-call override for the rate-limiter user identifier.

  • session_id (str | None) – Optional session tag passed to tracer spans (informational).

  • response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.

Return type:

AgentResult

Returns:

AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.

async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)

Async variant of run().

Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.

Parameters:
  • goal (str) – The task or question the agent should solve.

  • max_steps (Any) – Per-call override for the constructor max_steps.

  • user_id (Any) – Per-call override for the rate-limiter user identifier.

  • session_id (str | None) – Optional session tag for tracing.

  • response_format (type | None) – A Pydantic BaseModel subclass for structured output parsing.

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt sent to the LLM each step (read-only).

exception ractogateway.pipelines.AgentRateLimitExceededError

Bases: RuntimeError

Raised when the rate limiter blocks an agent run.

class ractogateway.pipelines.AgentResult(**data)

Bases: BaseModel

Full output of an AgentPipeline run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

goal: str

The original user goal / task string.

final_answer: str | None

The agent’s answer when finish() was called; None otherwise (e.g. when stop_reason is MAX_STEPS, CIRCUIT_BREAK, or ERROR).

steps: list[AgentStep]

All reasoning+action steps in execution order.

stop_reason: StopReason

Why the loop terminated.

usage: AgentUsage

Token and step counts.

error: str | None

Exception message when stop_reason is ERROR.

parsed_output: Any

Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.

get_tool_calls()

Return (tool_name, tool_input) for every non-finish step.

Return type:

list[tuple[str, dict[str, Any]]]

get_observations()

Return all tool observations in execution order.

Return type:

list[str]

succeeded()

True when the agent reached a final answer.

Return type:

bool

to_json(path=None, *, indent=2)

Serialise to JSON. Returns the JSON string when path is None; otherwise writes it to path and returns None.

Return type:

str | None

to_markdown(path=None)

Build a step-by-step Markdown trace. Returns the Markdown string when path is None; otherwise writes it to path and returns None.

Return type:

str | None

class ractogateway.pipelines.AgentStep(**data)

Bases: BaseModel

One reasoning + action step in the ReAct loop.

step_num: int

1-based position in the execution sequence.

thought: str | None

The LLM’s reasoning text before selecting a tool.

tool_name: str

Name of the tool that was invoked.

tool_input: dict[str, Any]

Arguments passed to the tool as a flat dict.

observation: str

The tool’s return value (truncated to 4000 chars when long).

duration_ms: float

Wall-clock time for tool execution in milliseconds.

is_finish: bool

True when this step invoked the finish() termination tool.

model_config: ClassVar[ConfigDict] = {}

class ractogateway.pipelines.AgentUsage(**data)

Bases: BaseModel

Token and step accounting across the full agent run.

total_input_tokens: int

Cumulative input tokens across all LLM calls.

total_output_tokens: int

Cumulative output tokens across all LLM calls.

steps_taken: int

Number of reasoning+action steps executed.

tools_called: int

Number of non-finish tool invocations.

property total_tokens: int

Sum of all input and output tokens.

model_config: ClassVar[ConfigDict] = {}

class ractogateway.pipelines.AsyncAgentPipeline(*args, **kwargs)

Bases: object

Async-only variant of AgentPipeline.

Exposes a single async run() method, which suits FastAPI endpoints and other async code where a sync run() should not be part of the public API.

All constructor parameters are identical to AgentPipeline.

async run(goal, **kwargs)

Async-only agent entrypoint. See AgentPipeline.arun().

Return type:

AgentResult

property registered_tools: list[str]

Sorted list of all registered tool names.

property system_prompt: str

The system prompt used by the inner pipeline.

class ractogateway.pipelines.StopReason(*values)

Bases: str, Enum

Why the agent loop terminated.

FINISHED = 'finished'
MAX_STEPS = 'max_steps'
ERROR = 'error'
CIRCUIT_BREAK = 'circuit_break'

class ractogateway.pipelines.ToolExecutor(tools, max_retries=0)

Bases: object

Runs registered tools by name with sync and async support.

Parameters:
  • tools (dict[str, Callable]) – Mapping of tool name to callable.

  • max_retries (int) – How many times to retry a tool that raises an exception before reporting an error observation to the LLM. Default 0 = no retry.
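The retry contract for max_retries can be sketched as a stand-alone function. Under that contract a raising tool is retried, and the failure is then reported as an observation rather than raised. The function also returns the (observation, duration_ms) shape that execute() returns.

`execute_with_retries` is a hypothetical name, not ToolExecutor's code. Two details are assumptions: the exact error-observation text, and timing all attempts together.

```python
from __future__ import annotations

import time
from typing import Any, Callable


def execute_with_retries(
    func: Callable[..., Any], tool_input: dict[str, Any], max_retries: int = 0
) -> tuple[str, float]:
    """Hypothetical: call func(**tool_input) up to 1 + max_retries times; never raises."""
    start = time.perf_counter()
    last_exc: Exception | None = None
    for _attempt in range(max_retries + 1):
        try:
            observation = str(func(**tool_input))
            break  # success: stop retrying
        except Exception as exc:  # retry on any exception
            last_exc = exc
    else:  # every attempt failed: turn the error into an observation for the LLM
        observation = f"Error: {type(last_exc).__name__}: {last_exc}"
    duration_ms = (time.perf_counter() - start) * 1000.0
    return observation, duration_ms
```

Returning the error as text lets the LLM see what went wrong and choose a different tool or arguments. That is the behavior the max_retries description implies.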

property names: list[str]

Sorted list of registered tool names.

describe_all()

Build the tools section for the agent system prompt.

Return type:

str

execute(tool_name, tool_input)

Execute tool_name synchronously, retrying up to max_retries times on exception.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute(tool_name, tool_input)

Execute tool_name asynchronously, retrying up to max_retries times on exception.

Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.

Return type:

tuple[str, float]

Returns:

tuple[str, float] – (observation, duration_ms)

async aexecute_parallel(calls)

Execute multiple tool calls concurrently via asyncio.gather.

Parameters:

calls (list[tuple[str, dict[str, Any]]]) – List of (tool_name, tool_input) pairs to run in parallel.

Return type:

list[tuple[str, float]]

Returns:

list[tuple[str, float]] – Results in the same order as calls.

ractogateway.pipelines.make_finish_tool()

Return the always-present finish tool.

When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.

Return type:

tuple[str, Callable]

ractogateway.pipelines.make_http_tool()

Return an http_get tool that fetches URL content via httpx.

Requires httpx: pip install ractogateway[pipelines-agent-http]

Return type:

tuple[str, Callable]

ractogateway.pipelines.make_memory_tools(agent_memory)

Return memory_read and memory_write tools backed by agent_memory.

agent_memory can be any object supporting:

memory.get(key) -> Any
memory.set(key, value) -> None

or a plain dict.

Return type:

list[tuple[str, Callable]]
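Here is a minimal sketch of memory_read / memory_write tools that wrap either a plain dict or a get/set object, as described above. `make_memory_tools_sketch` is a hypothetical name. It returns the documented `list[tuple[str, Callable]]` shape, but the tool parameter names (key, value) and the observation strings are assumptions.

```python
from typing import Any, Callable


def make_memory_tools_sketch(agent_memory: Any) -> list[tuple[str, Callable[..., str]]]:
    """Hypothetical: return (name, callable) pairs for reading and writing agent memory."""
    is_dict = isinstance(agent_memory, dict)

    def memory_read(key: str) -> str:
        # Plain dicts also have .get(), so both backends share this call.
        value = agent_memory.get(key)
        return "(not found)" if value is None else str(value)

    def memory_write(key: str, value: str) -> str:
        if is_dict:
            agent_memory[key] = value  # plain dicts have no .set()
        else:
            agent_memory.set(key, value)
        return f"Stored {key!r}."

    return [("memory_read", memory_read), ("memory_write", memory_write)]
```

Tools return strings because every observation the agent sees is text. That is why a missing key yields a placeholder rather than None.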

ractogateway.pipelines.make_rag_tool(rag_pipeline)

Return a rag_search tool backed by a RactoRAG pipeline.

Return type:

tuple[str, Callable]

ractogateway.pipelines.make_rag_tool_async(rag_pipeline)

Return an async rag_search tool backed by an async RactoRAG.

Return type:

tuple[str, Callable]

ractogateway.pipelines.make_sql_tool(sql_pipeline)

Return a sql_query tool backed by a SQLAnalystPipeline.

Return type:

tuple[str, Callable]

class ractogateway.pipelines.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)

Bases: SQLAnalystPipeline

Async-first variant of SQLAnalystPipeline.

run() is a coroutine — use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.

All constructor parameters and run() parameters are identical to SQLAnalystPipeline.

Example:

from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    if result.plotly_figure is not None:  # None when no chart was built or plotly is missing
        result.plotly_figure.show()

async run(user_query, **kwargs)

Async run() — delegates to SQLAnalystPipeline.arun().

Return type:

SQLAnalystResult

class ractogateway.pipelines.ChartSpec(**data)

Bases: BaseModel

Specification for a Plotly chart.

Pass to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame’s column dtypes with no extra LLM call.

Supported chart types

bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel
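The exact rules behind chart="auto" are not documented here. The function below illustrates what a zero-LLM, dtype-based heuristic could look like. It picks a chart type from (column name, kind) pairs, where kind is one of "numeric", "datetime", or "categorical".

Both the function name `guess_chart` and its rules are assumptions. The returned dict uses only the ChartSpec field names chart_type, x, and y.

```python
from __future__ import annotations


def guess_chart(columns: list[tuple[str, str]]) -> dict[str, str] | None:
    """Hypothetical heuristic: map column kinds to a ChartSpec-like dict."""
    numeric = [name for name, kind in columns if kind == "numeric"]
    dates = [name for name, kind in columns if kind == "datetime"]
    categorical = [name for name, kind in columns if kind == "categorical"]

    if dates and numeric:  # measure over time -> line
        return {"chart_type": "line", "x": dates[0], "y": numeric[0]}
    if categorical and numeric:  # label + measure -> bar
        return {"chart_type": "bar", "x": categorical[0], "y": numeric[0]}
    if len(numeric) >= 2:  # two measures -> scatter
        return {"chart_type": "scatter", "x": numeric[0], "y": numeric[1]}
    if len(numeric) == 1:  # a single measure -> its distribution
        return {"chart_type": "histogram", "x": numeric[0]}
    return None  # nothing sensible to plot


print(guess_chart([("customer_name", "categorical"), ("revenue", "numeric")]))
```

Rules like these are cheap and deterministic. When they guess wrong, passing an explicit ChartSpec is the documented escape hatch.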

Example:

from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

result = pipeline.run(
    user_query="Top 5 customers by revenue?",
    ...,
    chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue",
                    title="Top 5 Customers"),
)
if result.plotly_figure is not None:  # None if plotly is not installed
    result.plotly_figure.show()

chart_type: str
x: str | None
y: str | list[str] | None
color: str | None
title: str
x_label: str | None
y_label: str | None
orientation: Literal['v', 'h'] | None
model_config: ClassVar[ConfigDict] = {}

class ractogateway.pipelines.PipelineUsage(**data)

Bases: BaseModel

Aggregated token usage across all LLM calls in the pipeline.

Tracks each step (SQL generation, pandas code generation, markdown answer generation) separately so you can see exactly where tokens are consumed.

Properties

total_input_tokens:

Sum of all prompt tokens across every LLM step.

total_output_tokens:

Sum of all completion tokens across every LLM step.

total_tokens:

Grand total of every token consumed by the pipeline.

sql_input_tokens: int
sql_output_tokens: int
pandas_input_tokens: int
pandas_output_tokens: int
answer_input_tokens: int
answer_output_tokens: int
property total_input_tokens: int
property total_output_tokens: int
property total_tokens: int
model_config: ClassVar[ConfigDict] = {}

exception ractogateway.pipelines.RateLimitExceededError

Bases: RuntimeError

Raised when the rate limiter denies a request for a given user.

class ractogateway.pipelines.ReadOnlySQLGuard

Bases: object

Validates that a SQL string contains only read (SELECT) operations.

Uses a keyword regex to detect any DML/DDL statement that would modify data or schema. Call check() before executing any LLM-generated SQL when force_read_only=True.

Example:

ReadOnlySQLGuard.check("SELECT * FROM users")   # passes silently
ReadOnlySQLGuard.check("DROP TABLE users")       # raises ReadOnlyViolationError

static check(sql)

Raise ReadOnlyViolationError if sql contains a write operation.

Parameters:

sql (str) – The SQL string to validate.

Raises:

ReadOnlyViolationError – If the SQL contains a disallowed keyword.

Return type:

None
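A keyword-regex guard of the kind described above can be sketched in a few lines. The keyword list and `check_read_only` are assumptions chosen for illustration. ReadOnlyViolationError is re-declared locally so the snippet runs on its own, and the real ReadOnlySQLGuard may block a different set of keywords.

This approach has the usual trade-off. A forbidden whole word inside a string literal, such as WHERE note = 'please delete', is also rejected, which errs on the safe side.

```python
import re


class ReadOnlyViolationError(ValueError):
    """Local stand-in for ractogateway.pipelines.ReadOnlyViolationError."""


# Hypothetical keyword list; \b keeps identifiers such as "deleted_at" from matching.
_WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|MERGE|UPSERT|DROP|CREATE|ALTER|TRUNCATE|"
    r"RENAME|GRANT|REVOKE|COPY|CALL|EXEC|EXECUTE)\b",
    re.IGNORECASE,
)


def check_read_only(sql: str) -> None:
    """Raise ReadOnlyViolationError if sql contains a write/DDL keyword."""
    match = _WRITE_KEYWORDS.search(sql)
    if match:
        raise ReadOnlyViolationError(f"Disallowed SQL keyword: {match.group(1).upper()}")


check_read_only("SELECT id, deleted_at FROM users")  # passes silently
```

A regex check is a cheap first line of defense. Connecting with a database role that only has SELECT privileges is a stronger complement.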

exception ractogateway.pipelines.ReadOnlyViolationError

Bases: ValueError

Raised when generated SQL contains a write operation in force_read_only mode.

class ractogateway.pipelines.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)

Bases: object

Natural-language to SQL + pandas + Markdown answer + chart pipeline.

Converts a plain-English question into:

  1. A read-only SQL query (LLM step — sql_kit)

  2. Pandas analysis code executed against the SQL result (LLM step — pandas_kit)

  3. A rich Markdown answer with table + insights (LLM step — answer_kit)

  4. An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls — pure dtype heuristics or user-provided spec)

Two variants

  • SQLAnalystPipeline – run() sync, arun() async.

  • AsyncSQLAnalystPipeline – run() is async (same as arun()).

type kit:

Any

param kit:

Default LLM kit used for any step that doesn’t have its own kit.

type sql_kit:

Any | None

param sql_kit:

Override kit for SQL generation. Falls back to kit.

type pandas_kit:

Any | None

param pandas_kit:

Override kit for pandas code generation. Falls back to kit.

type answer_kit:

Any | None

param answer_kit:

Override kit for Markdown answer generation. Falls back to kit.

type sql_prompt / pandas_prompt / answer_prompt:

RactoPrompt | None

param sql_prompt / pandas_prompt / answer_prompt:

Override the default system prompt for the SQL, pandas, and answer steps respectively.

type sql_temperature / sql_max_tokens:

float / int

param sql_temperature / sql_max_tokens:

Temperature and maximum output tokens for the SQL step (defaults: 0.0 / 1024).

type pandas_temperature / pandas_max_tokens:

float / int

param pandas_temperature / pandas_max_tokens:

Temperature and maximum output tokens for the pandas step (defaults: 0.0 / 2048).

type answer_temperature / answer_max_tokens:

float / int

param answer_temperature / answer_max_tokens:

Temperature and maximum output tokens for the answer step (defaults: 0.3 / 2048).

type run_pandas:

bool

param run_pandas:

Run pandas analysis step by default (default: True).

type run_answer:

bool

param run_answer:

Run Markdown answer step by default (default: True).

type chart:

ChartSpec | dict[str, Any] | str | None

param chart:

Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".

type force_read_only:

bool

param force_read_only:

Block any non-SELECT SQL (default: True).

type tracer:

Any | None

param tracer:

Optional RactoTracer instance.

type metrics:

Any | None

param metrics:

Optional GatewayMetricsMiddleware instance.

type engine:

Any | None

param engine:

Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, connection_string / host / port / etc. params in run() are ignored.

type max_sql_retries:

int

param max_sql_retries:

Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.

type max_rows:

int

param max_rows:

Safety cap on returned rows — auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.

type schema_cache_ttl:

float

param schema_cache_ttl:

Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).

type allowed_tables:

list[str] | None

param allowed_tables:

Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.

type blocked_columns:

list[str] | None

param blocked_columns:

Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.

type mask_columns:

list[str] | None

param mask_columns:

Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.

type table_docs:

dict[str, str] | None

param table_docs:

{table_name: description} — appended as inline schema comments so the LLM understands table business meaning.

type column_docs:

dict[str, dict[str, str]] | None

param column_docs:

{table_name: {column_name: description}} — per-column inline comments.

type safe_mode:

bool

param safe_mode:

When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.

type memory:

Any | None

param memory:

Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).

type rate_limiter:

Any | None

param rate_limiter:

Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.

type user_id:

str | None

param user_id:

Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
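The memory parameter only needs get_history(session_id) and append(session_id, role, content). The in-process class below is a hypothetical sketch of that interface for local development or tests. Two details are assumptions, not documented behavior of RedisChatMemory: the shape of each history dict ({"role": ..., "content": ...}), and the max_messages trimming.

```python
from __future__ import annotations

from collections import defaultdict


class InMemoryChatMemory:
    """Hypothetical backend implementing get_history() and append()."""

    def __init__(self, max_messages: int = 20) -> None:
        self.max_messages = max_messages
        self._store: defaultdict[str, list[dict]] = defaultdict(list)

    def get_history(self, session_id: str) -> list[dict]:
        # Return a copy so callers cannot mutate the stored history by accident.
        return list(self._store[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        history = self._store[session_id]
        history.append({"role": role, "content": content})
        del history[: -self.max_messages]  # keep only the newest max_messages entries
```

Trimming bounds how much prior conversation is resent to the LLM, which keeps prompt size and token cost predictable.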

Example:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper model for the pandas step
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    if result.plotly_figure is not None:  # None when no chart was built or plotly is missing
        result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")

run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)

Run the full pipeline synchronously.

Parameters:
  • user_query (str) – Plain-English question to answer from the database.

  • connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.

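  • host / port / database / username / password / driver – Individual connection params used when both connection_string and engine are omitted (defaults: host='localhost', port=5432, driver='postgresql').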

  • engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.

  • schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).

  • run_pandas / run_answer / chart / force_read_only – Override the corresponding pipeline-level defaults for this call.

  • sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens – Per-call LLM setting overrides.

  • max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

  • session_id (str | None) – Conversation session ID used to fetch and save memory context.

Return type:

SQLAnalystResult

Returns:

SQLAnalystResult

async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)

Async variant of run() — identical parameters.

Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.

Return type:

SQLAnalystResult

class ractogateway.pipelines.SQLAnalystResult(**data)

Bases: BaseModel

Result returned by SQLAnalystPipeline.

All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.

Fields

user_query:

The original natural-language question.

schema_used:

The database schema string that was passed to (or fetched for) the LLM.

sql_query:

The generated (and possibly LIMIT-injected) SQL SELECT statement.

columns:

Column names returned by the SQL query.

row_count:

Number of rows in raw_rows.

raw_rows:

All rows from the SQL result as a list of dicts.

pandas_code:

The LLM-generated pandas analysis code (None if run_pandas=False).

pandas_result:

Output of executing pandas_code — DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.

answer:

Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.

chart_spec:

The ChartSpec dict used to build the Plotly figure. None if no chart was requested.

plotly_figure:

A plotly.graph_objects.Figure object ready to call .show() or .to_html(). None if no chart was requested or plotly is not installed.

usage:

Aggregated token counts for all LLM steps in the pipeline.

error:

Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.
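The sql_query field may carry an auto-injected LIMIT (from max_rows), and raw_rows may contain masked values (from mask_columns). The functions below sketch both behaviors. `inject_limit`, `mask_rows`, and `MASK` are hypothetical names, and the sketch assumes the following:

- LIMIT detection is a naive regex that looks for the word LIMIT anywhere in the statement. The real pipeline may detect it differently, and some dialects use TOP or FETCH FIRST instead.
- Mask matching is case-insensitive, like blocked_columns.

```python
import re
from typing import Any

MASK = "***MASKED***"


def inject_limit(sql: str, max_rows: int) -> str:
    """Hypothetical: append LIMIT max_rows unless a LIMIT is present (0 disables)."""
    if max_rows <= 0 or re.search(r"\bLIMIT\b", sql, re.IGNORECASE):
        return sql
    # Drop trailing whitespace and semicolons so the clause lands inside the statement.
    return f"{sql.rstrip().rstrip(';').rstrip()} LIMIT {max_rows}"


def mask_rows(rows: list[dict[str, Any]], mask_columns: list[str]) -> list[dict[str, Any]]:
    """Hypothetical: copy rows, replacing values of the listed columns with MASK."""
    masked = {name.lower() for name in mask_columns}
    return [
        {col: (MASK if col.lower() in masked else val) for col, val in row.items()}
        for row in rows
    ]
```

Masking is applied to result rows after the query runs. That is why a masked column can still appear in columns while its values never reach the answer LLM.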

user_query: str
schema_used: str
sql_query: str
columns: list[str]
row_count: int
raw_rows: list[dict[str, Any]]
pandas_code: str | None
pandas_result: Any | None
answer: str | None
chart_spec: dict[str, Any] | None
plotly_figure: Any | None
usage: PipelineUsage
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

to_csv(path=None)

Export the raw SQL result rows to CSV.

Does not require pandas — uses the standard-library csv module.

Parameters:

path (str | None) – File path to write to. When None the CSV string is returned.

Return type:

str | None

Returns:

str | None – CSV string when path is None; otherwise None (file written).
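Since to_csv() relies only on the standard-library csv module, its behavior can be approximated as below. `rows_to_csv` is a hypothetical helper that takes the result's columns and raw_rows. The real method's quoting and line-ending choices may differ.

```python
from __future__ import annotations

import csv
import io
from typing import Any


def rows_to_csv(columns: list[str], rows: list[dict[str, Any]], path: str | None = None) -> str | None:
    """Hypothetical: return CSV text when path is None, else write the file and return None."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)  # values containing commas or quotes are quoted automatically
    text = buffer.getvalue()
    if path is None:
        return text
    with open(path, "w", newline="", encoding="utf-8") as handle:
        handle.write(text)
    return None
```

Using the columns list as fieldnames keeps the column order of the SQL result, independent of dict key order in each row.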

to_json(path=None, *, indent=2)

Export the raw SQL result rows to JSON.

Parameters:
  • path (str | None) – File path to write to. When None the JSON string is returned.

  • indent (int) – JSON indentation level (default: 2).

Return type:

str | None

Returns:

str | None – JSON string when path is None; otherwise None (file written).

to_excel(path, *, sheet_name='Results')

Export the raw SQL result rows to an Excel file.

Requires pandas and openpyxl:

pip install ractogateway[pipelines-sql] openpyxl

Parameters:
  • path (str) – File path to write to (must end in .xlsx).

  • sheet_name (str) – Excel sheet name (default: "Results").

Return type:

None

ractogateway.pipelines.clear_schema_cache()

Evict all entries from the in-process schema cache.

Useful in tests or when the database schema changes at runtime.

Return type:

None
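schema_cache_ttl and clear_schema_cache() imply a process-wide cache that expires entries after a TTL. The sketch below shows that pattern with a module-level dict. Everything here is illustrative, not the pipeline's code: the cache key (e.g. a connection string), the function names, and the lack of locking are all assumptions.

```python
from __future__ import annotations

import time
from typing import Callable

_SCHEMA_CACHE: dict[str, tuple[float, str]] = {}  # key -> (stored_at, schema text)


def get_schema_cached(key: str, fetch: Callable[[], str], ttl: float = 3600.0) -> str:
    """Hypothetical: return a cached schema younger than ttl seconds; ttl <= 0 disables caching."""
    now = time.monotonic()
    if ttl > 0:
        hit = _SCHEMA_CACHE.get(key)
        if hit is not None and now - hit[0] < ttl:
            return hit[1]
    schema = fetch()  # the expensive introspection call
    if ttl > 0:
        _SCHEMA_CACHE[key] = (now, schema)
    return schema


def clear_schema_cache_sketch() -> None:
    """Hypothetical: evict everything, e.g. between tests or after a migration."""
    _SCHEMA_CACHE.clear()
```

time.monotonic() is used so wall-clock adjustments cannot make entries appear older or younger than they are.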

class ractogateway.pipelines.AsyncListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)

Bases: ListClassifierPipeline

Async-first variant of ListClassifierPipeline.

run() is a coroutine — await pipeline.run(...) directly. Designed for FastAPI, aiohttp, Starlette, and other async frameworks.

Constructor and all run() parameters are identical to ListClassifierPipeline.

Example

from fastapi import FastAPI
from ractogateway.pipelines import AsyncListClassifierPipeline

app = FastAPI()

pipeline = AsyncListClassifierPipeline.from_provider(
    "openai", "gpt-4o-mini",
    options=["Billing", "Support", "Sales"],
    safe_mode=True,
)

# FastAPI handler (query is read from the query string):
@app.post("/classify")
async def classify(query: str):
    result = await pipeline.run(query)
    return result.as_dict()

async run(user_query, **kwargs)

Async run() — delegates to ListClassifierPipeline.arun().

Return type:

ClassifierResult | str | dict[str, Any]

class ractogateway.pipelines.AuditEntry(**data)

Bases: BaseModel

Immutable audit record emitted to the audit_logger after every call.

Emitted regardless of whether the call was served from cache, hit an error, or was a live LLM classification. Provides a complete picture of every request for compliance, debugging, and analytics.

Fields

timestamp:

ISO 8601 UTC timestamp of when the call was made (e.g. "2026-02-26T14:23:01.456789Z").

user_query:

Original natural-language query.

options_provided:

Full candidate list shown to the LLM (including uncertain_label if one was configured).

selected:

Option(s) chosen by the LLM, or empty on error.

confidences:

Per-selection confidence scores, or None.

all_scores:

Score for every option (when score_all=True), or None.

reasoning:

LLM explanation (when include_reasoning=True), or None.

fuzzy_corrected:

True when the LLM returned a near-miss that was fuzzy-matched.

uncertain:

True when the LLM selected the uncertain_label option.

cache_hit:

"exact" or "semantic" when the result was served from cache; None when a live LLM call was made.

user_id:

User identifier passed to the pipeline (for rate limiting / audit).

session_id:

Conversation session identifier (for memory context).

latency_ms:

Wall-clock latency of the entire pipeline call in milliseconds (near-zero for cache hits).

usage:

Token usage dict — keys: input_tokens, output_tokens, total_tokens, retry_count. All zero on cache hits.

error:

Non-None when safe_mode=True and an exception occurred.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

timestamp: str
user_query: str
options_provided: list[str]
selected: list[str]
confidences: list[float] | None
all_scores: dict[str, float] | None
reasoning: str | None
fuzzy_corrected: bool
uncertain: bool
cache_hit: str | None
user_id: str | None
session_id: str | None
latency_ms: float
usage: dict[str, int]
error: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
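The timestamp field uses the ISO 8601 UTC form shown above, with microseconds and a trailing Z. Assuming it is produced from a timezone-aware datetime, the string could be generated like this (audit_timestamp is a hypothetical helper, not a ractogateway function):

```python
from __future__ import annotations

from datetime import datetime, timezone


def audit_timestamp(now: datetime | None = None) -> str:
    """ISO 8601 UTC timestamp with microseconds and a trailing 'Z'."""
    now = now or datetime.now(timezone.utc)
    # Convert to UTC first in case an aware non-UTC datetime is passed in.
    return now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
```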

exception ractogateway.pipelines.ClassifierRateLimitExceededError

Bases: RuntimeError

Raised when the rate limiter denies a request for a given user.
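The rate_limiter argument is duck-typed: it only needs check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int. A minimal in-memory limiter with that shape is sketched below. The fixed per-user quota with no refill is an assumption chosen for brevity, and what the pipeline passes as tokens is not specified here; production limiters usually refill over time and keep state in shared storage.

```python
from __future__ import annotations

import threading


class FixedQuotaRateLimiter:
    """Hypothetical limiter: each user gets `quota` tokens, never refilled."""

    def __init__(self, quota: int) -> None:
        self._quota = quota
        self._used: dict[str, int] = {}
        self._lock = threading.Lock()

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Atomically check the remaining budget and spend it on success.
        with self._lock:
            used = self._used.get(user_id, 0)
            if used + tokens > self._quota:
                return False  # denied: the pipeline raises ClassifierRateLimitExceededError
            self._used[user_id] = used + tokens
            return True

    def get_remaining(self, user_id: str) -> int:
        with self._lock:
            return self._quota - self._used.get(user_id, 0)
```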

class ractogateway.pipelines.ClassifierResult(**data)

Bases: BaseModel

Result returned by ListClassifierPipeline.

All fields except user_query and options_provided have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs mid-pipeline.

Fields

user_query:

The original natural-language query passed to the pipeline.

options_provided:

The full list of candidate strings presented to the LLM (including the injected uncertain_label option if one was configured).

selected:

The option(s) chosen by the LLM, ordered by descending confidence. Empty when an error occurred and safe_mode=True.

confidences:

Per-selection confidence scores in [0.0, 1.0], aligned with selected. None when include_confidence=False.

all_scores:

Confidence score for every option in the list, keyed by option string. None when score_all=False (the default).

reasoning:

Brief natural-language explanation produced by the LLM. None when include_reasoning=False.

fuzzy_corrected:

True when the LLM returned a near-miss that was corrected by the built-in fuzzy matcher without consuming a retry.

uncertain:

True when the LLM selected the uncertain_label option, indicating no real option matched the query well enough.

cache_hit:

"exact" or "semantic" when served from cache; None for a live LLM call.

usage:

Aggregated token counts and retry statistics for this call.

error:

Non-None only when safe_mode=True and an exception occurred. When error is set, selected will be empty.

Examples

>>> result.first                         # "Billing"
>>> result.top_confidence                # 0.95
>>> result.as_string()                   # "Billing, Account"
>>> result.as_dict()                     # {"selected": ["Billing"], ...}
>>> result.as_enum()["Billing"].value    # "Billing"
>>> result.uncertain                     # False
>>> result.cache_hit                     # "exact" | "semantic" | None

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

user_query: str
options_provided: list[str]
selected: list[str]
confidences: list[float] | None
all_scores: dict[str, float] | None
reasoning: str | None
fuzzy_corrected: bool
uncertain: bool
cache_hit: str | None
usage: ClassifierUsage
error: str | None
property first: str | None

The first (highest-priority) selected option, or None if empty.

property top_confidence: float | None

Confidence score for the first selected option, or None.

property is_empty: bool

True when no options were selected (including error cases).

as_string(separator=', ')

Return selected options as a single joined string.

Parameters:

separator (str) – Delimiter placed between items. Default: ", ".

Return type:

str

Returns:

str – E.g. "Billing, Account" for two selections.

as_dict()

Return a plain dict with selected options and optional metadata.

Always contains "selected". "confidences", "all_scores", and "reasoning" are included only when they are non-None.

Return type:

dict[str, Any]

Returns:

dict[str, Any]

Example:

{
    "selected":    ["Billing", "Account"],
    "confidences": [0.95, 0.82],
    "all_scores":  {"Billing": 0.95, "Account": 0.82, "Sales": 0.12},
    "reasoning":   "Both topics are mentioned explicitly.",
}

as_enum(name='SelectedOptions')

Return a dynamic Python enum.Enum of the selected options.

Parameters:

name (str) – Class name for the generated Enum. Default: "SelectedOptions".

Return type:

type[Enum]

Returns:

type[Enum] – An Enum whose members have the option string as both name and value.

Example

>>> E = result.as_enum()
>>> E["Billing"].value   # "Billing"

top_n(n)

Return the top-n selected options.

Parameters:

n (int) – Maximum number of options to return.

Return type:

list[str]

score_for(option)

Return the confidence score for a specific option, or None.

Searches all_scores first (all options, when score_all=True), then confidences for selected items.

Parameters:

option (str) – The option string to look up.

Return type:

float | None
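The documented lookup order for score_for() (all_scores first, then confidences aligned with selected) can be expressed as a small standalone function. This is a hypothetical re-implementation that assumes exact string matching; it is not the library's source.

```python
from __future__ import annotations


def score_for(
    option: str,
    selected: list[str],
    confidences: list[float] | None,
    all_scores: dict[str, float] | None,
) -> float | None:
    """Hypothetical re-implementation of the documented lookup order."""
    # 1. all_scores covers every option when score_all=True.
    if all_scores is not None and option in all_scores:
        return all_scores[option]
    # 2. Otherwise fall back to confidences, which are aligned with selected.
    if confidences is not None and option in selected:
        return confidences[selected.index(option)]
    return None
```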

to_audit_entry(*, timestamp, user_id=None, session_id=None, latency_ms=0.0)

Build an AuditEntry from this result.

Called automatically by the pipeline — exposed here so that users can reconstruct audit entries from stored results if needed.

Return type:

AuditEntry

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.ClassifierUsage(**data)

Bases: BaseModel

Token usage and retry statistics for a single pipeline call.

Properties

total_tokens:

Sum of input_tokens and output_tokens across all LLM attempts, including automatic retries triggered by invalid LLM responses. Zero on a cache hit (no LLM call was made).

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

input_tokens: int
output_tokens: int
retry_count: int
property total_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
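total_tokens is documented as the sum of input and output tokens across every attempt, retries included. A plain-dataclass stand-in (UsageSketch, hypothetical; the real class is a pydantic model) makes that aggregation explicit. Treating retry_count as the number of attempts minus one is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Hypothetical stand-in for ClassifierUsage."""

    input_tokens: int = 0
    output_tokens: int = 0
    retry_count: int = 0

    @property
    def total_tokens(self) -> int:
        # Already includes every retry; a cache hit leaves everything at zero.
        return self.input_tokens + self.output_tokens

    @classmethod
    def from_attempts(cls, attempts: list[tuple[int, int]]) -> UsageSketch:
        """Aggregate (input, output) token pairs, one pair per LLM attempt."""
        return cls(
            input_tokens=sum(i for i, _ in attempts),
            output_tokens=sum(o for _, o in attempts),
            retry_count=max(len(attempts) - 1, 0),  # first attempt is not a retry
        )
```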

class ractogateway.pipelines.ListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)

Bases: object

Map a natural-language query to one or more items from a candidate list.

Supports every RactoGateway provider via the kit parameter or the from_provider() class factory. Internally builds a dynamic Python enum.Enum from the options list and validates every LLM response against it: hallucinated and paraphrased answers are caught, fuzzy-corrected if close enough, and retried otherwise.
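The validate, fuzzy-correct, then retry step described above can be sketched for a single answer with the stdlib difflib module. The function name, the return convention, and the 0.8 similarity cutoff are assumptions for illustration; the pipeline's actual cutoff is not documented here.

```python
from __future__ import annotations

import difflib


def resolve_option(
    raw: str,
    options: list[str],
    *,
    case_sensitive: bool = False,
    cutoff: float = 0.8,
) -> tuple[str | None, bool]:
    """Map a raw LLM answer onto one of `options`.

    Returns (option, fuzzy_corrected). (None, False) means the answer is
    unusable and the caller should spend a retry.
    """

    def norm(s: str) -> str:
        return s.strip() if case_sensitive else s.strip().lower()

    lookup = {norm(opt): opt for opt in options}
    key = norm(raw)
    if key in lookup:  # exact (or case-folded) hit
        return lookup[key], False
    close = difflib.get_close_matches(key, list(lookup), n=1, cutoff=cutoff)
    if close:  # near-miss corrected without consuming a retry
        return lookup[close[0]], True
    return None, False
```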

Two variants

  • ListClassifierPipeline: run() is sync, arun() is async.

  • AsyncListClassifierPipeline: run() is async only.

Parameters:
  • kit (Any) – Any RactoGateway developer kit (OpenAI, Anthropic, Google, Ollama, HuggingFace). Must expose .chat(ChatConfig) and .achat(ChatConfig) methods. Use from_provider() instead of constructing a kit manually when you only need a provider and model.

  • options (list[str] | None) – Default candidate strings. Can be overridden per call. Must be non-empty and duplicate-free when provided.

  • selection_mode (Literal['single', 'multiple']) – "single" (default) selects exactly one option; "multiple" selects one or more. Overridable per call.

  • output_format (Literal['string', 'dict', 'pydantic']) – "pydantic" (default) returns a ClassifierResult; "string" returns a comma-joined string; "dict" returns a plain dict. Overridable per call.

  • prompt (RactoPrompt | None) – Custom RactoPrompt that replaces the built-in system prompt.

  • temperature (float) – LLM temperature. Default 0.0, for the most repeatable output.

  • max_tokens (int) – Response token budget. Default 512.

  • max_retries (int) – Number of retry attempts when the LLM returns invalid JSON or an unknown option. Default 2.

  • include_confidence (bool) – Ask the LLM for per-selection confidence scores in [0.0, 1.0]. Default True.

  • include_reasoning (bool) – Ask the LLM for a one-sentence explanation. Default False.

  • score_all (bool) – Ask the LLM to score every option, not just the selected ones. Stored in result.all_scores. Default False.

  • option_descriptions (dict[str, str] | None) – Mapping of {option: description}; each description is shown inline next to its option in the prompt to help the LLM distinguish similar categories.

  • fuzzy_fallback (bool) – Use the stdlib difflib module to correct near-miss LLM responses before consuming a retry. Default True.

  • uncertain_label (str | None) – When set, this string is appended as an extra option that the LLM can pick when nothing matches (e.g. "Other / None of the above"). result.uncertain is True when this label is selected.

  • confidence_threshold (float | None) – Drop selections whose confidence is below this value, keeping the highest-confidence match as a fallback. Default None (no filtering).

  • case_sensitive (bool) – Whether option matching is case-sensitive. Default False.

  • safe_mode (bool) – Return ClassifierResult(error=...) instead of raising. Default False.

  • tracer (Any | None) – Optional RactoTracer.

  • metrics (Any | None) – Optional GatewayMetricsMiddleware.

  • rate_limiter (Any | None) – Duck-typed; must provide check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.

  • memory (Any | None) – Duck-typed; must provide get_history(session_id) -> list[dict] and append(session_id, role, content).

  • user_id (str | None) – Default user ID for rate limiting. Overridable per call.
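The confidence_threshold behaviour (drop low scores, keep the best match as a fallback) can be sketched as follows. Two details are assumptions: a score exactly equal to the threshold is kept, and the fallback only applies when every selection would otherwise be dropped. apply_confidence_threshold is a hypothetical name.

```python
from __future__ import annotations


def apply_confidence_threshold(
    selected: list[str],
    confidences: list[float],
    threshold: float | None,
) -> tuple[list[str], list[float]]:
    """Drop low-confidence picks but never return an empty selection."""
    if threshold is None or not selected:
        return selected, confidences  # filtering disabled
    kept = [(s, c) for s, c in zip(selected, confidences) if c >= threshold]
    if not kept:
        # Fallback: keep the single highest-confidence match.
        kept = [max(zip(selected, confidences), key=lambda pair: pair[1])]
    return [s for s, _ in kept], [c for _, c in kept]
```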

Example

# Via kit directly
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import ListClassifierPipeline

pipeline = ListClassifierPipeline(
    kit=Chat(model="gpt-4o-mini"),
    options=["Billing", "Technical Support", "Sales"],
    include_confidence=True,
    include_reasoning=True,
)
result = pipeline.run("My invoice is wrong")
print(result.first, result.top_confidence)

# Via from_provider() — no manual kit import needed
pipeline = ListClassifierPipeline.from_provider(
    "anthropic", "claude-haiku-4-5-20251001",
    options=["Billing", "Technical Support", "Sales"],
)
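Because memory is duck-typed, any object with get_history(session_id) -> list[dict] and append(session_id, role, content) should fit. The sketch below keeps a bounded, in-process history per session. The {"role": ..., "content": ...} dict shape and the max_messages cap are assumptions, and InMemoryHistory is a hypothetical name.

```python
from __future__ import annotations

from collections import defaultdict, deque


class InMemoryHistory:
    """Hypothetical memory backend matching the duck-typed interface."""

    def __init__(self, max_messages: int = 20) -> None:
        # One bounded deque per session; the oldest messages fall off first.
        self._sessions: defaultdict[str, deque] = defaultdict(
            lambda: deque(maxlen=max_messages)
        )

    def get_history(self, session_id: str) -> list[dict]:
        # .get() avoids creating an empty entry for unknown sessions.
        return list(self._sessions.get(session_id, ()))

    def append(self, session_id: str, role: str, content: str) -> None:
        self._sessions[session_id].append({"role": role, "content": content})
```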

classmethod from_provider(provider, model, *, api_key=None, base_url=None, options=None, **kwargs)

Create a pipeline by specifying provider + model — no kit import needed.

Parameters:
  • provider (str) – One of "openai", "anthropic", "google", "ollama", "huggingface".

  • model (str) –

    Model identifier string, e.g.:

    • OpenAI: "gpt-4o-mini", "gpt-4o"

    • Anthropic: "claude-haiku-4-5-20251001", "claude-sonnet-4-6"

    • Google: "gemini-2.0-flash", "gemini-1.5-pro"

    • Ollama: "llama3.2", "mistral"

    • HuggingFace: "meta-llama/Llama-3.2-3B-Instruct"

  • api_key (str | None) – Provider API key. Falls back to the standard env var for each provider (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY).

  • base_url (str | None) – Custom endpoint — used for Ollama (http://localhost:11434) or OpenAI-compatible proxies.

  • options (list[str] | None) – Default candidate options list.

  • **kwargs (Any) – Any other ListClassifierPipeline constructor parameters (selection_mode, include_confidence, safe_mode, etc.).

Return type:

ListClassifierPipeline

Returns:

ListClassifierPipeline

Example

pipeline = ListClassifierPipeline.from_provider(
    "anthropic",
    "claude-haiku-4-5-20251001",
    options=["Billing", "Support", "Sales"],
    include_reasoning=True,
    safe_mode=True,
)

static make_enum(options, name='OptionsEnum')

Build a standalone dynamic enum.Enum from an options list.

Useful when you want enum-typed values outside the pipeline.

Parameters:
  • options (list[str]) – List of option strings.

  • name (str) – Enum class name. Default "OptionsEnum".

Return type:

type[Enum]

Returns:

type[Enum]

Example

from ractogateway.pipelines import ListClassifierPipeline

E = ListClassifierPipeline.make_enum(["Red", "Green", "Blue"])
E["Red"].value   # "Red"
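The dynamic enum described for make_enum() and as_enum() (option string as both member name and value) maps naturally onto the functional enum.Enum API, which accepts a mapping, so option strings need not be valid Python identifiers. The standalone version below is a hypothetical equivalent; its duplicate check mirrors the documented non-empty, duplicate-free rule for options.

```python
from enum import Enum


def make_enum(options: list[str], name: str = "OptionsEnum") -> type[Enum]:
    """Hypothetical equivalent: each member's name and value are the option string."""
    if not options or len(set(options)) != len(options):
        raise ValueError("options must be non-empty and duplicate-free")
    # The functional API takes a {name: value} mapping.
    return Enum(name, {opt: opt for opt in options})
```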

get_options()

Return the pipeline-level options list, or None if not set.

Return type:

list[str] | None

set_options(options)

Replace the entire pipeline-level options list.

Thread-safe — safe to call while the pipeline is in use.

Parameters:

options (list[str]) – New options list. Must be non-empty and duplicate-free.

Return type:

None

add_option(option, description=None)

Append a new option to the pipeline-level list.

Parameters:
  • option (str) – The option string to add.

  • description (str | None) – Optional inline description for the option.

Return type:

None

remove_option(option)

Remove an option from the pipeline-level list.

Parameters:

option (str) – The option string to remove. Raises ValueError if not found.

Return type:

None
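A thread-safe options list of the kind set_options(), add_option(), and remove_option() describe can be built around a threading.Lock. This OptionRegistry is a hypothetical sketch: raising ValueError on a duplicate in add_option() and storing descriptions in a side dict are assumptions, not documented behaviour.

```python
from __future__ import annotations

import threading


class OptionRegistry:
    """Hypothetical lock-protected option list mirroring set/add/remove_option()."""

    def __init__(self, options: list[str]) -> None:
        self._lock = threading.Lock()
        self._options: list[str] = []
        self._descriptions: dict[str, str] = {}
        self.set_options(options)

    @staticmethod
    def _validate(options: list[str]) -> None:
        if not options:
            raise ValueError("options must be non-empty")
        if len(set(options)) != len(options):
            raise ValueError("options must be duplicate-free")

    def get_options(self) -> list[str]:
        with self._lock:
            return list(self._options)  # copy so callers cannot mutate shared state

    def set_options(self, options: list[str]) -> None:
        self._validate(options)
        with self._lock:
            self._options = list(options)  # swap in the new list in one step

    def add_option(self, option: str, description: str | None = None) -> None:
        with self._lock:
            candidate = self._options + [option]
            self._validate(candidate)  # rejects duplicates before mutating
            self._options = candidate
            if description is not None:
                self._descriptions[option] = description

    def remove_option(self, option: str) -> None:
        with self._lock:
            if option not in self._options:
                raise ValueError(f"{option!r} is not a configured option")
            self._options = [o for o in self._options if o != option]
            self._descriptions.pop(option, None)
```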

run(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)

Classify user_query synchronously.

Parameters:
  • user_query (str) – Natural-language query to classify.

  • options (list[str] | None) – Per-call override for the candidate list. Omit to use the pipeline-level list. Passing [] raises ValueError.

  • selection_mode (Literal['single', 'multiple'] | None) – Per-call override — "single" or "multiple".

  • output_format (Literal['string', 'dict', 'pydantic'] | None) – Per-call override — "pydantic", "string", or "dict".

  • temperature (float | None) – Per-call override for the LLM temperature.

  • max_tokens (int | None) – Per-call override for the response token budget.

  • confidence_threshold (float | None) – Per-call override. Pass None explicitly to disable filtering for this call even if a pipeline-level threshold is set.

  • session_id (str | None) – Conversation session ID for memory retrieval/storage.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

Return type:

ClassifierResult | str | dict[str, Any]

Returns:

ClassifierResult | str | dict – Type depends on output_format.
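The <object object> defaults in the signature are how Sphinx renders a module-level object() sentinel. A sentinel lets run() tell an omitted argument apart from an explicit None, which is what allows confidence_threshold=None to mean "disable filtering for this call". The names _UNSET and resolve_threshold below are hypothetical.

```python
from __future__ import annotations

from typing import Any

_UNSET: Any = object()  # rendered as <object object> in generated signatures


def resolve_threshold(pipeline_default: float | None, per_call: Any = _UNSET) -> float | None:
    """Pick the effective confidence_threshold for one call."""
    if per_call is _UNSET:
        return pipeline_default  # omitted: inherit the pipeline-level value
    return per_call  # explicit value, including None (disable filtering)
```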

batch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)

Classify multiple queries synchronously, one after another.

Shares all per-call overrides across every query in the batch. Use abatch_run() to run them concurrently in async contexts.

Parameters:

queries (list[str]) – List of natural-language queries to classify.

Return type:

list[ClassifierResult | str | dict[str, Any]]

Returns:

list – One result per query, in the same order.

async arun(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)

Async variant of run() — identical parameters.

Return type:

ClassifierResult | str | dict[str, Any]

async abatch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None, max_concurrency=None)

Classify multiple queries concurrently with asyncio.gather.

Parameters:
  • queries (list[str]) – List of natural-language queries.

  • max_concurrency (int | None) – Cap the number of simultaneous LLM calls. None (default) runs all queries in parallel. Set to e.g. 5 to avoid rate-limit errors on large batches.

Return type:

list[ClassifierResult | str | dict[str, Any]]

Returns:

list – Results in the same order as queries.
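Capping concurrency around asyncio.gather is commonly done with an asyncio.Semaphore; the sketch below assumes max_concurrency works that way. bounded_gather is hypothetical, and classify stands in for a coroutine function such as arun(). asyncio.gather returns results in input order, which matches the documented ordering guarantee.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def bounded_gather(
    queries: list[str],
    classify: Callable[[str], Awaitable[T]],
    max_concurrency: int | None = None,
) -> list[T]:
    """Run classify() over queries concurrently; results keep input order."""
    if max_concurrency is None:
        return list(await asyncio.gather(*(classify(q) for q in queries)))
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(query: str) -> T:
        async with semaphore:  # at most max_concurrency calls in flight
            return await classify(query)

    return list(await asyncio.gather(*(guarded(q) for q in queries)))
```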

class ractogateway.pipelines.AsyncVideoProcessorPipeline(*args, **kwargs)

Bases: object

Async-only variant of VideoProcessorPipeline.

Exposes run() and answer_question() as coroutines only, with no sync counterparts. This suits FastAPI endpoints where you do not want a sync run() in the public API.

All constructor parameters are identical to VideoProcessorPipeline.

async run(source, **kwargs)

Async-only process entrypoint.

Return type:

VideoProcessorResult

async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Async-only variant of VideoProcessorPipeline.aanswer_question().

Return type:

VideoProcessorResult

static parse_timestamp(value)

Delegate to VideoProcessorPipeline.parse_timestamp().

Return type:

float

class ractogateway.pipelines.DeduplicationMethod(*values)

Bases: str, Enum

Frame similarity algorithm used for deduplication.

PHASH = 'phash'
SSIM = 'ssim'
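The documented rule (discard a frame whose similarity to the previous kept frame is >= similarity_threshold, stop at max_frames) can be sketched over precomputed perceptual hashes. Assumptions: each frame is already reduced to a 64-bit hash, and similarity is expressed as 100 * (1 - hamming_distance / 64). Hash computation itself is omitted, and the function names are hypothetical.

```python
from __future__ import annotations


def hash_similarity(a: int, b: int, bits: int = 64) -> float:
    """Similarity percentage between two perceptual hashes via Hamming distance."""
    distance = bin(a ^ b).count("1")  # number of differing bits
    return 100.0 * (1 - distance / bits)


def dedup(
    hashes: list[int],
    similarity_threshold: float = 90.0,
    max_frames: int | None = None,
) -> list[int]:
    """Return indices of kept frames, comparing each to the previous *kept* frame."""
    kept: list[int] = []
    for idx, h in enumerate(hashes):
        if kept and hash_similarity(hashes[kept[-1]], h) >= similarity_threshold:
            continue  # too similar to the last kept frame: discard
        kept.append(idx)
        if max_frames is not None and len(kept) >= max_frames:
            break  # hard cap reached
    return kept
```

Raising similarity_threshold keeps more frames, since fewer frames reach the discard cutoff.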

class ractogateway.pipelines.FrameAnalysisMode(*values)

Bases: str, Enum

How frames are sent to the vision LLM.

INDIVIDUAL = 'individual'
GRID = 'grid'

class ractogateway.pipelines.FrameEntry(**data)

Bases: BaseModel

One video frame, after extraction and optional analysis.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

frame_id: int

Zero-based sequential frame identifier.

timestamp: float

Position in the video in seconds.

similarity_to_prev: float | None

Similarity percentage to the previous kept frame (None for first frame).

kept: bool

False if discarded by the deduplication step.

analysis: str | None

LLM-generated description of visual content (whiteboard, screen, etc.).

image_data: bytes | None

Raw image bytes for kept + analyzed frames.

image_format: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.TranscriberBackend(*values)

Bases: str, Enum

Audio transcription backend.

FASTER_WHISPER = 'faster-whisper'
OPENAI_WHISPER = 'openai-whisper'
HUGGINGFACE_LOCAL = 'huggingface-local'
OPENAI_API = 'openai-api'
GOOGLE_API = 'google-api'
HUGGINGFACE_API = 'huggingface-api'
GROQ_API = 'groq-api'
DEEPGRAM_API = 'deepgram-api'
OLLAMA = 'ollama'

class ractogateway.pipelines.TranscriptSegment(**data)

Bases: BaseModel

A time-bounded transcription segment aligned to frame IDs.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

start: float

Segment start time in seconds.

end: float

Segment end time in seconds.

text: str

Transcribed text for this segment.

frame_ids: list[int]

IDs of kept frames whose timestamps fall within [start, end].

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
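The frame_ids alignment (kept frames whose timestamps fall within [start, end]) amounts to an inclusive interval filter. In this hypothetical sketch, segments are (start, end, text) tuples and kept frames are (frame_id, timestamp) pairs rather than the real pydantic models.

```python
from __future__ import annotations


def align_frames(
    segments: list[tuple[float, float, str]],
    kept_frames: list[tuple[int, float]],
) -> list[dict]:
    """Attach kept-frame IDs to each (start, end, text) transcript segment."""
    aligned = []
    for start, end, text in segments:
        # Inclusive bounds, per the documented [start, end] interval.
        frame_ids = [fid for fid, ts in kept_frames if start <= ts <= end]
        aligned.append({"start": start, "end": end, "text": text, "frame_ids": frame_ids})
    return aligned
```

Because both bounds are inclusive, a frame sitting exactly on a segment boundary is attached to both neighbouring segments.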

class ractogateway.pipelines.VideoConfig(**data)

Bases: BaseModel

All tunable hyperparameters for VideoProcessorPipeline.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

fps: float

Frames to sample per second of video.

similarity_threshold: float

Discard a frame whose similarity to the previous kept frame is >= this percentage. Higher values keep more frames; lower values discard more aggressively. Range 0-100.

max_frames: int | None

Hard cap on frames kept (None = no cap).

dedup_method: DeduplicationMethod

Algorithm used to compare frame similarity.

frame_format: str

Image format for kept frames: 'JPEG' (smaller) or 'PNG' (lossless).

analyze_frames: bool

Pass kept frames to the vision LLM for content extraction.

frame_analysis_mode: FrameAnalysisMode

Individual = one LLM call per frame; Grid = stitch frames into a collage.

grid_size: int

Number of frames per grid collage (used when frame_analysis_mode='grid').

batch_size: int

How many frames to submit to the LLM concurrently per batch.

max_workers: int

Thread-pool size for concurrent LLM frame analysis calls.

max_process_workers: int

Process-pool size for CPU-bound frame extraction / hashing.

transcribe_audio: bool

Extract and transcribe the video’s audio track.

transcriber_backend: TranscriberBackend

Which transcription engine to use.

transcriber_model: str

Model name / size — interpretation is backend-specific.

Examples:

  • faster-whisper / openai-whisper: "tiny", "base", "small", "medium", "large-v3"

  • huggingface-local / huggingface-api: a Hugging Face model ID, e.g. "openai/whisper-large-v3"

  • openai-api: "whisper-1"

  • google-api: "long", "short", "latest_long"

  • groq-api: "whisper-large-v3", "whisper-large-v3-turbo"

  • deepgram-api: "nova-3", "nova-2", "enhanced", "base"

  • ollama: the model name on the server, e.g. "whisper"

transcriber_api_key: str | None

API key for cloud transcription backends (falls back to env vars).

transcriber_base_url: str | None

Base URL for self-hosted / Ollama transcription endpoints.

language: str | None

BCP-47 language code (e.g. ‘en’, ‘fr’). None = auto-detect.

generate_summary: bool

Generate a comprehensive textual summary at the end.

store_in_rag: bool

Push all extracted content into the supplied rag_pipeline for Q&A.

processing_mode: VideoProcessingMode

active processes the full video; passive processes only a time window around focus_time_seconds.

focus_time_seconds: float | None

Center timestamp in seconds for passive mode (e.g. 130 for 02:10).

window_seconds: float

Passive-mode half-window size in seconds (focus ± window_seconds).

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
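In passive mode the processed window is focus_time_seconds ± window_seconds. In this hypothetical helper, clamping the start at 0 and the end at the video duration are assumptions about edge handling, not documented behaviour.

```python
from __future__ import annotations


def passive_window(
    focus_time_seconds: float,
    window_seconds: float = 5.0,
    duration_seconds: float | None = None,
) -> tuple[float, float]:
    """Return (start, end) of the focus ± window_seconds slice."""
    start = max(0.0, focus_time_seconds - window_seconds)  # assumption: clamp at 0
    end = focus_time_seconds + window_seconds
    if duration_seconds is not None:
        end = min(end, duration_seconds)  # assumption: clamp to video length
    return start, end
```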

class ractogateway.pipelines.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')

Bases: object

Synchronous + asynchronous video processing pipeline.

Parameters:
  • kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit are provided.

  • analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.

  • summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.

  • transcriber (TranscriberBackend) – Which audio transcription backend to use.

  • transcriber_model (str) – Model name / size for the chosen backend.

  • transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).

  • transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).

  • fps (float) – Video frames to sample per second.

  • similarity_threshold (float) – Frames with similarity >= this % to the previous kept frame are discarded. E.g. 90.0 keeps frames that differ by more than 10 %.

  • dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).

  • max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).

  • frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).

  • frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).

  • grid_size (int) – Frames per grid collage (only used in GRID mode).

  • batch_size (int) – Concurrent LLM calls per batch during frame analysis.

  • max_workers (int) – Thread-pool size for concurrent LLM calls.

  • max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.

  • language (str | None) – BCP-47 language code for transcription (None = auto-detect).

  • transcribe_audio (bool) – Whether to extract and transcribe the audio track.

  • analyze_frames (bool) – Whether to pass frames to the vision LLM.

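  • generate_summary (bool) – Whether to generate a comprehensive summary at the end.

  • processing_mode (VideoProcessingMode) – VideoProcessingMode.ACTIVE (default) processes the full video; passive mode processes only a time window around focus_time_seconds.

  • focus_time_seconds (float | None) – Center timestamp in seconds for passive mode (e.g. 130 for 02:10).

  • window_seconds (float) – Passive-mode half-window size in seconds (focus ± window_seconds).

  • rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag=True is passed to run() or arun(), all extracted content is indexed for retrieval.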

  • safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.

run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)

Process source and return a VideoProcessorResult.

All keyword arguments override the constructor defaults for this call only. When safe_mode=True, fatal stage errors are captured into result.failed_stage and result.stage_errors, and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors, so the pipeline continues with whatever data is available.

Return type:

VideoProcessorResult

async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)

Async variant of run().

Return type:

VideoProcessorResult

static parse_timestamp(value)

Parse a timestamp such as 130, "02:10", or "2 mins 10 sec" into seconds.

Return type:

float
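A hypothetical parser covering the three documented forms (plain seconds, colon-separated clock time, and unit words) is sketched below. The accepted unit spellings and the [hh:]mm:ss handling are assumptions; the library's own parser may accept more or fewer formats.

```python
from __future__ import annotations

import re

_UNIT_SECONDS = {"h": 3600, "m": 60, "s": 1}


def parse_timestamp(value: float | int | str) -> float:
    """Convert 130, "02:10", "1:02:10" or "2 mins 10 sec" into seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    text = value.strip().lower()
    if re.fullmatch(r"\d+(?:\.\d+)?", text):
        return float(text)  # bare number of seconds as a string
    if re.fullmatch(r"\d+(?::\d+){1,2}(?:\.\d+)?", text):
        total = 0.0
        for part in text.split(":"):  # [hh:]mm:ss, most significant first
            total = total * 60 + float(part)
        return total
    # Unit words: the first letter (h, m, s) selects the multiplier.
    matches = re.findall(r"(\d+(?:\.\d+)?)\s*([hms])[a-z]*", text)
    if not matches:
        raise ValueError(f"Unrecognised timestamp: {value!r}")
    return float(sum(float(num) * _UNIT_SECONDS[unit] for num, unit in matches))
```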

answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Process the video, then answer question from the extracted timeline context. The answer is returned in result.answer.

Return type:

VideoProcessorResult

async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Async variant of answer_question().

Return type:

VideoProcessorResult

class ractogateway.pipelines.VideoProcessorResult(**data)

Bases: BaseModel

Full output of a VideoProcessorPipeline run.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

video_path: str

Original source identifier (path, URL, or ‘<bytes>’ for buffer input).

frames: list[FrameEntry]

All extracted frames (kept and discarded).

transcript: list[TranscriptSegment]

Audio transcript segmented by timestamp.

sections: list[VideoSection]

Merged visual + audio sections ordered by time.

summary: str | None

Comprehensive LLM-generated summary of the entire video.

rag_stored: bool
rag_chunk_count: int
usage: VideoProcessorUsage
error: str | None

Short description of the first fatal error (backward-compatible).

failed_stage: str | None

Name of the stage that caused a fatal pipeline abort, if any.

stage_errors: list[StageError]

All per-stage errors collected during the run (fatal + non-fatal).

processing_mode: VideoProcessingMode

Whether this run processed full video (active) or a window (passive).

window_start_seconds: float | None

Passive-mode window start timestamp in source-video seconds.

window_end_seconds: float | None

Passive-mode window end timestamp in source-video seconds.

question: str | None

User question answered in this run, if any.

answer: str | None

Answer to question, populated when question-answering mode is used.

property has_errors: bool

True if any stage encountered an error.

property is_failed: bool

True if the pipeline aborted early due to a fatal stage error.
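The two properties are plausibly derived from stage_errors and failed_stage. The sketch below assumes exactly that, and it also assumes that a run with only non-fatal errors still completes. ResultSketch and StageErrorSketch are hypothetical stand-ins, and their fields are not the library's.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StageErrorSketch:
    # Hypothetical stand-in for StageError; real fields may differ.
    stage: str
    message: str
    fatal: bool = False


@dataclass
class ResultSketch:
    error: str | None = None
    failed_stage: str | None = None
    stage_errors: list[StageErrorSketch] = field(default_factory=list)

    @property
    def has_errors(self) -> bool:
        # Assumed: any collected error, fatal or not.
        return bool(self.stage_errors)

    @property
    def is_failed(self) -> bool:
        # Assumed: only a fatal abort records failed_stage.
        return self.failed_stage is not None


def describe(result: ResultSketch) -> str:
    # Typical caller-side triage of a finished run.
    if result.is_failed:
        return f"aborted at {result.failed_stage}: {result.error}"
    if result.has_errors:
        return f"completed with {len(result.stage_errors)} non-fatal error(s)"
    return "ok"
```

Checking is_failed before has_errors matters here, because in this sketch a fatal run also has a non-empty stage_errors list.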

get_transcript_text()

Full transcript as a single string.

Return type:

str

get_all_visual_content()

All frame analyses concatenated in timestamp order.

Return type:

str
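The two helpers above flatten the timeline into plain text. The sketch below shows one way to produce that text, assuming segments and frames are sorted by time, empty entries are skipped, and the separators are a single space for transcript text and a blank line for frame analyses. Segment and Frame are hypothetical stand-ins for TranscriptSegment and FrameEntry, and their field names are guesses.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Segment:
    # Hypothetical stand-in for TranscriptSegment.
    start: float
    end: float
    text: str


@dataclass
class Frame:
    # Hypothetical stand-in for FrameEntry.
    timestamp: float
    analysis: str | None  # None for discarded or unanalysed frames


def transcript_text(segments: list[Segment]) -> str:
    # Join non-blank segment texts in time order.
    ordered = sorted(segments, key=lambda s: s.start)
    return " ".join(s.text.strip() for s in ordered if s.text.strip())


def all_visual_content(frames: list[Frame]) -> str:
    # Concatenate frame analyses in timestamp order, skipping empty ones.
    ordered = sorted(frames, key=lambda f: f.timestamp)
    return "\n\n".join(
        f"[{f.timestamp:.1f}s] {f.analysis}" for f in ordered if f.analysis
    )
```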

to_json(path=None, *, indent=2)

Serialise the result to JSON. Returns the JSON string if path is None; otherwise writes it to path and returns None.

Return type:

str | None

to_markdown(path=None)

Build a structured Markdown report. Returns the report string if path is None; otherwise writes it to path and returns None.

Return type:

str | None
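Both exporters follow a "return a string, or write to a file" convention. The sketch below implements that convention for JSON with the standard library. dump_json is a hypothetical helper that works on a plain dict, and the UTF-8 encoding and ensure_ascii=False choices are assumptions.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


def dump_json(
    data: dict[str, Any],
    path: str | Path | None = None,
    *,
    indent: int = 2,
) -> str | None:
    """Return JSON text when path is None; otherwise write it and return None."""
    text = json.dumps(data, indent=indent, ensure_ascii=False)
    if path is None:
        return text
    Path(path).write_text(text, encoding="utf-8")
    return None
```

With a real result object, result.to_json() would presumably return the string and result.to_json("run.json") would write the file, and to_markdown() would behave the same way.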

class ractogateway.pipelines.VideoProcessorUsage(**data)

Bases: BaseModel

Accounting of tokens and frame counts across the full pipeline.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

frames_extracted: int
frames_kept: int
frames_discarded: int
analysis_input_tokens: int
analysis_output_tokens: int
summary_input_tokens: int
summary_output_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

audio_duration_seconds: float
property total_analysis_tokens: int
property total_summary_tokens: int
property total_tokens: int
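The three total_* properties are presumably simple sums of the input and output counters above. The sketch below encodes that assumption. UsageSketch is a hypothetical mirror of VideoProcessorUsage, and its zero defaults are also assumed.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    # Hypothetical mirror of the documented VideoProcessorUsage counters.
    frames_extracted: int = 0
    frames_kept: int = 0
    frames_discarded: int = 0
    analysis_input_tokens: int = 0
    analysis_output_tokens: int = 0
    summary_input_tokens: int = 0
    summary_output_tokens: int = 0
    audio_duration_seconds: float = 0.0

    @property
    def total_analysis_tokens(self) -> int:
        # Vision-analysis tokens, input plus output (assumed definition).
        return self.analysis_input_tokens + self.analysis_output_tokens

    @property
    def total_summary_tokens(self) -> int:
        # Summary-generation tokens, input plus output (assumed definition).
        return self.summary_input_tokens + self.summary_output_tokens

    @property
    def total_tokens(self) -> int:
        # Grand total across both LLM stages.
        return self.total_analysis_tokens + self.total_summary_tokens
```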
exception ractogateway.pipelines.VideoRateLimitExceededError

Bases: RuntimeError

Raised when a rate_limiter denies a VideoProcessorPipeline request.
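Because this exception is a RuntimeError subclass raised on denial, callers can retry with backoff. The sketch below defines a local stand-in class so that it runs on its own. In real code you would import the exception from ractogateway.pipelines and pass something like lambda: pipeline.run(...) as fn. call_with_backoff is a hypothetical helper. Whether retrying is appropriate depends on how your rate_limiter is configured.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class VideoRateLimitExceededError(RuntimeError):
    """Local stand-in; in real code, import it from ractogateway.pipelines."""


def call_with_backoff(
    fn: Callable[[], T],
    *,
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    # Retry only on rate-limit denials; other errors propagate immediately.
    for attempt in range(attempts):
        try:
            return fn()
        except VideoRateLimitExceededError:
            if attempt == attempts - 1:
                raise  # out of retries: surface the denial to the caller
            time.sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise AssertionError("unreachable")  # keeps type checkers satisfied
```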

class ractogateway.pipelines.VideoSection(**data)

Bases: BaseModel

A merged time section combining visual analysis and audio transcript.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

timestamp_start: float
timestamp_end: float
frame_ids: list[int]
visual_content: str

Combined LLM analyses for all frames in this section.

audio_content: str

Concatenated transcript text for this section’s time range.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
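audio_content is described as the transcript text for a section's time range. The sketch below fills it with an overlap test, under the assumption that a segment belongs to every section it touches. Under that rule, a segment that straddles a boundary appears in both neighbouring sections. SectionSketch and Segment are hypothetical stand-ins, and attach_audio is not a library function.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Segment:
    # Hypothetical stand-in for TranscriptSegment.
    start: float
    end: float
    text: str


@dataclass
class SectionSketch:
    # Hypothetical stand-in for VideoSection.
    timestamp_start: float
    timestamp_end: float
    frame_ids: list[int] = field(default_factory=list)
    visual_content: str = ""
    audio_content: str = ""


def attach_audio(sections: list[SectionSketch], segments: list[Segment]) -> None:
    """Fill each section's audio_content with overlapping transcript text."""
    ordered = sorted(segments, key=lambda s: s.start)
    for sec in sections:
        # Open-interval overlap: the segment and section share some time.
        overlapping = [
            seg.text
            for seg in ordered
            if seg.start < sec.timestamp_end and seg.end > sec.timestamp_start
        ]
        sec.audio_content = " ".join(overlapping)
```

Duplicating boundary segments keeps every section self-contained, at the cost of some repeated text. Assigning each segment to the section containing its midpoint is a common alternative.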