ractogateway.pipelines
RactoGateway Prebuilt Pipelines.
Prebuilt end-to-end pipelines for real-world LLM workflows.
Each pipeline is a self-contained class with run() / arun() methods,
full observability hooks, per-step model control, and optional per-call overrides.
Available pipelines
SQLAnalystPipeline / AsyncSQLAnalystPipeline
NL → SQL → pandas → Markdown answer + optional Plotly chart.
Requires: pip install ractogateway[pipelines-sql]
Charts: pip install ractogateway[pipelines-sql-viz]
ListClassifierPipeline / AsyncListClassifierPipeline
NL query → best-matching item(s) from a list[str]. Uses dynamic Enum + Pydantic validation; supports single/multi selection, confidence scores, reasoning, retries, memory, rate limiting, and telemetry. No extra dependencies.
VideoProcessorPipeline / AsyncVideoProcessorPipeline
Process tutorial/lecture videos: frame dedup, audio transcription, vision-LLM analysis (whiteboard/screen), comprehensive summary, optional RAG storage. Accepts local paths, URLs, YouTube links, raw bytes, or pre-extracted frame images.
Requires: pip install ractogateway[pipelines-video]
Audio: pip install ractogateway[pipelines-video-whisper]
YouTube: pip install ractogateway[pipelines-video-yt]
AgentPipeline / AsyncAgentPipeline
Autonomous ReAct agent: reason, call tools, observe, repeat. Supports RAG search, SQL query, HTTP fetch, memory, and any Python callable. Stops at max_steps or when finish() is called.
HTTP tool: pip install ractogateway[pipelines-agent-http]
Usage:
from ractogateway.pipelines import SQLAnalystPipeline, ListClassifierPipeline
from ractogateway.openai_developer_kit import Chat
# SQL Analyst
pipeline = SQLAnalystPipeline(kit=Chat(model="gpt-4o"))
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/db",
)
print(result.answer)
# List Classifier
classifier = ListClassifierPipeline(
    kit=Chat(model="gpt-4o-mini"),
    options=["Billing", "Technical Support", "Sales", "Account Management"],
    selection_mode="single",
    include_confidence=True,
    include_reasoning=True,
)
result = classifier.run("I can't log into my account")
print(result.first) # "Account Management"
print(result.top_confidence) # 0.94
print(result.as_dict()) # {"selected": [...], "confidences": [...], ...}
# Video Processor
from ractogateway.pipelines import VideoProcessorPipeline, TranscriberBackend
vp = VideoProcessorPipeline(
    kit=Chat(model="gpt-4o"),
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    generate_summary=True,
)
res = vp.run("lecture.mp4") # or YouTube URL / bytes / pre-extracted frames
print(res.summary)
res.to_markdown("report.md")
# Agent
from ractogateway.pipelines import AgentPipeline
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny, 22 C in {city}"
agent = AgentPipeline(
    kit=Chat(model="gpt-4o"),
    tools=[get_weather],
    max_steps=6,
    safe_mode=True,
)
result = agent.run("What is the weather in Paris?")
print(result.final_answer)
print(result.to_markdown())
- class ractogateway.pipelines.AgentPipeline(kit, *, tools=None, rag_pipeline=None, sql_pipeline=None, enable_http=False, agent_memory=None, max_steps=10, max_consecutive_errors=3, tool_retries=0, max_step_extension=0, max_parallel_tools=4, system_prompt=None, extra_rules='', safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]
Bases: object
Autonomous ReAct agent with sync run() and async arun().
- Parameters:
kit (Any) – Any RactoGateway developer kit (must support chat() and achat() methods).
tools (list[Callable[..., Any]] | None) – List of plain callables or @tool-decorated functions to register. Each is registered by its __name__ (or __tool_name__ if set by the @tool decorator).
rag_pipeline (Any) – Optional RactoRAG instance; auto-registers a rag_search tool.
sql_pipeline (Any) – Optional SQLAnalystPipeline; auto-registers a sql_query tool.
enable_http (bool) – When True, registers an http_get tool that fetches URLs via httpx (requires pip install ractogateway[pipelines-agent-http]).
agent_memory (Any) – Any dict-like or object with get/set methods. Auto-registers memory_read and memory_write tools.
max_steps (int) – Hard cap on reasoning steps before the loop stops with StopReason.MAX_STEPS.
max_consecutive_errors (int) – Number of consecutive tool errors that trigger the circuit breaker (StopReason.CIRCUIT_BREAK). Default 3.
tool_retries (int) – How many times to retry a failing tool before reporting the error to the LLM. Default 0 (no retry).
max_step_extension (int) – Maximum additional steps the agent may request via request_more_steps. 0 disables the feature.
max_parallel_tools (int) – Maximum number of tools to run simultaneously when the LLM requests a parallel batch. 1 forces sequential execution. Default 4.
system_prompt (str | None) – Fully override the auto-generated system prompt (advanced).
extra_rules (str) – Append an extra numbered rule to the default system prompt.
safe_mode (bool) – Catch all exceptions and surface them in result.error instead of re-raising.
tracer (Any) – Optional ractogateway.telemetry.RactoTracer.
metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
user_id (str) – Default user identifier passed to the rate limiter.
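The rate_limiter parameter is duck-typed, so any object exposing the two documented methods should be accepted. A minimal in-memory sketch follows; the class name InMemoryRateLimiter, the fixed per-user budget, and the boolean return of check_and_consume are assumptions made for illustration, not part of the library.

```python
from collections import defaultdict


class InMemoryRateLimiter:
    """Hypothetical fixed-budget limiter exposing the two duck-typed methods."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used = defaultdict(int)  # user_id -> tokens consumed so far

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Allow the call only if it fits into the user's remaining budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


limiter = InMemoryRateLimiter(budget_per_user=100)
first = limiter.check_and_consume("alice", 60)   # fits into the budget
second = limiter.check_and_consume("alice", 60)  # would exceed it
```

An instance like this could then be passed as rate_limiter=limiter; a production limiter would normally also reset budgets over time.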
- run(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]
Run the agent synchronously until it finishes or hits max_steps.
- Parameters:
goal (str) – The task or question the agent should solve.
max_steps (Any) – Per-call override for the constructor max_steps.
user_id (Any) – Per-call override for the rate-limiter user identifier.
session_id (str | None) – Optional session tag passed to tracer spans (informational).
response_format (type | None) – A Pydantic BaseModel subclass. When provided, the agent’s final answer is parsed and validated into an instance stored in AgentResult.parsed_output.
- Return type:
AgentResult
- Returns:
AgentResult – Contains the final answer, all steps, stop reason, token usage, and (when response_format is set) the parsed structured output.
- async arun(goal, *, max_steps=<object object>, user_id=<object object>, session_id=None, response_format=None)[source]
Async variant of run().
Uses kit.achat() for LLM calls and runs sync tools in a thread pool so the event loop is never blocked.
- Parameters:
- Return type:
- property system_prompt: str
The system prompt sent to the LLM each step (read-only).
- exception ractogateway.pipelines.AgentRateLimitExceededError[source]
Bases: RuntimeError
Raised when the rate limiter blocks an agent run.
- class ractogateway.pipelines.AgentResult(**data)[source]
Bases: BaseModel
Full output of an AgentPipeline run.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- goal: str
The original user goal / task string.
- stop_reason: StopReason
Why the loop terminated.
- usage: AgentUsage
Token and step counts.
- parsed_output: Any
Validated Pydantic model instance when response_format was passed to run(). Not included in JSON serialisation; call result.parsed_output.model_dump() manually.
- get_tool_calls()[source]
Return (tool_name, tool_input) for every non-finish step.
- to_json(path=None, *, indent=2)[source]
Serialise to JSON. Returns the JSON string when path is None.
- class ractogateway.pipelines.AgentStep(**data)[source]
Bases: BaseModel
One reasoning + action step in the ReAct loop.
- step_num: int
1-based position in the execution sequence.
- tool_name: str
Name of the tool that was invoked.
- observation: str
The tool’s return value (truncated to 4000 chars when long).
- duration_ms: float
Wall-clock time for tool execution in milliseconds.
- is_finish: bool
True when this step invoked the finish() termination tool.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.pipelines.AgentUsage(**data)[source]
Bases: BaseModel
Token and step accounting across the full agent run.
- total_input_tokens: int
Cumulative input tokens across all LLM calls.
- total_output_tokens: int
Cumulative output tokens across all LLM calls.
- steps_taken: int
Number of reasoning+action steps executed.
- tools_called: int
Number of non-finish tool invocations.
- property total_tokens: int
Sum of all input and output tokens.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.pipelines.AsyncAgentPipeline(*args, **kwargs)[source]
Bases: object
Async-only variant of AgentPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where a sync run() should not be in the public API.
All constructor parameters are identical to AgentPipeline.
- async run(goal, **kwargs)[source]
Async-only agent entrypoint. See AgentPipeline.arun().
- Return type:
- property system_prompt: str
The system prompt used by the inner pipeline.
- class ractogateway.pipelines.StopReason(*values)[source]
Why the agent loop terminated.
- FINISHED = 'finished'
- MAX_STEPS = 'max_steps'
- ERROR = 'error'
- CIRCUIT_BREAK = 'circuit_break'
- class ractogateway.pipelines.ToolExecutor(tools, max_retries=0)[source]
Bases: object
Runs registered tools by name with sync and async support.
- Parameters:
- execute(tool_name, tool_input)[source]
Execute tool_name synchronously, retrying up to max_retries times on exception.
- async aexecute(tool_name, tool_input)[source]
Execute tool_name asynchronously, retrying up to max_retries times on exception.
Async callables are awaited directly; sync callables run in the default thread-pool executor to avoid blocking the event loop.
- async aexecute_parallel(calls)[source]
Execute multiple tool calls concurrently via asyncio.gather.
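The behaviour described for aexecute() and aexecute_parallel() (retry on exception, await async callables, push sync callables to the default thread pool, gather concurrently) can be sketched with the standard library alone. The helper names call_with_retries and run_parallel are hypothetical; this illustrates the technique and is not the ToolExecutor source.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Tuple


async def call_with_retries(fn: Callable[..., Any], kwargs: Dict[str, Any], max_retries: int = 0) -> Any:
    """Call fn(**kwargs); on exception, retry up to max_retries more times."""
    attempts = max(0, max_retries) + 1
    for attempt in range(attempts):
        try:
            if inspect.iscoroutinefunction(fn):
                return await fn(**kwargs)  # async callables are awaited directly
            # Sync callables run in the default thread pool, off the event loop.
            return await asyncio.to_thread(fn, **kwargs)
        except Exception:
            if attempt == attempts - 1:
                raise  # retries exhausted: surface the last error


async def run_parallel(calls: List[Tuple[Callable[..., Any], Dict[str, Any]]]) -> List[Any]:
    """Run several (callable, kwargs) pairs concurrently; results keep input order."""
    return await asyncio.gather(*(call_with_retries(fn, kw) for fn, kw in calls))


def add(a: int, b: int) -> int:  # example sync tool
    return a + b


async def shout(text: str) -> str:  # example async tool
    return text.upper()


results = asyncio.run(run_parallel([(add, {"a": 1, "b": 2}), (shout, {"text": "hi"})]))
```

asyncio.gather returns results in the order the calls were submitted, which keeps each observation paired with its originating tool call.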
- ractogateway.pipelines.make_finish_tool()[source]
Return the always-present finish tool.
When the LLM calls finish(answer=...), the agent loop stops and returns the answer as AgentResult.final_answer.
- ractogateway.pipelines.make_http_tool()[source]
Return an http_get tool that fetches URL content via httpx.
Requires httpx: pip install ractogateway[pipelines-agent-http]
- ractogateway.pipelines.make_memory_tools(agent_memory)[source]
Return memory_read and memory_write tools backed by agent_memory.
agent_memory can be any object supporting:
memory.get(key) -> Any
memory.set(key, value) -> None
or a plain dict.
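To make the two accepted shapes of agent_memory concrete, here is a small sketch of an adapter that treats a plain dict and a get/set object uniformly. Both make_read_write and KeyValueStore are hypothetical names introduced for illustration; how the library adapts the object internally is not shown in these docs.

```python
from typing import Any, Callable, Tuple


def make_read_write(memory: Any) -> Tuple[Callable[[str], Any], Callable[[str, Any], None]]:
    """Hypothetical adapter: accept a plain dict or an object with get/set."""
    if isinstance(memory, dict):
        def write(key: str, value: Any) -> None:
            memory[key] = value
        return memory.get, write
    return memory.get, memory.set


class KeyValueStore:
    """Minimal object satisfying the documented get/set protocol."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


dict_read, dict_write = make_read_write({})
dict_write("city", "Paris")

store_read, store_write = make_read_write(KeyValueStore())
store_write("city", "Berlin")
```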
- ractogateway.pipelines.make_rag_tool(rag_pipeline)[source]
Return a rag_search tool backed by a RactoRAG pipeline.
- ractogateway.pipelines.make_rag_tool_async(rag_pipeline)[source]
Return an async rag_search tool backed by an async RactoRAG.
- ractogateway.pipelines.make_sql_tool(sql_pipeline)[source]
Return a sql_query tool backed by a SQLAnalystPipeline.
- class ractogateway.pipelines.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: SQLAnalystPipeline
Async-first variant of SQLAnalystPipeline.
run() is a coroutine: use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.
All constructor parameters and run() parameters are identical to SQLAnalystPipeline.
Example:
from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
- async run(user_query, **kwargs)[source]
Async run(): delegates to SQLAnalystPipeline.arun().
- Return type:
SQLAnalystResult
- class ractogateway.pipelines.ChartSpec(**data)[source]
Bases: BaseModel
Specification for a Plotly chart.
Pass to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame’s column dtypes with no extra LLM call.
Supported chart types
bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel
Example:
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

result = pipeline.run(
    user_query="Top 5 customers by revenue?",
    ...,
    chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue", title="Top 5 Customers"),
)
result.plotly_figure.show()
- chart_type: str
- title: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.pipelines.PipelineUsage(**data)[source]
Bases: BaseModel
Aggregated token usage across all LLM calls in the pipeline.
Tracks each step (SQL generation, pandas code generation, markdown answer generation) separately so you can see exactly where tokens are consumed.
Properties
- total_input_tokens:
Sum of all prompt tokens across every LLM step.
- total_output_tokens:
Sum of all completion tokens across every LLM step.
- total_tokens:
Grand total of every token consumed by the pipeline.
- sql_input_tokens: int
- sql_output_tokens: int
- pandas_input_tokens: int
- pandas_output_tokens: int
- answer_input_tokens: int
- answer_output_tokens: int
- property total_input_tokens: int
- property total_output_tokens: int
- property total_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- exception ractogateway.pipelines.RateLimitExceededError[source]
Bases: RuntimeError
Raised when the rate limiter denies a request for a given user.
- class ractogateway.pipelines.ReadOnlySQLGuard[source]
Bases: object
Validates that a SQL string contains only read (SELECT) operations.
Uses a keyword regex to detect any DML/DDL statement that would modify data or schema. Call check() before executing any LLM-generated SQL when force_read_only=True.
Example:
ReadOnlySQLGuard.check("SELECT * FROM users")  # passes silently
ReadOnlySQLGuard.check("DROP TABLE users")     # raises ReadOnlyViolationError
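For readers curious what a keyword-regex guard of this kind might look like, here is a minimal sketch. The keyword list, the function check_read_only, and the exception WriteAttemptError are assumptions; the library's actual pattern is not shown in these docs.

```python
import re

# Assumed keyword list; the real guard may cover more (or fewer) statements.
_WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE|MERGE)\b",
    re.IGNORECASE,
)


class WriteAttemptError(ValueError):
    """Hypothetical stand-in for ReadOnlyViolationError."""


def check_read_only(sql: str) -> None:
    """Raise if the SQL text contains a data- or schema-modifying keyword."""
    match = _WRITE_KEYWORDS.search(sql)
    if match:
        raise WriteAttemptError(f"write keyword found: {match.group(1).upper()}")


check_read_only("SELECT * FROM users")  # passes silently

try:
    check_read_only("drop table users")
    blocked = False
except WriteAttemptError:
    blocked = True
```

A plain keyword scan is deliberately conservative: it can also reject a harmless SELECT whose string literal happens to contain one of the keywords.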
- exception ractogateway.pipelines.ReadOnlyViolationError[source]
Bases: ValueError
Raised when generated SQL contains a write operation in force_read_only mode.
- class ractogateway.pipelines.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: object
Natural-language to SQL + pandas + Markdown answer + chart pipeline.
Converts a plain-English question into:
A read-only SQL query (LLM step: sql_kit)
Pandas analysis code executed against the SQL result (LLM step: pandas_kit)
A rich Markdown answer with table + insights (LLM step: answer_kit)
An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls: pure dtype heuristics or user-provided spec)
Two variants
SQLAnalystPipeline: run() sync, arun() async.
AsyncSQLAnalystPipeline: run() is async (same as arun()).
- Parameters:
kit – Default LLM kit used for any step that doesn’t have its own kit.
sql_kit – Override kit for SQL generation. Falls back to kit.
pandas_kit – Override kit for pandas code generation. Falls back to kit.
answer_kit – Override kit for Markdown answer generation. Falls back to kit.
sql_prompt / pandas_prompt / answer_prompt – Override default system prompts for each step.
sql_temperature / sql_max_tokens – LLM settings for the SQL step (default: 0.0 / 1024).
pandas_temperature / pandas_max_tokens – LLM settings for the pandas step (default: 0.0 / 2048).
answer_temperature / answer_max_tokens – LLM settings for the answer step (default: 0.3 / 2048).
run_pandas – Run pandas analysis step by default (default: True).
run_answer – Run Markdown answer step by default (default: True).
chart – Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".
force_read_only – Block any non-SELECT SQL (default: True).
tracer – Optional RactoTracer instance.
metrics – Optional GatewayMetricsMiddleware instance.
engine – Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, connection_string / host / port / etc. params in run() are ignored.
max_sql_retries – Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.
max_rows – Safety cap on returned rows: auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.
schema_cache_ttl – Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).
allowed_tables – Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.
blocked_columns – Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.
mask_columns – Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.
table_docs – {table_name: description}, appended as inline schema comments so the LLM understands table business meaning.
column_docs – {table_name: {column_name: description}}, per-column inline comments.
safe_mode – When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.
memory – Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
rate_limiter – Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
user_id – Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
Example:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")
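The schema_cache_ttl parameter describes an in-process cache with a time-to-live, and clear_schema_cache() evicts it. A minimal sketch of such a cache follows, assuming a module-level dict keyed by an arbitrary string; get_schema, clear_cache, and the fetch callback are hypothetical names and not the library's internals.

```python
import time
from typing import Callable, Dict, Tuple

# key -> (time stored, schema text); a stand-in for the in-process cache.
_cache: Dict[str, Tuple[float, str]] = {}


def get_schema(key: str, fetch: Callable[[], str], ttl: float = 3600.0) -> str:
    """Return a cached schema if it is younger than ttl seconds, else refetch."""
    if ttl > 0:
        hit = _cache.get(key)
        if hit is not None and time.monotonic() - hit[0] < ttl:
            return hit[1]
    schema = fetch()
    if ttl > 0:  # ttl == 0 disables caching entirely
        _cache[key] = (time.monotonic(), schema)
    return schema


def clear_cache() -> None:
    """Evict every entry, e.g. after a migration changes the schema."""
    _cache.clear()


fetch_calls = []


def fake_introspection() -> str:
    fetch_calls.append(1)  # count how often the database would be hit
    return "orders(id, product_id, quantity)"


get_schema("shop", fake_introspection)
get_schema("shop", fake_introspection)  # served from the cache
calls_before_clear = len(fetch_calls)
clear_cache()
get_schema("shop", fake_introspection)  # refetched after eviction
calls_after_clear = len(fetch_calls)
```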
- run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Run the full pipeline synchronously.
- Parameters:
user_query (str) – Plain-English question to answer from the database.
connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.
host / port / database / username / password / driver – Individual connection params used when both connection_string and engine are omitted.
engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.
schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).
run_pandas / run_answer / chart / force_read_only – Override the corresponding pipeline-level defaults for this call.
sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens – Per-call LLM setting overrides.
max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.
user_id (str | None) – Per-call user ID for rate limiting and audit.
session_id (str | None) – Conversation session ID used to fetch and save memory context.
- Return type:
SQLAnalystResult
- Returns:
SQLAnalystResult
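The session_id argument works together with the constructor's memory object, which must implement get_history(session_id) and append(session_id, role, content). A minimal in-memory sketch of that protocol is shown here; the class name InMemoryChatMemory and the "role"/"content" dictionary keys are assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List


class InMemoryChatMemory:
    """Hypothetical memory object implementing the documented two-method protocol."""

    def __init__(self) -> None:
        self._sessions = defaultdict(list)  # session_id -> list of message dicts

    def get_history(self, session_id: str) -> List[Dict[str, str]]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._sessions[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        self._sessions[session_id].append({"role": role, "content": content})


memory = InMemoryChatMemory()
memory.append("s1", "user", "Top 5 products by quantity sold?")
memory.append("s1", "assistant", "Here are the top 5 products.")
history = memory.get_history("s1")
```

Such an object could be passed as memory=memory, with the same session_id supplied on each call that should share context.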
- async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Async variant of run(), with identical parameters.
Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.
- Return type:
SQLAnalystResult
- class ractogateway.pipelines.SQLAnalystResult(**data)[source]
Bases: BaseModel
Result returned by SQLAnalystPipeline.
All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.
Fields
- user_query:
The original natural-language question.
- schema_used:
The database schema string that was passed to (or fetched for) the LLM.
- sql_query:
The generated (and possibly LIMIT-injected) SQL SELECT statement.
- columns:
Column names returned by the SQL query.
- row_count:
Number of rows in raw_rows.
- raw_rows:
All rows from the SQL result as a list of dicts.
- pandas_code:
The LLM-generated pandas analysis code (None if run_pandas=False).
- pandas_result:
Output of executing pandas_code: DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.
- answer:
Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.
- chart_spec:
The ChartSpec dict used to build the Plotly figure. None if no chart was requested.
- plotly_figure:
A plotly.graph_objects.Figure object ready to call .show() or .to_html(). None if no chart was requested or plotly is not installed.
- usage:
Aggregated token counts for all LLM steps in the pipeline.
- error:
Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.
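The sql_query field may hold a LIMIT-injected statement. One simple way such an injection could work is sketched below, assuming a regex check for an existing LIMIT clause; inject_limit is a hypothetical helper and not the library's implementation.

```python
import re

_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless the query already has one (0 disables the cap)."""
    if max_rows <= 0:
        return sql
    stripped = sql.rstrip().rstrip(";").rstrip()  # drop a trailing semicolon
    if _LIMIT_RE.search(stripped):
        return stripped
    return f"{stripped} LIMIT {max_rows}"


capped = inject_limit("SELECT * FROM orders;", 100)
untouched = inject_limit("SELECT * FROM orders LIMIT 5", 100)
```

This naive check treats a LIMIT inside a subquery as if it capped the outer query; a SQL parser would be needed to handle that case properly.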
- user_query: str
- schema_used: str
- sql_query: str
- row_count: int
- usage: PipelineUsage
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- to_csv(path=None)[source]
Export the raw SQL result rows to CSV.
Does not require pandas; uses the standard-library csv module.
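Exporting a list of row dicts with only the standard library can be sketched as follows. The helper rows_to_csv is hypothetical, and returning the CSV text when path is None is an assumption; the docs above do not state what to_csv() returns.

```python
import csv
import io
from typing import Any, Dict, List, Optional


def rows_to_csv(rows: List[Dict[str, Any]], path: Optional[str] = None) -> Optional[str]:
    """Write rows (a list of dicts) as CSV; return the text when no path is given."""
    buffer = io.StringIO()
    if rows:
        writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()), lineterminator="\n")
        writer.writeheader()
        writer.writerows(rows)
    text = buffer.getvalue()
    if path is None:
        return text
    # newline="" stops the platform from translating line endings a second time.
    with open(path, "w", newline="", encoding="utf-8") as handle:
        handle.write(text)
    return None


csv_text = rows_to_csv([{"product": "Widget", "qty": 3}, {"product": "Gadget", "qty": 5}])
```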
- to_json(path=None, *, indent=2)[source]
Export the raw SQL result rows to JSON.
- to_excel(path, *, sheet_name='Results')[source]
Export the raw SQL result rows to an Excel file.
Requires pandas and openpyxl: pip install ractogateway[pipelines-sql] openpyxl
- ractogateway.pipelines.clear_schema_cache()[source]
Evict all entries from the in-process schema cache.
Useful in tests or when the database schema changes at runtime.
- Return type:
- class ractogateway.pipelines.AsyncListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]
Bases: ListClassifierPipeline
Async-first variant of ListClassifierPipeline.
run() is a coroutine: await pipeline.run(...) directly. Designed for FastAPI, aiohttp, Starlette, and other async frameworks.
Constructor and all run() parameters are identical to ListClassifierPipeline.
Example
pipeline = AsyncListClassifierPipeline.from_provider(
    "openai", "gpt-4o-mini",
    options=["Billing", "Support", "Sales"],
    safe_mode=True,
)

# FastAPI handler:
@app.post("/classify")
async def classify(query: str):
    result = await pipeline.run(query)
    return result.as_dict()
- class ractogateway.pipelines.AuditEntry(**data)[source]
Bases: BaseModel
Immutable audit record emitted to the audit_logger after every call.
Emitted regardless of whether the call was served from cache, hit an error, or was a live LLM classification. Provides a complete picture of every request for compliance, debugging, and analytics.
Fields
- timestamp:
ISO 8601 UTC timestamp of when the call was made (e.g. "2026-02-26T14:23:01.456789Z").
- user_query:
Original natural-language query.
- options_provided:
Full candidate list shown to the LLM (including uncertain_label if one was configured).
- selected:
Option(s) chosen by the LLM, or empty on error.
- confidences:
Per-selection confidence scores, or None.
- all_scores:
Score for every option (when score_all=True), or None.
- reasoning:
LLM explanation (when include_reasoning=True), or None.
- fuzzy_corrected:
True when the LLM returned a near-miss that was fuzzy-matched.
- uncertain:
True when the LLM selected the uncertain_label option.
- cache_hit:
"exact" or "semantic" when the result was served from cache; None when a live LLM call was made.
- user_id:
User identifier passed to the pipeline (for rate limiting / audit).
- session_id:
Conversation session identifier (for memory context).
- latency_ms:
Wall-clock latency of the entire pipeline call in milliseconds (near-zero for cache hits).
- usage:
Token usage dict with keys input_tokens, output_tokens, total_tokens, retry_count. All zero on cache hits.
- error:
Non-None when safe_mode=True and an exception occurred.
- timestamp: str
- user_query: str
- fuzzy_corrected: bool
- uncertain: bool
- latency_ms: float
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- exception ractogateway.pipelines.ClassifierRateLimitExceededError[source]
Bases: RuntimeError
Raised when the rate limiter denies a request for a given user.
- class ractogateway.pipelines.ClassifierResult(**data)[source]
Bases: BaseModel
Result returned by ListClassifierPipeline.
All fields except user_query and options_provided have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs mid-pipeline.
Fields
- user_query:
The original natural-language query passed to the pipeline.
- options_provided:
The full list of candidate strings presented to the LLM (including the injected uncertain_label option if one was configured).
- selected:
The option(s) chosen by the LLM, ordered by descending confidence. Empty when an error occurred and safe_mode=True.
- confidences:
Per-selection confidence scores in [0.0, 1.0], aligned with selected. None when include_confidence=False.
- all_scores:
Confidence score for every option in the list, keyed by option string. None when score_all=False (the default).
- reasoning:
Brief natural-language explanation produced by the LLM. None when include_reasoning=False.
- fuzzy_corrected:
True when the LLM returned a near-miss that was corrected by the built-in fuzzy matcher without consuming a retry.
- uncertain:
True when the LLM selected the uncertain_label option, indicating no real option matched the query well enough.
- cache_hit:
"exact" or "semantic" when served from cache; None for a live LLM call.
- usage:
Aggregated token counts and retry statistics for this call.
- error:
Non-None only when safe_mode=True and an exception occurred. When error is set, selected will be empty.
Examples
>>> result.first  # "Billing"
>>> result.top_confidence  # 0.95
>>> result.as_string()  # "Billing, Account"
>>> result.as_dict()  # {"selected": ["Billing"], ...}
>>> result.as_enum()["Billing"].value  # "Billing"
>>> result.uncertain  # False
>>> result.cache_hit  # "exact" | "semantic" | None
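The fuzzy_corrected field refers to near-miss answers being mapped back onto a valid option. The docs do not say which matcher is used; the sketch below shows one plausible approach with the standard-library difflib, where fuzzy_match and the 0.8 cutoff are assumptions.

```python
import difflib
from typing import List, Optional


def fuzzy_match(candidate: str, options: List[str], cutoff: float = 0.8,
                case_sensitive: bool = False) -> Optional[str]:
    """Map a near-miss string onto the closest valid option, or None if nothing is close."""
    if candidate in options:
        return candidate
    # Compare on a normalised key but return the original option string.
    normalise = (lambda s: s) if case_sensitive else str.lower
    by_key = {normalise(option): option for option in options}
    matches = difflib.get_close_matches(normalise(candidate), list(by_key), n=1, cutoff=cutoff)
    return by_key[matches[0]] if matches else None


options = ["Billing", "Sales"]
corrected = fuzzy_match("Biling", options)   # typo in the LLM output
unmatched = fuzzy_match("Weather", options)  # nothing similar enough
```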
- user_query: str
- fuzzy_corrected: bool
- uncertain: bool
- usage: ClassifierUsage
- property is_empty: bool
True when no options were selected (including error cases).
- as_string(separator=', ')[source]
Return selected options as a single joined string.
- as_dict()[source]
Return a plain dict with selected options and optional metadata.
Always contains "selected". "confidences", "all_scores", and "reasoning" are included only when they are non-None.
- as_enum(name='SelectedOptions')[source]
Return a dynamic Python enum.Enum of the selected options.
- Parameters:
name (str) – Class name for the generated Enum. Default: "SelectedOptions".
- Returns:
type[Enum] – An Enum whose members have the option string as both name and value.
Example
>>> E = result.as_enum()
>>> E["Billing"].value  # "Billing"
- top_n(n)[source]
Return the top-n selected options.
- score_for(option)[source]
Return the confidence score for a specific option, or None.
Searches all_scores first (all options, when score_all=True), then confidences for selected items.
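The lookup order described for score_for() can be pictured with plain lists and dictionaries. The following is only a sketch: the standalone score_for function below is a hypothetical stand-in written for illustration, not the library's method.

```python
from typing import Optional


def score_for(
    option: str,
    selected: list[str],
    confidences: Optional[list[float]],
    all_scores: Optional[dict[str, float]],
) -> Optional[float]:
    """Hypothetical stand-in mirroring the documented lookup order."""
    # 1. Prefer the full score table, which exists only when score_all=True.
    if all_scores is not None and option in all_scores:
        return all_scores[option]
    # 2. Otherwise fall back to the scores aligned with `selected`.
    if confidences is not None and option in selected:
        return confidences[selected.index(option)]
    return None


selected = ["Billing", "Sales"]
confidences = [0.95, 0.4]
print(score_for("Sales", selected, confidences, None))    # 0.4
print(score_for("Support", selected, confidences, None))  # None
```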
- to_audit_entry(*, timestamp, user_id=None, session_id=None, latency_ms=0.0)[source]
Build an AuditEntry from this result.
Called automatically by the pipeline; exposed here so that users can reconstruct audit entries from stored results if needed.
- Return type:
AuditEntry
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.ClassifierUsage(**data)[source]
Bases: BaseModel
Token usage and retry statistics for a single pipeline call.
Properties
- total_tokens:
Sum of input_tokens and output_tokens across all LLM attempts, including automatic retries triggered by invalid LLM responses. Zero on a cache hit (no LLM call was made).
- input_tokens: int
- output_tokens: int
- retry_count: int
- property total_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
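To make the accounting concrete, here is a minimal sketch of how token counts could accumulate across retries so that total_tokens reflects every attempt. UsageSketch and add_attempt are hypothetical names, and a dataclass is used instead of pydantic to keep the example dependency-free.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Hypothetical stand-in for ClassifierUsage (not the library class)."""

    input_tokens: int = 0
    output_tokens: int = 0
    retry_count: int = 0

    @property
    def total_tokens(self) -> int:
        # Sum over all attempts; stays 0 if no LLM call was made (cache hit).
        return self.input_tokens + self.output_tokens

    def add_attempt(self, input_tokens: int, output_tokens: int, *, is_retry: bool) -> None:
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        if is_retry:
            self.retry_count += 1


usage = UsageSketch()
usage.add_attempt(120, 15, is_retry=False)  # first attempt returned an unknown option
usage.add_attempt(140, 12, is_retry=True)   # retry succeeded
print(usage.total_tokens, usage.retry_count)  # 287 1
```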
- class ractogateway.pipelines.ListClassifierPipeline(kit, *, options=None, selection_mode='single', output_format='pydantic', prompt=None, temperature=0.0, max_tokens=512, max_retries=2, include_confidence=True, include_reasoning=False, score_all=False, option_descriptions=None, fuzzy_fallback=True, uncertain_label=None, confidence_threshold=None, case_sensitive=False, safe_mode=False, exact_cache=None, semantic_cache=None, audit_logger=None, tracer=None, metrics=None, rate_limiter=None, memory=None, user_id=None)[source]
Bases: object
Map a natural-language query to one or more items from a candidate list.
Supports every RactoGateway provider via the kit parameter or the from_provider() class factory. Internally builds a dynamic Python enum.Enum from the options list and validates every LLM response against it: hallucinations and paraphrased answers are caught, fuzzy-corrected if close enough, and retried otherwise.
Two variants
- ListClassifierPipeline: run() is sync, arun() is async.
- AsyncListClassifierPipeline: run() is async only.
- Parameters:
kit – Any RactoGateway developer kit (OpenAI, Anthropic, Google, Ollama, HuggingFace). Must expose .chat(ChatConfig) and .achat(ChatConfig) methods. Use from_provider() instead of constructing kits manually when you only need provider + model.
options – Default candidate strings. Can be overridden per call. Must be non-empty and duplicate-free when provided.
selection_mode (Literal['single', 'multiple']) – "single" (default): exactly one option. "multiple": one or more options. Overridable per call.
output_format (Literal['string', 'dict', 'pydantic']) – "pydantic" (default): ClassifierResult. "string": comma-joined string. "dict": plain dict. Overridable per call.
prompt – Custom RactoPrompt to replace the built-in system prompt.
temperature – LLM temperature. Default 0.0 for deterministic output.
max_tokens – Response token budget. Default 512.
max_retries – Retry attempts when the LLM returns invalid JSON or an unknown option. Default 2.
include_confidence – Ask the LLM for per-selection confidence scores in [0.0, 1.0]. Default True.
include_reasoning – Ask the LLM for a one-sentence explanation. Default False.
score_all – Ask the LLM for a score for every option (not just selected ones). Stored in result.all_scores. Default False.
option_descriptions – {option: description} mapping, shown inline next to each option in the prompt to help the LLM distinguish similar categories.
fuzzy_fallback – Use stdlib difflib to correct near-miss LLM responses before consuming a retry. Default True.
uncertain_label – When set, this string is appended as an extra option that the LLM can pick when nothing matches (e.g. "Other / None of the above"). result.uncertain is True when this label is selected.
confidence_threshold – Drop selections below this score. Keeps the highest-confidence match as a fallback. Default None (no filtering).
case_sensitive – Whether option matching is case-sensitive. Default False.
safe_mode – Return ClassifierResult(error=...) instead of raising. Default False.
tracer – Optional RactoTracer.
metrics – Optional GatewayMetricsMiddleware.
rate_limiter – Duck-typed: check_and_consume(user_id, tokens) -> bool + get_remaining(user_id) -> int.
memory – Duck-typed: get_history(session_id) -> list[dict] + append(session_id, role, content).
user_id – Default user ID for rate limiting. Overridable per call.
Example
# Via kit directly
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import ListClassifierPipeline

pipeline = ListClassifierPipeline(
    kit=Chat(model="gpt-4o-mini"),
    options=["Billing", "Technical Support", "Sales"],
    include_confidence=True,
    include_reasoning=True,
)
result = pipeline.run("My invoice is wrong")
print(result.first, result.top_confidence)

# Via from_provider(): no manual kit import needed
pipeline = ListClassifierPipeline.from_provider(
    "anthropic",
    "claude-haiku-4-5-20251001",
    options=["Billing", "Technical Support", "Sales"],
)
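The fuzzy_fallback parameter is documented as using stdlib difflib to repair near-miss answers. The sketch below shows one way such a correction could work with difflib.get_close_matches. The helper name fuzzy_correct and the 0.8 cutoff are assumptions made for illustration; the pipeline's actual matching rules are not documented here.

```python
import difflib
from typing import Optional


def fuzzy_correct(
    raw: str,
    options: list[str],
    cutoff: float = 0.8,  # assumed threshold, not the library's value
    case_sensitive: bool = False,
) -> Optional[str]:
    """Return the closest valid option for `raw`, or None if nothing is close."""
    if case_sensitive:
        lookup = {opt: opt for opt in options}
        key = raw
    else:
        # Compare lower-cased text but return the original spelling.
        lookup = {opt.lower(): opt for opt in options}
        key = raw.lower()
    if key in lookup:
        return lookup[key]
    matches = difflib.get_close_matches(key, list(lookup), n=1, cutoff=cutoff)
    return lookup[matches[0]] if matches else None


options = ["Billing", "Technical Support", "Sales"]
print(fuzzy_correct("Technical Suport", options))  # Technical Support
print(fuzzy_correct("Refunds", options))           # None
```

A None result corresponds to the case where the answer is too far from every option and a retry would be needed.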
- classmethod from_provider(provider, model, *, api_key=None, base_url=None, options=None, **kwargs)[source]
Create a pipeline by specifying provider + model — no kit import needed.
- Parameters:
provider (str) – One of "openai", "anthropic", "google", "ollama", "huggingface".
model (str) – Model identifier string, e.g.:
  OpenAI: "gpt-4o-mini", "gpt-4o"
  Anthropic: "claude-haiku-4-5-20251001", "claude-sonnet-4-6"
  Google: "gemini-2.0-flash", "gemini-1.5-pro"
  Ollama: "llama3.2", "mistral"
  HuggingFace: "meta-llama/Llama-3.2-3B-Instruct"
api_key (str | None) – Provider API key. Falls back to the standard env var for each provider (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY).
base_url (str | None) – Custom endpoint, used for Ollama (http://localhost:11434) or OpenAI-compatible proxies.
options (list[str] | None) – Default candidate options list.
**kwargs (Any) – Any other ListClassifierPipeline constructor parameters (selection_mode, include_confidence, safe_mode, etc.).
- Return type:
ListClassifierPipeline
- Returns:
ListClassifierPipeline
Example
pipeline = ListClassifierPipeline.from_provider(
    "anthropic",
    "claude-haiku-4-5-20251001",
    options=["Billing", "Support", "Sales"],
    include_reasoning=True,
    safe_mode=True,
)
- static make_enum(options, name='OptionsEnum')[source]
Build a standalone dynamic enum.Enum from an options list.
Useful when you want enum-typed values outside the pipeline.
- Returns:
type[Enum]
Example
E = ListClassifierPipeline.make_enum(["Red", "Green", "Blue"])
E["Red"].value  # "Red"
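For readers unfamiliar with dynamic enums, the standard library's functional Enum API is enough to build one from a list of strings. The sketch below is a standalone illustration using only enum.Enum; the local make_enum and is_valid helpers are hypothetical and may differ from what the pipeline does internally.

```python
from enum import Enum


def make_enum(options: list[str], name: str = "OptionsEnum") -> type[Enum]:
    """Build an Enum whose member names and values are both the option string."""
    if not options:
        raise ValueError("options must be non-empty")
    if len(set(options)) != len(options):
        raise ValueError("options must be duplicate-free")
    # The functional Enum API accepts a mapping of member name -> value.
    return Enum(name, {opt: opt for opt in options})


def is_valid(enum_cls: type[Enum], value: str) -> bool:
    """True when `value` is one of the enum's values (lookup by value)."""
    try:
        enum_cls(value)
        return True
    except ValueError:
        return False


Color = make_enum(["Red", "Green", "Blue"])
print(Color["Red"].value)         # Red
print(is_valid(Color, "Green"))   # True
print(is_valid(Color, "Purple"))  # False
```

Lookup by value is what makes an enum useful as a validator: any string outside the options list is rejected with a ValueError.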
- get_options()[source]
Return the pipeline-level options list, or None if not set.
- set_options(options)[source]
Replace the entire pipeline-level options list.
Thread-safe — safe to call while the pipeline is in use.
- add_option(option, description=None)[source]
Append a new option to the pipeline-level list.
- remove_option(option)[source]
Remove an option from the pipeline-level list.
- run(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Classify user_query synchronously.
- Parameters:
user_query (str) – Natural-language query to classify.
options (list[str] | None) – Per-call override for the candidate list. Omit to use the pipeline-level list. Passing [] raises a ValueError.
selection_mode (Literal['single', 'multiple'] | None) – Per-call override: "single" or "multiple".
output_format (Literal['string', 'dict', 'pydantic'] | None) – Per-call override: "pydantic", "string", or "dict".
confidence_threshold (float | None) – Per-call override. Pass None explicitly to disable filtering for this call even if a pipeline-level threshold is set.
session_id (str | None) – Conversation session ID for memory retrieval/storage.
user_id (str | None) – Per-call user ID for rate limiting and audit.
- Returns:
ClassifierResult | str | dict – Type depends on output_format.
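The `<object object>` defaults in the signature suggest a sentinel value, which lets a method tell "argument omitted" apart from "None passed explicitly". The sketch below illustrates that pattern for confidence_threshold. ThresholdDemo, effective_threshold, and _UNSET are hypothetical names chosen for this example.

```python
from typing import Any, Optional

_UNSET: Any = object()  # hypothetical sentinel; the library's own name is not documented


class ThresholdDemo:
    """Toy class showing how an omitted argument differs from an explicit None."""

    def __init__(self, confidence_threshold: Optional[float] = None) -> None:
        self.confidence_threshold = confidence_threshold

    def effective_threshold(self, confidence_threshold: Any = _UNSET) -> Optional[float]:
        # Omitted: inherit the pipeline-level value.
        if confidence_threshold is _UNSET:
            return self.confidence_threshold
        # Explicit None (or a number): use exactly what the caller passed.
        return confidence_threshold


demo = ThresholdDemo(confidence_threshold=0.7)
print(demo.effective_threshold())      # 0.7
print(demo.effective_threshold(None))  # None
print(demo.effective_threshold(0.5))   # 0.5
```

With a plain `None` default, the second call would be indistinguishable from the first, so a pipeline-level threshold could never be switched off for a single call.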
- batch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Classify multiple queries synchronously, one after another.
Shares all per-call overrides across every query in the batch. Use abatch_run() to run them concurrently in async contexts.
- async arun(user_query, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None)[source]
Async variant of run(), with identical parameters.
- async abatch_run(queries, *, options=<object object>, selection_mode=None, output_format=None, temperature=None, max_tokens=None, confidence_threshold=<object object>, session_id=None, user_id=None, max_concurrency=None)[source]
Classify multiple queries concurrently with asyncio.gather.
- Returns:
list – Results in the same order as queries.
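The combination of asyncio.gather and a max_concurrency cap can be sketched with a semaphore. This is an illustration under assumptions: gather_bounded and fake_classify are hypothetical helpers, and how the pipeline actually enforces max_concurrency is not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def gather_bounded(
    queries: list[str],
    classify: Callable[[str], Awaitable[str]],
    max_concurrency: Optional[int] = None,
) -> list[str]:
    """Run `classify` over all queries concurrently, keeping input order."""
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def one(query: str) -> str:
        if semaphore is None:
            return await classify(query)
        async with semaphore:  # at most max_concurrency calls in flight
            return await classify(query)

    # asyncio.gather returns results in the order the awaitables were passed,
    # regardless of which one finishes first.
    return await asyncio.gather(*(one(q) for q in queries))


async def fake_classify(query: str) -> str:
    # Stand-in for an LLM call; longer queries finish sooner on purpose.
    await asyncio.sleep(0.01 / max(len(query), 1))
    return query.upper()


results = asyncio.run(gather_bounded(["aa", "b", "cccc"], fake_classify, max_concurrency=2))
print(results)  # ['AA', 'B', 'CCCC']
```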
- class ractogateway.pipelines.AsyncVideoProcessorPipeline(*args, **kwargs)[source]
Bases: object
Async-only variant of VideoProcessorPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where you do not want a sync run() in the public API.
All constructor parameters are identical to VideoProcessorPipeline.
- async run(source, **kwargs)[source]
Async-only process entrypoint.
- Return type:
VideoProcessorResult
- async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Async-only variant of VideoProcessorPipeline.aanswer_question().
- Return type:
VideoProcessorResult
- class ractogateway.pipelines.DeduplicationMethod(*values)[source]
Frame similarity algorithm used for deduplication.
- PHASH = 'phash'
- SSIM = 'ssim'
- class ractogateway.pipelines.FrameAnalysisMode(*values)[source]
How frames are sent to the vision LLM.
- INDIVIDUAL = 'individual'
- GRID = 'grid'
- class ractogateway.pipelines.FrameEntry(**data)[source]
Bases: BaseModel
One video frame, after extraction and optional analysis.
- frame_id: int
Zero-based sequential frame identifier.
- timestamp: float
Position in the video in seconds.
- similarity_to_prev: float | None
Similarity percentage to the previous kept frame (None for first frame).
- kept: bool
False if discarded by the deduplication step.
- image_format: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.TranscriberBackend(*values)[source]
Audio transcription backend.
- FASTER_WHISPER = 'faster-whisper'
- OPENAI_WHISPER = 'openai-whisper'
- HUGGINGFACE_LOCAL = 'huggingface-local'
- OPENAI_API = 'openai-api'
- GOOGLE_API = 'google-api'
- HUGGINGFACE_API = 'huggingface-api'
- GROQ_API = 'groq-api'
- DEEPGRAM_API = 'deepgram-api'
- OLLAMA = 'ollama'
- class ractogateway.pipelines.TranscriptSegment(**data)[source]
Bases: BaseModel
A time-bounded transcription segment aligned to frame IDs.
- start: float
Segment start time in seconds.
- end: float
Segment end time in seconds.
- text: str
Transcribed text for this segment.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.VideoConfig(**data)[source]
Bases: BaseModel
All tunable hyperparameters for VideoProcessorPipeline.
- fps: float
Frames to sample per second of video.
- similarity_threshold: float
Discard a frame whose similarity to the previous kept frame is >= this %. Higher = keep more frames. Range 0-100.
- dedup_method: DeduplicationMethod
Algorithm used to compare frame similarity.
- frame_format: str
Image format for kept frames: 'JPEG' (smaller) or 'PNG' (lossless).
- analyze_frames: bool
Pass kept frames to the vision LLM for content extraction.
- frame_analysis_mode: FrameAnalysisMode
Individual = one LLM call per frame; Grid = stitch frames into a collage.
- grid_size: int
Number of frames per grid collage (used when frame_analysis_mode=’grid’).
- batch_size: int
How many frames to submit to the LLM concurrently per batch.
- max_workers: int
Thread-pool size for concurrent LLM frame analysis calls.
- max_process_workers: int
Process-pool size for CPU-bound frame extraction / hashing.
- transcribe_audio: bool
Extract and transcribe the video’s audio track.
- transcriber_backend: TranscriberBackend
Which transcription engine to use.
- transcriber_model: str
Model name / size — interpretation is backend-specific.
Examples:
  faster-whisper / openai-whisper: "tiny", "base", "small", "medium", "large-v3"
  huggingface-local / huggingface-api: HF model ID, e.g. "openai/whisper-large-v3"
  openai-api: "whisper-1"
  google-api: "long", "short", "latest_long"
  groq-api: "whisper-large-v3", "whisper-large-v3-turbo"
  deepgram-api: "nova-3", "nova-2", "enhanced", "base"
  ollama: model name on server, e.g. "whisper"
- generate_summary: bool
Generate a comprehensive textual summary at the end.
- store_in_rag: bool
Push all extracted content into the supplied rag_pipeline for Q&A.
- processing_mode: VideoProcessingMode
active processes full video; passive processes only a time window.
- focus_time_seconds: float | None
Center timestamp in seconds for passive mode (e.g. 130 for 02:10).
- window_seconds: float
Passive-mode half-window size in seconds (focus ± window_seconds).
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
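The similarity_threshold rule (discard a frame when it is at least that similar to the previous kept frame) can be sketched with toy 64-bit hashes. This is an assumption-laden illustration: similarity is computed here as the share of matching hash bits, which may not match how the library's PHASH or SSIM modes score frames, and both function names are hypothetical.

```python
def similarity_percent(hash_a: int, hash_b: int, bits: int = 64) -> float:
    """Similarity of two fixed-width hashes as a percentage of matching bits."""
    differing = bin(hash_a ^ hash_b).count("1")  # Hamming distance
    return 100.0 * (bits - differing) / bits


def dedup(hashes: list[int], similarity_threshold: float = 90.0) -> list[bool]:
    """Return a kept/discarded flag per frame, comparing to the last kept frame."""
    kept: list[bool] = []
    last_kept = None
    for h in hashes:
        if last_kept is None or similarity_percent(h, last_kept) < similarity_threshold:
            kept.append(True)
            last_kept = h  # later frames are compared against this one
        else:
            kept.append(False)
    return kept


frames = [0x0, 0x1, 0xFF, 0xFF, 0xFFFF]
print(dedup(frames))  # [True, False, True, False, True]
```

Note that each frame is compared with the last kept frame rather than its immediate predecessor, so a slow drift across many near-identical frames still produces a new kept frame eventually.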
- class ractogateway.pipelines.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]
Bases: object
Synchronous + asynchronous video processing pipeline.
- Parameters:
kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit are provided.
analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.
summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.
transcriber (TranscriberBackend) – Which audio transcription backend to use.
transcriber_model (str) – Model name / size for the chosen backend.
transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).
transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).
fps (float) – Video frames to sample per second.
similarity_threshold (float) – Frames with similarity >= this % to the previous kept frame are discarded. E.g. 90.0 keeps frames that differ by more than 10 %.
dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).
max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).
frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).
frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).
grid_size (int) – Frames per grid collage (only used in GRID mode).
batch_size (int) – Concurrent LLM calls per batch during frame analysis.
max_workers (int) – Thread-pool size for concurrent LLM calls.
max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.
language (str | None) – BCP-47 language code for transcription (None = auto-detect).
transcribe_audio (bool) – Whether to extract and transcribe the audio track.
analyze_frames (bool) – Whether to pass frames to the vision LLM.
generate_summary (bool) – Whether to generate a comprehensive summary at the end.
rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag is True (or per-call), all extracted content is indexed for retrieval.
safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.
tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.
metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
user_id (str) – Default user identifier passed to the rate limiter.
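Because rate_limiter is duck-typed, any object with the two documented methods should be accepted. The class below is a minimal in-memory sketch of that interface. InMemoryRateLimiter and its fixed per-user budget are assumptions for illustration: it has no time window or reset and is not thread-safe.

```python
from collections import defaultdict


class InMemoryRateLimiter:
    """Toy per-user token budget exposing the two duck-typed methods."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used: dict[str, int] = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Deny the request if it would exceed the user's budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


limiter = InMemoryRateLimiter(budget_per_user=100)
print(limiter.check_and_consume("alice", 60))  # True
print(limiter.check_and_consume("alice", 50))  # False
print(limiter.get_remaining("alice"))          # 40
```

An instance like this would be passed as the rate_limiter constructor argument; a production limiter would typically add a refill window and locking.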
- run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]
Process source and return a VideoProcessorResult.
All keyword arguments override the constructor defaults for this call only. With safe_mode=True, fatal stage errors are captured into result.failed_stage / result.stage_errors and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors so the pipeline continues with whatever data is available.
- Return type:
VideoProcessorResult
- async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]
Async variant of run().
- Return type:
VideoProcessorResult
- static parse_timestamp(value)[source]
Parse timestamp values like 130, "02:10", "2 mins 10 sec".
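To show what handling the three documented input shapes might involve, here is a small regex-based sketch. It is not the library's implementation: the accepted grammar, the float return type, and the error behaviour are all assumptions, and this standalone parse_timestamp is only a local illustration.

```python
import re
from typing import Union


def parse_timestamp(value: Union[int, float, str]) -> float:
    """Illustrative parser for 130, "02:10", and "2 mins 10 sec" style inputs."""
    if isinstance(value, (int, float)):
        return float(value)
    text = value.strip().lower()
    # Clock style: SS, MM:SS or HH:MM:SS (optionally with fractional seconds).
    if re.fullmatch(r"\d+(?::\d+){0,2}(?:\.\d+)?", text):
        seconds = 0.0
        for part in text.split(":"):
            seconds = seconds * 60 + float(part)
        return seconds
    # Worded style: "<number> <unit>" pairs such as "2 mins 10 sec".
    units = {"h": 3600.0, "m": 60.0, "s": 1.0}
    pairs = re.findall(r"(\d+(?:\.\d+)?)\s*([hms])[a-z]*", text)
    if not pairs:
        raise ValueError(f"unrecognised timestamp: {value!r}")
    return sum(float(number) * units[unit] for number, unit in pairs)


print(parse_timestamp(130))              # 130.0
print(parse_timestamp("02:10"))          # 130.0
print(parse_timestamp("2 mins 10 sec"))  # 130.0
```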
- answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Process video then answer a user question from extracted timeline context.
- Return type:
VideoProcessorResult
- async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Async variant of answer_question().
- Return type:
VideoProcessorResult
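In passive mode the processed range is described as focus ± window_seconds. The sketch below computes that range and filters timestamps against it. Clamping to the video bounds and treating both ends as inclusive are assumptions, and passive_window / in_window are hypothetical helper names.

```python
from typing import Optional


def passive_window(
    focus_time_seconds: float,
    window_seconds: float = 5.0,
    duration: Optional[float] = None,
) -> tuple[float, float]:
    """Return (start, end) of the focus ± window range, clamped to the video."""
    start = max(0.0, focus_time_seconds - window_seconds)
    end = focus_time_seconds + window_seconds
    if duration is not None:
        end = min(end, duration)
    return start, end


def in_window(timestamps: list[float], window: tuple[float, float]) -> list[float]:
    """Keep only the timestamps that fall inside the (inclusive) window."""
    start, end = window
    return [t for t in timestamps if start <= t <= end]


window = passive_window(130.0, window_seconds=5.0)
print(window)                                                  # (125.0, 135.0)
print(in_window([120.0, 126.0, 130.0, 135.0, 140.0], window))  # [126.0, 130.0, 135.0]
```

With the default window_seconds of 5.0, a focus time of 130 seconds (02:10) covers a 10-second span of the video.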
- class ractogateway.pipelines.VideoProcessorResult(**data)[source]
Bases: BaseModel
Full output of a VideoProcessorPipeline run.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- video_path: str
Original source identifier (path, URL, or ‘<bytes>’ for buffer input).
- frames: list[FrameEntry]
All extracted frames (kept and discarded).
- transcript: list[TranscriptSegment]
Audio transcript segmented by timestamp.
- sections: list[VideoSection]
Merged visual + audio sections ordered by time.
- rag_stored: bool
- rag_chunk_count: int
- usage: VideoProcessorUsage
- stage_errors: list[StageError]
All per-stage errors collected during the run (fatal + non-fatal).
- processing_mode: VideoProcessingMode
Whether this run processed full video (active) or a window (passive).
- property has_errors: bool
True if any stage encountered an error.
- property is_failed: bool
True if the pipeline aborted early due to a fatal stage error.
- get_all_visual_content()[source]
All frame analyses concatenated in timestamp order.
- Return type:
- to_json(path=None, *, indent=2)[source]
Serialise result to JSON. Returns JSON string if path is None.
- class ractogateway.pipelines.VideoProcessorUsage(**data)[source]
Bases: BaseModel
Accounting of tokens and frame counts across the full pipeline.
- frames_extracted: int
- frames_kept: int
- frames_discarded: int
- analysis_input_tokens: int
- analysis_output_tokens: int
- summary_input_tokens: int
- summary_output_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- audio_duration_seconds: float
- property total_analysis_tokens: int
- property total_summary_tokens: int
- property total_tokens: int
- exception ractogateway.pipelines.VideoRateLimitExceededError[source]
Bases: RuntimeError
Raised when a rate_limiter denies a VideoProcessorPipeline request.
- class ractogateway.pipelines.VideoSection(**data)[source]
Bases: BaseModel
A merged time section combining visual analysis + audio transcript.
- timestamp_start: float
- timestamp_end: float
- visual_content: str
Combined LLM analyses for all frames in this section.
- audio_content: str
Concatenated transcript text for this section’s time range.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.