ractogateway.pipelines.sql_analyst.pipeline
SQLAnalystPipeline — natural-language → SQL → pandas → answer + chart.
Two classes are exported:
SQLAnalystPipeline: run() is sync, arun() is async.
AsyncSQLAnalystPipeline: run() is async (no sync variant). Designed for FastAPI and other async frameworks where you just await pipeline.run(...).
Per-step model control
You can assign a different LLM kit (provider + model) to each pipeline step:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),                    # default fallback
    sql_kit=Chat(model="gpt-4o"),                # SQL generation
    pandas_kit=Chat(model="gpt-3.5-turbo"),      # cheaper for pandas
    answer_kit=Chat(model="gpt-4o"),             # rich markdown answer
)
When a per-step kit is None the default kit is used for that step.
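The fallback rule can be pictured with a small sketch. `resolve_step_kits` is a hypothetical helper written only for illustration and is not part of ractogateway; plain strings stand in for real kit objects.

```python
from typing import Any, Dict, Optional


def resolve_step_kits(
    kit: Any,
    sql_kit: Optional[Any] = None,
    pandas_kit: Optional[Any] = None,
    answer_kit: Optional[Any] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: map each step to its own kit, or to the default."""
    return {
        "sql": kit if sql_kit is None else sql_kit,
        "pandas": kit if pandas_kit is None else pandas_kit,
        "answer": kit if answer_kit is None else answer_kit,
    }


# Strings stand in for kit objects such as Chat(model="gpt-4o").
kits = resolve_step_kits("default-kit", pandas_kit="cheap-kit")
print(kits)
```

Only the pandas step gets its own kit here; the SQL and answer steps share the default.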
Production features
SQL retry loop: on a DB execution error the LLM is re-prompted with the error message to auto-fix the SQL (up to max_sql_retries times).
Pre-built engine: pass a pre-configured SQLAlchemy Engine (with connection pooling) directly via the engine parameter.
Row limit cap: max_rows auto-injects LIMIT to prevent unbounded result sets from overwhelming memory.
Schema cache: schema introspection results are cached in-process for schema_cache_ttl seconds to avoid repeated DB round-trips.
Table RBAC: allowed_tables hides all other tables from the LLM.
Column blocking: blocked_columns removes sensitive columns from the schema shown to the LLM.
Data masking: mask_columns replaces values with "***MASKED***" in result rows before passing them to the LLM or returning them to the caller.
Custom docs: table_docs / column_docs annotate the schema with business descriptions so the LLM generates more accurate SQL.
Conversation memory: memory (any object with get_history / append methods, e.g. RedisChatMemory) plus session_id appends prior Q&A context to each SQL prompt.
Rate limiting: rate_limiter (any object with check_and_consume / get_remaining methods, e.g. RedisRateLimiter) plus user_id gates requests.
Safe mode: safe_mode=True catches all exceptions and returns a SQLAnalystResult with the error field set instead of raising.
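The SQL retry loop could be sketched roughly as follows. This is an illustration of the general idea only, not ractogateway's implementation: `run_with_sql_retries` and `fake_llm` are hypothetical names, sqlite3 stands in for the real database, and the way the error is fed back to the generator is an assumption.

```python
import sqlite3
from typing import Callable, List, Optional, Tuple


def run_with_sql_retries(
    conn: sqlite3.Connection,
    question: str,
    generate_sql: Callable[[str, Optional[str]], str],
    max_sql_retries: int = 2,
) -> Tuple[str, List[tuple]]:
    """Illustrative retry loop: re-prompt with the DB error after a failure."""
    last_error: Optional[str] = None
    # One initial attempt plus up to max_sql_retries corrective attempts.
    for _ in range(max_sql_retries + 1):
        sql = generate_sql(question, last_error)
        try:
            return sql, conn.execute(sql).fetchall()
        except sqlite3.Error as exc:
            last_error = str(exc)  # handed to the generator on the next round
    raise RuntimeError(f"SQL still failing after retries: {last_error}")


def fake_llm(question: str, error: Optional[str]) -> str:
    """Stand-in for an LLM call: wrong column first, fixed once it sees an error."""
    if error is None:
        return "SELECT nam FROM products"
    return "SELECT name FROM products"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT)")
conn.execute("INSERT INTO products VALUES ('widget')")
sql, rows = run_with_sql_retries(conn, "List product names", fake_llm)
print(sql, rows)
```

The first attempt fails on the misspelled column, and the second attempt succeeds because the generator receives the database error text.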
- class ractogateway.pipelines.sql_analyst.pipeline.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: object
Natural-language to SQL + pandas + Markdown answer + chart pipeline.
Converts a plain-English question into:
A read-only SQL query (LLM step — sql_kit)
Pandas analysis code executed against the SQL result (LLM step — pandas_kit)
A rich Markdown answer with table + insights (LLM step — answer_kit)
An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls: pure dtype heuristics or a user-provided spec)
Two variants
SQLAnalystPipeline: run() is sync, arun() is async.
AsyncSQLAnalystPipeline: run() is async (same as arun()).
- Parameters:
kit – Default LLM kit used for any step that doesn’t have its own kit.
sql_kit – Override kit for SQL generation. Falls back to kit.
pandas_kit – Override kit for pandas code generation. Falls back to kit.
answer_kit – Override kit for Markdown answer generation. Falls back to kit.
sql_prompt / pandas_prompt / answer_prompt – Override the default system prompt for each step.
sql_temperature / sql_max_tokens – LLM settings for the SQL step (default: 0.0 / 1024).
pandas_temperature / pandas_max_tokens – LLM settings for the pandas step (default: 0.0 / 2048).
answer_temperature / answer_max_tokens – LLM settings for the answer step (default: 0.3 / 2048).
run_pandas – Run the pandas analysis step by default (default: True).
run_answer – Run the Markdown answer step by default (default: True).
chart – Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".
force_read_only – Block any non-SELECT SQL (default: True).
tracer – Optional RactoTracer instance.
metrics – Optional GatewayMetricsMiddleware instance.
engine – Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, the connection_string / host / port / etc. params in run() are ignored.
max_sql_retries – Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.
max_rows – Safety cap on returned rows: auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.
schema_cache_ttl – Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).
allowed_tables – Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.
blocked_columns – Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.
mask_columns – Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.
table_docs – {table_name: description}, appended as inline schema comments so the LLM understands each table's business meaning.
column_docs – {table_name: {column_name: description}}, per-column inline comments.
safe_mode – When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.
memory – Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
rate_limiter – Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
user_id – Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
Example:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")
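Because memory and rate_limiter are duck-typed, any object exposing the documented methods should be usable. The two in-process classes below are a minimal sketch under that assumption. `InMemoryChatMemory` and `FixedBudgetRateLimiter` are hypothetical names, and the `{"role": ..., "content": ...}` message shape and the fixed, non-refilling budget are illustrative guesses, not ractogateway behaviour.

```python
from collections import defaultdict
from typing import Dict, List


class InMemoryChatMemory:
    """Hypothetical memory object exposing get_history / append."""

    def __init__(self) -> None:
        self._store: Dict[str, List[dict]] = defaultdict(list)

    def get_history(self, session_id: str) -> List[dict]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        # Assumed message shape; the real format is not documented here.
        self._store[session_id].append({"role": role, "content": content})


class FixedBudgetRateLimiter:
    """Hypothetical limiter exposing check_and_consume / get_remaining."""

    def __init__(self, budget: int) -> None:
        self._budget = budget
        self._used: Dict[str, int] = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Refuse the request if it would exceed the user's budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


memory = InMemoryChatMemory()
memory.append("s1", "user", "Top 5 products?")
limiter = FixedBudgetRateLimiter(budget=10)
allowed = limiter.check_and_consume("alice", 8)
denied = limiter.check_and_consume("alice", 5)
print(memory.get_history("s1"), allowed, denied, limiter.get_remaining("alice"))
```

Objects of this shape would presumably be passed as memory=... and rate_limiter=... to the constructor, together with session_id and user_id at call time. A production setup would more likely use the Redis-backed classes named above.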
- run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Run the full pipeline synchronously.
- Parameters:
user_query (str) – Plain-English question to answer from the database.
connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.
host / port / database / username / password / driver – Individual connection params used when both connection_string and engine are omitted.
engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.
schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).
run_pandas / run_answer / chart / force_read_only – Override the corresponding pipeline-level defaults for this call.
sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens – Per-call LLM setting overrides.
max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.
user_id (str | None) – Per-call user ID for rate limiting and audit.
session_id (str | None) – Conversation session ID used to fetch and save memory context.
- Return type:
SQLAnalystResult
- Returns:
SQLAnalystResult
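To make the max_rows cap and mask_columns behaviour concrete, here is a rough sketch of what LIMIT injection and value masking might look like. `inject_limit` and `mask_rows` are hypothetical helpers, not ractogateway functions. The regex check is deliberately naive (it would also match a LIMIT inside a subquery or a string literal), and exact-match column names are an assumption.

```python
import re
from typing import Iterable, List


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless disabled (0) or a LIMIT is already present."""
    if max_rows <= 0:
        return sql  # 0 disables the cap
    if re.search(r"\blimit\s+\d+", sql, flags=re.IGNORECASE):
        return sql  # naive check: the query already carries a LIMIT
    stripped = sql.strip().rstrip(";").rstrip()
    return f"{stripped} LIMIT {max_rows}"


def mask_rows(rows: List[dict], mask_columns: Iterable[str]) -> List[dict]:
    """Replace values of the listed columns with the masking placeholder."""
    masked = set(mask_columns)  # exact-match names (an assumption)
    return [
        {k: ("***MASKED***" if k in masked else v) for k, v in row.items()}
        for row in rows
    ]


capped = inject_limit("SELECT * FROM orders;", 5000)
untouched = inject_limit("SELECT * FROM orders LIMIT 10", 5000)
safe_rows = mask_rows([{"id": 1, "email": "a@b.c"}], ["email"])
print(capped)
print(untouched)
print(safe_rows)
```

A per-call max_rows passed to run() would simply replace the pipeline-level value used in a step like `inject_limit`.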
- async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Async variant of run(), with identical parameters.
Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.
- Return type:
SQLAnalystResult
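The split between awaited LLM calls and thread-offloaded database work can be illustrated with the standard library alone. This is a sketch of the general pattern, not the library's code: `blocking_query`, `fake_achat`, and `arun_sketch` are hypothetical names, and sqlite3 stands in for a blocking SQLAlchemy call.

```python
import asyncio
import sqlite3
from typing import List


def blocking_query(db_path: str, sql: str) -> List[tuple]:
    """Synchronous DB call, standing in for a blocking SQLAlchemy execution."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


async def fake_achat(prompt: str) -> str:
    """Stand-in for a kit's async achat(); returns a canned SQL string."""
    await asyncio.sleep(0)
    return "SELECT 1 + 1"


async def arun_sketch(question: str) -> List[tuple]:
    sql = await fake_achat(question)  # async LLM step
    loop = asyncio.get_running_loop()
    # Blocking DB work goes to the default thread pool so the event loop stays free.
    return await loop.run_in_executor(None, blocking_query, ":memory:", sql)


rows = asyncio.run(arun_sketch("What is one plus one?"))
print(rows)
```

Offloading to an executor keeps other coroutines responsive while the query runs, which is the usual motivation for this design in async web handlers.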
- class ractogateway.pipelines.sql_analyst.pipeline.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: SQLAnalystPipeline
Async-first variant of SQLAnalystPipeline.
run() is a coroutine: use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.
All constructor parameters and run() parameters are identical to SQLAnalystPipeline.
Example:
from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
- async run(user_query, **kwargs)[source]
Async run(): delegates to SQLAnalystPipeline.arun().
- Return type:
SQLAnalystResult
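The delegation described here follows a common subclassing pattern, sketched below with toy classes. `SyncFirstPipeline` and `AsyncFirstPipeline` are hypothetical stand-ins, not the real pipeline classes, and their method bodies are placeholders.

```python
import asyncio


class SyncFirstPipeline:
    """Toy stand-in for a pipeline with a sync run() and an async arun()."""

    def run(self, user_query: str, **kwargs) -> str:
        return f"sync answer to: {user_query}"

    async def arun(self, user_query: str, **kwargs) -> str:
        await asyncio.sleep(0)  # placeholder for awaited LLM / DB work
        return f"async answer to: {user_query}"


class AsyncFirstPipeline(SyncFirstPipeline):
    """Toy async-first variant: run() is a coroutine that forwards to arun()."""

    async def run(self, user_query: str, **kwargs) -> str:
        # Override the sync run() and delegate to the inherited arun().
        return await self.arun(user_query, **kwargs)


async def main() -> str:
    return await AsyncFirstPipeline().run("Top 5 products?")


result = asyncio.run(main())
print(result)
```

With this layout, async callers never have to remember a separate arun() name, while the parent class keeps its synchronous entry point.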