ractogateway.pipelines.sql_analyst
SQL Analyst Pipeline — natural-language to SQL + pandas + answer + chart.
- class ractogateway.pipelines.sql_analyst.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: SQLAnalystPipeline
Async-first variant of SQLAnalystPipeline.
run() is a coroutine: use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.
All constructor parameters and run() parameters are identical to SQLAnalystPipeline.
Example:
    from ractogateway.pipelines import AsyncSQLAnalystPipeline
    from ractogateway.openai_developer_kit import Chat

    pipeline = AsyncSQLAnalystPipeline(
        kit=Chat(model="gpt-4o"),
        pandas_kit=Chat(model="gpt-3.5-turbo"),
        max_rows=5_000,
        safe_mode=True,
    )

    # In an async context:
    result = await pipeline.run(
        user_query="Top 5 products by quantity sold?",
        connection_string="postgresql://user:pass@localhost/shop",
    )
    if result.error:
        print("Error:", result.error)
    else:
        print(result.answer)
        if result.plotly_figure is not None:  # None when no chart was built or plotly is missing
            result.plotly_figure.show()
- async run(user_query, **kwargs)[source]
Async run(): delegates to SQLAnalystPipeline.arun().
- Return type:
SQLAnalystResult
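The delegation described above can be pictured with a toy pair of classes. This is a sketch, not the library's source: the names Pipeline and AsyncPipeline, and the asyncio.run()-based sync run(), are assumptions made purely for illustration.

```python
import asyncio


class Pipeline:
    """Toy stand-in exposing both a sync and an async entry point."""

    async def arun(self, user_query: str, **kwargs) -> str:
        # Stand-in for the real async work (LLM calls, SQL execution).
        await asyncio.sleep(0)
        return f"answered: {user_query}"

    def run(self, user_query: str, **kwargs) -> str:
        # Sync entry point: drive the coroutine to completion (assumed design).
        return asyncio.run(self.arun(user_query, **kwargs))


class AsyncPipeline(Pipeline):
    """Async-first variant: run() is itself a coroutine."""

    async def run(self, user_query: str, **kwargs) -> str:
        # Delegate straight to the inherited arun().
        return await self.arun(user_query, **kwargs)


async def main() -> str:
    return await AsyncPipeline().run("Top 5 products?")


answer = asyncio.run(main())
```

With this shape, async callers never need to remember a separate arun() name, while the sync class keeps both entry points.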
- class ractogateway.pipelines.sql_analyst.ChartSpec(**data)[source]
Bases: BaseModel
Specification for a Plotly chart.
Pass to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame's column dtypes with no extra LLM call.
Supported chart types
bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel
Example:
    from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

    result = pipeline.run(
        user_query="Top 5 customers by revenue?",
        ...,
        chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue", title="Top 5 Customers"),
    )
    result.plotly_figure.show()
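The exact rules behind chart="auto" are not documented here, so the following is only a guess at what a dtype heuristic could look like. The function infer_chart and its three rules are hypothetical, and it inspects plain row dicts rather than a DataFrame to stay dependency-free.

```python
from numbers import Number
from typing import Optional


def infer_chart(rows: list) -> Optional[dict]:
    """Pick a chart spec dict from column value types (hypothetical rules)."""
    if not rows:
        return None
    columns = list(rows[0])
    # A column counts as numeric when every value is a non-bool number.
    numeric = [
        c for c in columns
        if all(isinstance(r[c], Number) and not isinstance(r[c], bool) for r in rows)
    ]
    categorical = [c for c in columns if c not in numeric]
    if categorical and numeric:
        # One label column plus one measure: bar chart.
        return {"chart_type": "bar", "x": categorical[0], "y": numeric[0]}
    if len(numeric) >= 2:
        # Two measures: scatter plot.
        return {"chart_type": "scatter", "x": numeric[0], "y": numeric[1]}
    if len(numeric) == 1:
        # A single measure: histogram of its distribution.
        return {"chart_type": "histogram", "x": numeric[0]}
    return None


spec = infer_chart([
    {"customer": "a", "revenue": 10.0},
    {"customer": "b", "revenue": 5.5},
])
```

The returned dict has the same shape as the plain-dict form of chart= shown above, so no LLM call is needed to produce it.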
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- chart_type: str
- title: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.pipelines.sql_analyst.PipelineUsage(**data)[source]
Bases: BaseModel
Aggregated token usage across all LLM calls in the pipeline.
Tracks each step (SQL generation, pandas code generation, markdown answer generation) separately so you can see exactly where tokens are consumed.
Properties
- total_input_tokens:
Sum of all prompt tokens across every LLM step.
- total_output_tokens:
Sum of all completion tokens across every LLM step.
- total_tokens:
Grand total of every token consumed by the pipeline.
- sql_input_tokens: int
- sql_output_tokens: int
- pandas_input_tokens: int
- pandas_output_tokens: int
- answer_input_tokens: int
- answer_output_tokens: int
- property total_input_tokens: int
- property total_output_tokens: int
- property total_tokens: int
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
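The three total_* properties are plain sums over the six per-step counters. A minimal sketch of that arithmetic, using a dataclass named UsageSketch as a stand-in for the real pydantic model (the zero defaults are an assumption):

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Stand-in mirroring the documented PipelineUsage fields."""

    sql_input_tokens: int = 0
    sql_output_tokens: int = 0
    pandas_input_tokens: int = 0
    pandas_output_tokens: int = 0
    answer_input_tokens: int = 0
    answer_output_tokens: int = 0

    @property
    def total_input_tokens(self) -> int:
        # Prompt tokens across the SQL, pandas and answer steps.
        return self.sql_input_tokens + self.pandas_input_tokens + self.answer_input_tokens

    @property
    def total_output_tokens(self) -> int:
        # Completion tokens across the same three steps.
        return self.sql_output_tokens + self.pandas_output_tokens + self.answer_output_tokens

    @property
    def total_tokens(self) -> int:
        return self.total_input_tokens + self.total_output_tokens


usage = UsageSketch(
    sql_input_tokens=100, sql_output_tokens=20,
    pandas_input_tokens=200, pandas_output_tokens=50,
    answer_input_tokens=300, answer_output_tokens=80,
)
```

Here usage.total_input_tokens is 600, usage.total_output_tokens is 150, and usage.total_tokens is 750.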
- exception ractogateway.pipelines.sql_analyst.RateLimitExceededError[source]
Bases: RuntimeError
Raised when the rate limiter denies a request for a given user.
- class ractogateway.pipelines.sql_analyst.ReadOnlySQLGuard[source]
Bases: object
Validates that a SQL string contains only read (SELECT) operations.
Uses a keyword regex to detect any DML/DDL statement that would modify data or schema. Call check() before executing any LLM-generated SQL when force_read_only=True.
Example:
    ReadOnlySQLGuard.check("SELECT * FROM users")  # passes silently
    ReadOnlySQLGuard.check("DROP TABLE users")     # raises ReadOnlyViolationError
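A keyword-regex guard of the kind described above might look roughly like this. The keyword list, the function name check_read_only and the exception name ReadOnlyViolation are assumptions; the library's actual pattern may differ.

```python
import re


class ReadOnlyViolation(ValueError):
    """Hypothetical stand-in for ReadOnlyViolationError."""


# Whole-word match on common DML/DDL verbs (assumed list, not the library's).
_WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE|MERGE)\b",
    re.IGNORECASE,
)


def check_read_only(sql: str) -> None:
    """Raise if the SQL text contains a write keyword; otherwise return None."""
    match = _WRITE_KEYWORDS.search(sql)
    if match:
        raise ReadOnlyViolation(f"write keyword found: {match.group(1).upper()}")


check_read_only("SELECT * FROM users")  # passes silently

try:
    check_read_only("DROP TABLE users")
    blocked = False
except ReadOnlyViolation:
    blocked = True
```

The word boundaries keep column names such as updated_at from tripping the guard, but a keyword check like this still flags write verbs that appear inside string literals, so it errs on the side of rejecting.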
- exception ractogateway.pipelines.sql_analyst.ReadOnlyViolationError[source]
Bases: ValueError
Raised when generated SQL contains a write operation in force_read_only mode.
- class ractogateway.pipelines.sql_analyst.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]
Bases: object
Natural-language to SQL + pandas + Markdown answer + chart pipeline.
Converts a plain-English question into:
1. A read-only SQL query (LLM step: sql_kit)
2. Pandas analysis code executed against the SQL result (LLM step: pandas_kit)
3. A rich Markdown answer with table + insights (LLM step: answer_kit)
4. An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls: pure dtype heuristics or a user-provided spec)
Two variants
- SQLAnalystPipeline: run() is sync, arun() is async.
- AsyncSQLAnalystPipeline: run() is async (same as arun()).
- Parameters:
- kit:
Default LLM kit used for any step that doesn't have its own kit.
- sql_kit:
Override kit for SQL generation. Falls back to kit.
- pandas_kit:
Override kit for pandas code generation. Falls back to kit.
- answer_kit:
Override kit for Markdown answer generation. Falls back to kit.
- sql_prompt / pandas_prompt / answer_prompt:
Override the default system prompt for the corresponding step.
- sql_temperature / sql_max_tokens:
LLM settings for the SQL step (default: 0.0 / 1024).
- pandas_temperature / pandas_max_tokens:
LLM settings for the pandas step (default: 0.0 / 2048).
- answer_temperature / answer_max_tokens:
LLM settings for the answer step (default: 0.3 / 2048).
- run_pandas:
Run the pandas analysis step by default (default: True).
- run_answer:
Run the Markdown answer step by default (default: True).
- chart:
Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".
- force_read_only:
Block any non-SELECT SQL (default: True).
- tracer:
Optional RactoTracer instance.
- metrics:
Optional GatewayMetricsMiddleware instance.
- engine:
Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, connection_string / host / port / etc. params in run() are ignored.
- max_sql_retries:
Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.
- max_rows:
Safety cap on returned rows: auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.
- schema_cache_ttl:
Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).
- allowed_tables:
Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.
- blocked_columns:
Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.
- mask_columns:
Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.
- table_docs:
{table_name: description}, appended as inline schema comments so the LLM understands table business meaning.
- column_docs:
{table_name: {column_name: description}}, giving per-column inline comments.
- safe_mode:
When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.
- memory:
Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
- rate_limiter:
Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
- user_id:
Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
Example:
    from ractogateway.openai_developer_kit import Chat
    from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

    pipeline = SQLAnalystPipeline(
        kit=Chat(model="gpt-4o"),
        pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
        max_rows=5_000,
        allowed_tables=["orders", "customers", "products"],
        mask_columns=["email", "phone"],
        safe_mode=True,
    )
    result = pipeline.run(
        user_query="Top 5 products by quantity sold?",
        connection_string="postgresql://user:pass@localhost/shop",
    )
    if result.error:
        print("Pipeline error:", result.error)
    else:
        print(result.answer)
        if result.plotly_figure is not None:  # None when no chart was built or plotly is missing
            result.plotly_figure.show()
        print(result.usage.total_tokens)
        result.to_csv("output.csv")
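The max_rows behaviour (append a LIMIT only when none is present, and skip it entirely when the cap is 0) can be sketched as below. The helper inject_limit and its regex are assumptions for illustration; the library may parse the SQL differently.

```python
import re

# Matches an existing "LIMIT <number>" clause anywhere in the statement.
_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless a LIMIT exists or the cap is disabled."""
    if max_rows <= 0:
        # A cap of 0 disables injection, so the SQL is returned unchanged.
        return sql
    # Drop trailing whitespace and a trailing semicolon before appending.
    stripped = sql.rstrip().rstrip(";").rstrip()
    if _LIMIT_RE.search(stripped):
        return stripped
    return f"{stripped} LIMIT {max_rows}"


capped = inject_limit("SELECT * FROM orders;", 5000)
```

One limitation of this simple version: a LIMIT inside a subquery also counts as "already present", so the outer query would go uncapped.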
- run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Run the full pipeline synchronously.
- Parameters:
  - user_query (str):
    Plain-English question to answer from the database.
  - connection_string (str | None):
    Full SQLAlchemy URI. Ignored when engine is provided.
  - host / port / database / username / password / driver:
    Individual connection params used when both connection_string and engine are omitted.
  - engine (Any):
    Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.
  - schema (str | None):
    Pre-computed schema string. When None, the schema is fetched automatically (with optional cache).
  - run_pandas / run_answer / chart / force_read_only:
    Override the corresponding pipeline-level defaults for this call.
  - sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens:
    Per-call LLM setting overrides.
  - max_rows (int | None):
    Per-call row limit override. Overrides the pipeline-level max_rows.
  - user_id (str | None):
    Per-call user ID for rate limiting and audit.
  - session_id (str | None):
    Conversation session ID used to fetch and save memory context.
- Return type:
SQLAnalystResult
- Returns:
SQLAnalystResult
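The user_id and session_id arguments only matter when the pipeline was built with a rate_limiter or memory object. Below are in-memory stand-ins that satisfy the method signatures listed for those constructor parameters. The class names, the per-user fixed budget and the {"role", "content"} message shape are assumptions, and whether the pipeline requires anything beyond these methods is not verified here.

```python
from collections import defaultdict


class InMemoryChatMemory:
    """Keeps per-session message history in a dict (illustrative only)."""

    def __init__(self) -> None:
        self._store = defaultdict(list)

    def get_history(self, session_id: str) -> list:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        self._store[session_id].append({"role": role, "content": content})


class FixedBudgetRateLimiter:
    """Grants each user a fixed token budget that never refills (illustrative only)."""

    def __init__(self, budget: int) -> None:
        self._budget = budget
        self._used = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Deny the request if it would push the user over budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


memory = InMemoryChatMemory()
memory.append("session-1", "user", "Top 5 products by quantity sold?")
limiter = FixedBudgetRateLimiter(budget=100)
first = limiter.check_and_consume("alice", 60)
second = limiter.check_and_consume("alice", 60)
```

Objects like these could be handy in unit tests where Redis is unavailable; the first request fits the budget and the second is denied.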
- async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]
Async variant of run(), with identical parameters.
Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit's async achat() method.
- Return type:
SQLAnalystResult
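Running a blocking call in a thread executor, as arun() is described as doing for SQLAlchemy, is a standard asyncio pattern. A minimal sketch follows, with blocking_query as a hypothetical stand-in for the database call:

```python
import asyncio
import time


def blocking_query(sql: str) -> list:
    """Stand-in for a blocking database call."""
    time.sleep(0.05)
    return [{"sql": sql}]


async def run_query_async(sql: str) -> list:
    loop = asyncio.get_running_loop()
    # Passing None selects the event loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_query, sql)


async def main() -> list:
    # The two queries overlap because each one blocks its own worker thread,
    # leaving the event loop free for other coroutines.
    return await asyncio.gather(
        run_query_async("SELECT 1"),
        run_query_async("SELECT 2"),
    )


results = asyncio.run(main())
```

asyncio.gather() returns results in argument order, so results[0] belongs to the first query regardless of which thread finished first.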
- class ractogateway.pipelines.sql_analyst.SQLAnalystResult(**data)[source]
Bases: BaseModel
Result returned by SQLAnalystPipeline.
All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.
Fields
- user_query:
The original natural-language question.
- schema_used:
The database schema string that was passed to (or fetched for) the LLM.
- sql_query:
The generated (and possibly LIMIT-injected) SQL SELECT statement.
- columns:
Column names returned by the SQL query.
- row_count:
Number of rows in raw_rows.
- raw_rows:
All rows from the SQL result as a list of dicts.
- pandas_code:
The LLM-generated pandas analysis code (None if run_pandas=False).
- pandas_result:
Output of executing pandas_code: a DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.
- answer:
Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.
- chart_spec:
The ChartSpec dict used to build the Plotly figure. None if no chart was requested.
- plotly_figure:
A plotly.graph_objects.Figure object ready to call .show() or .to_html(). None if no chart was requested or plotly is not installed.
- usage:
Aggregated token counts for all LLM steps in the pipeline.
- error:
Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.
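When mask_columns is set on the pipeline, the values in raw_rows arrive already replaced with "***MASKED***". A sketch of that transformation on a list of row dicts is shown below; the helper mask_rows and its exact-match column comparison are assumptions, not the library's code.

```python
MASK = "***MASKED***"


def mask_rows(rows: list, mask_columns: list) -> list:
    """Return new row dicts with the named columns' values replaced."""
    masked = set(mask_columns)
    return [
        {key: (MASK if key in masked else value) for key, value in row.items()}
        for row in rows
    ]


rows = [{"name": "Ada", "email": "ada@example.com", "total": 42}]
safe_rows = mask_rows(rows, ["email", "phone"])
```

The input rows are left untouched, and columns named in mask_columns that do not occur in the result (phone here) are simply ignored.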
- user_query: str
- schema_used: str
- sql_query: str
- row_count: int
- usage: PipelineUsage
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- to_csv(path=None)[source]
Export the raw SQL result rows to CSV.
Does not require pandas: uses the standard-library csv module.
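Exporting a list of row dicts with only the standard-library csv module could be done along these lines. The helper rows_to_csv is hypothetical, and returning the CSV text when path is None is an assumption about what the path=None default means.

```python
import csv
import io
from typing import Optional


def rows_to_csv(rows: list, path: Optional[str] = None) -> str:
    """Serialise row dicts to CSV text, optionally writing it to a file."""
    if not rows:
        return ""
    buffer = io.StringIO()
    # Column order follows the keys of the first row.
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)
    text = buffer.getvalue()
    if path is not None:
        # newline="" keeps the csv module's own line endings intact.
        with open(path, "w", newline="", encoding="utf-8") as handle:
            handle.write(text)
    return text


csv_text = rows_to_csv([{"product": "widget", "qty": 3}])
```

The csv module terminates rows with "\r\n" by default, which is why the file is opened with newline="".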
- to_json(path=None, *, indent=2)[source]
Export the raw SQL result rows to JSON.
- to_excel(path, *, sheet_name='Results')[source]
Export the raw SQL result rows to an Excel file.
Requires pandas and openpyxl:
    pip install ractogateway[pipelines-sql] openpyxl