ractogateway.pipelines.sql_analyst

SQL Analyst Pipeline — natural-language to SQL + pandas + answer + chart.

class ractogateway.pipelines.sql_analyst.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: SQLAnalystPipeline

Async-first variant of SQLAnalystPipeline.

run() is a coroutine — use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.

All constructor parameters and run() parameters are identical to those of SQLAnalystPipeline.

Example:

from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()

async run(user_query, **kwargs)[source]

Async run() — delegates to SQLAnalystPipeline.arun().

Return type:

SQLAnalystResult
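The delegation described above can be pictured with a small stand-in. This is only a sketch: SyncFirstPipeline and AsyncFirstPipeline are hypothetical classes written for illustration, not the library's code, and the "work" is a placeholder await.

```python
import asyncio


class SyncFirstPipeline:
    """Hypothetical stand-in exposing a sync run() and an async arun()."""

    def run(self, user_query: str) -> str:
        # Synchronous entry point: drive the coroutine to completion.
        return asyncio.run(self.arun(user_query))

    async def arun(self, user_query: str) -> str:
        await asyncio.sleep(0)  # placeholder for awaited LLM / DB work
        return f"answer to: {user_query}"


class AsyncFirstPipeline(SyncFirstPipeline):
    """Async-first variant: run() itself is a coroutine."""

    async def run(self, user_query: str, **kwargs) -> str:
        # Delegate to the parent's async implementation.
        return await super().arun(user_query, **kwargs)


async def main() -> list:
    pipeline = AsyncFirstPipeline()
    # Because run() is a coroutine, several questions can be awaited together.
    return await asyncio.gather(
        pipeline.run("Top 5 products?"),
        pipeline.run("Revenue by month?"),
    )


answers = asyncio.run(main())
```

Inside a framework that already runs an event loop (FastAPI, aiohttp), only the `await pipeline.run(...)` part applies; `asyncio.run` is used here just to make the sketch executable as a script.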

class ractogateway.pipelines.sql_analyst.ChartSpec(**data)[source]

Bases: BaseModel

Specification for a Plotly chart.

Pass to SQLAnalystPipeline as chart=ChartSpec(...) or as a plain dict (e.g. chart={"chart_type": "bar", "x": "customer", "y": "revenue"}). Use chart="auto" to let the pipeline infer the best chart type from the DataFrame’s column dtypes with no extra LLM call.

Supported chart types

bar · line · scatter · pie · histogram · box · area · heatmap · violin · funnel

Example:

from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

result = pipeline.run(
    user_query="Top 5 customers by revenue?",
    ...,
    chart=ChartSpec(chart_type="bar", x="customer_name", y="revenue",
                    title="Top 5 Customers"),
)
result.plotly_figure.show()
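To give a feel for what inferring a chart from column dtypes with no LLM call can look like, here is a minimal sketch. The rules below are assumptions chosen for illustration, infer_chart is a hypothetical helper, and the library's actual heuristics may differ. It works on plain row dicts so that it runs without pandas.

```python
from numbers import Number
from typing import Optional


def infer_chart(rows: list) -> Optional[dict]:
    """Pick a chart spec from column value types (illustrative rules only)."""
    if not rows:
        return None
    columns = list(rows[0])
    # A column counts as numeric when every value is a non-bool number.
    numeric = [
        c for c in columns
        if all(isinstance(r[c], Number) and not isinstance(r[c], bool) for r in rows)
    ]
    categorical = [c for c in columns if c not in numeric]

    if categorical and numeric:
        # One label column plus a measure: bar chart.
        return {"chart_type": "bar", "x": categorical[0], "y": numeric[0]}
    if len(numeric) >= 2:
        # Two measures: scatter plot.
        return {"chart_type": "scatter", "x": numeric[0], "y": numeric[1]}
    if len(numeric) == 1:
        # A single measure: histogram of its distribution.
        return {"chart_type": "histogram", "x": numeric[0], "y": None}
    return None


rows = [
    {"customer": "Acme", "revenue": 1200.0},
    {"customer": "Globex", "revenue": 950.5},
]
spec = infer_chart(rows)
```

The resulting dict has the same shape as the plain-dict form of chart shown above, so it could be passed wherever a ChartSpec is accepted.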

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

chart_type: str
x: str | None
y: str | list[str] | None
color: str | None
title: str
x_label: str | None
y_label: str | None
orientation: Literal['v', 'h'] | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.sql_analyst.PipelineUsage(**data)[source]

Bases: BaseModel

Aggregated token usage across all LLM calls in the pipeline.

Tracks each step (SQL generation, pandas code generation, markdown answer generation) separately so you can see exactly where tokens are consumed.

Properties

total_input_tokens:

Sum of all prompt tokens across every LLM step.

total_output_tokens:

Sum of all completion tokens across every LLM step.

total_tokens:

Grand total of every token consumed by the pipeline.

sql_input_tokens: int
sql_output_tokens: int
pandas_input_tokens: int
pandas_output_tokens: int
answer_input_tokens: int
answer_output_tokens: int
property total_input_tokens: int
property total_output_tokens: int
property total_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
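As a quick illustration of how the three total_* properties relate to the per-step fields, the following sketch uses a plain dataclass stand-in. UsageSketch is hypothetical, and the zero defaults are an assumption; only the field and property names come from the listing above.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Stand-in mirroring the documented PipelineUsage fields."""

    sql_input_tokens: int = 0
    sql_output_tokens: int = 0
    pandas_input_tokens: int = 0
    pandas_output_tokens: int = 0
    answer_input_tokens: int = 0
    answer_output_tokens: int = 0

    @property
    def total_input_tokens(self) -> int:
        # Prompt tokens summed over the SQL, pandas, and answer steps.
        return (self.sql_input_tokens + self.pandas_input_tokens
                + self.answer_input_tokens)

    @property
    def total_output_tokens(self) -> int:
        # Completion tokens summed over the same three steps.
        return (self.sql_output_tokens + self.pandas_output_tokens
                + self.answer_output_tokens)

    @property
    def total_tokens(self) -> int:
        return self.total_input_tokens + self.total_output_tokens


usage = UsageSketch(
    sql_input_tokens=420, sql_output_tokens=60,
    pandas_input_tokens=900, pandas_output_tokens=150,
    answer_input_tokens=1300, answer_output_tokens=400,
)
```

With these made-up numbers the answer step accounts for most of the tokens, which is the kind of breakdown the per-step fields are meant to expose.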

exception ractogateway.pipelines.sql_analyst.RateLimitExceededError[source]

Bases: RuntimeError

Raised when the rate limiter denies a request for a given user.

class ractogateway.pipelines.sql_analyst.ReadOnlySQLGuard[source]

Bases: object

Validates that a SQL string contains only read (SELECT) operations.

Uses a keyword regex to detect any DML/DDL statement that would modify data or schema. Call check() before executing any LLM-generated SQL when force_read_only=True.

Example:

ReadOnlySQLGuard.check("SELECT * FROM users")   # passes silently
ReadOnlySQLGuard.check("DROP TABLE users")       # raises ReadOnlyViolationError

static check(sql)[source]

Raise ReadOnlyViolationError if sql contains a write operation.

Parameters:

sql (str) – The SQL string to validate.

Raises:

ReadOnlyViolationError – If the SQL contains a disallowed keyword.

Return type:

None
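A keyword-regex guard of the kind described above might look roughly like this. It is a sketch under assumptions: the keyword list is a guess (the library's actual pattern is not documented here), and ReadOnlyViolation and check_read_only are hypothetical names standing in for ReadOnlyViolationError and ReadOnlySQLGuard.check.

```python
import re


class ReadOnlyViolation(ValueError):
    """Stand-in for ReadOnlyViolationError."""


# Assumed keyword list, matched on word boundaries and case-insensitively.
_WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|REVOKE|MERGE)\b",
    re.IGNORECASE,
)


def check_read_only(sql: str) -> None:
    """Raise ReadOnlyViolation if sql mentions a write keyword."""
    match = _WRITE_KEYWORDS.search(sql)
    if match:
        raise ReadOnlyViolation(f"disallowed keyword: {match.group(1).upper()}")


check_read_only("SELECT * FROM users")             # passes silently
check_read_only("SELECT updated_at FROM orders")   # 'updated_at' is not the word UPDATE
```

Plain keyword matching is deliberately conservative: a harmless query such as `SELECT 'drop' AS word` would also be rejected, because the regex does not parse string literals. Pairing the guard with a database role that only has SELECT privileges gives a second layer of protection.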

exception ractogateway.pipelines.sql_analyst.ReadOnlyViolationError[source]

Bases: ValueError

Raised when generated SQL contains a write operation in force_read_only mode.

class ractogateway.pipelines.sql_analyst.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: object

Natural-language to SQL + pandas + Markdown answer + chart pipeline.

Converts a plain-English question into:

  1. A read-only SQL query (LLM step — sql_kit)

  2. Pandas analysis code executed against the SQL result (LLM step — pandas_kit)

  3. A rich Markdown answer with table + insights (LLM step — answer_kit)

  4. An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls — pure dtype heuristics or user-provided spec)
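Step 1, together with the max_sql_retries behaviour documented for the constructor, can be sketched as a retry loop that feeds the database error back to the model. Everything here is illustrative: generate_sql_with_retries, fake_llm, and the prompt wording are assumptions, and sqlite3 stands in for the real database so the sketch runs on its own.

```python
import sqlite3
from typing import Callable


def generate_sql_with_retries(
    question: str,
    generate: Callable[[str], str],
    execute: Callable[[str], list],
    max_sql_retries: int = 2,
) -> tuple:
    """Return (sql, rows), retrying generation when execution fails."""
    prompt = question
    for attempt in range(max_sql_retries + 1):
        sql = generate(prompt)
        try:
            return sql, execute(sql)
        except Exception as exc:
            if attempt == max_sql_retries:
                raise  # retries exhausted: surface the last DB error
            # Re-send the original question plus the error for self-correction.
            prompt = f"{question}\n\nPrevious SQL:\n{sql}\n\nDatabase error:\n{exc}"
    raise AssertionError("unreachable")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT)")
conn.execute("INSERT INTO products VALUES ('widget')")


def execute(sql: str) -> list:
    cur = conn.execute(sql)
    cols = [d[0] for d in cur.description]
    return [dict(zip(cols, row)) for row in cur.fetchall()]


prompts_seen = []


def fake_llm(prompt: str) -> str:
    # Pretend model: misspells a column first, fixes it once it sees an error.
    prompts_seen.append(prompt)
    if "Database error" in prompt:
        return "SELECT name FROM products"
    return "SELECT nam FROM products"


sql, rows = generate_sql_with_retries("List product names", fake_llm, execute)
```

The first attempt fails inside sqlite3, the second prompt carries the error text, and the corrected query succeeds without the caller seeing the intermediate failure.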

Two variants

  • SQLAnalystPipeline: run() is sync, arun() is async.

  • AsyncSQLAnalystPipeline: run() is async (same as arun()).

Parameters:
  • kit (Any) – Default LLM kit used for any step that doesn’t have its own kit.

  • sql_kit (Any | None) – Override kit for SQL generation. Falls back to kit.

  • pandas_kit (Any | None) – Override kit for pandas code generation. Falls back to kit.

  • answer_kit (Any | None) – Override kit for Markdown answer generation. Falls back to kit.

  • sql_prompt / pandas_prompt / answer_prompt (RactoPrompt | None) – Override the default system prompt for the corresponding step.

  • sql_temperature / sql_max_tokens – LLM settings for the SQL step (defaults: 0.0 / 1024). sql_max_tokens is an int.

  • pandas_temperature / pandas_max_tokens – LLM settings for the pandas step (defaults: 0.0 / 2048). pandas_max_tokens is an int.

  • answer_temperature / answer_max_tokens – LLM settings for the answer step (defaults: 0.3 / 2048). answer_max_tokens is an int.

  • run_pandas (bool) – Run the pandas analysis step by default (default: True).

  • run_answer (bool) – Run the Markdown answer step by default (default: True).

  • chart (ChartSpec | dict[str, Any] | str | None) – Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".

  • force_read_only (bool) – Block any non-SELECT SQL (default: True).

  • tracer (Any | None) – Optional RactoTracer instance.

  • metrics (Any | None) – Optional GatewayMetricsMiddleware instance.

  • engine (Any | None) – Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, the connection_string / host / port / etc. params in run() are ignored.

  • max_sql_retries (int) – Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.

  • max_rows (int) – Safety cap on returned rows. LIMIT {max_rows} is injected into the SQL automatically if no LIMIT is already present. Set to 0 to disable. Default: 10_000.

  • schema_cache_ttl (float) – Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).

  • allowed_tables (list[str] | None) – Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.

  • blocked_columns (list[str] | None) – Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.

  • mask_columns (list[str] | None) – Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.

  • table_docs (dict[str, str] | None) – {table_name: description}. Appended as inline schema comments so the LLM understands each table's business meaning.

  • column_docs (dict[str, dict[str, str]] | None) – {table_name: {column_name: description}}. Per-column inline comments.

  • safe_mode (bool) – When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.

  • memory (Any | None) – Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).

  • rate_limiter (Any | None) – Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.

  • user_id (str | None) – Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().
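The max_rows and mask_columns behaviours described above can be approximated with two small helpers. This is a sketch rather than the library's implementation: inject_limit and mask_rows are hypothetical names, the LIMIT detection is a naive regex, and masking uses exact column-name matches (an assumption, since case handling is only documented for blocked_columns).

```python
import re

_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless disabled (0) or a LIMIT is already present."""
    if max_rows <= 0 or _LIMIT_RE.search(sql):
        return sql
    # Drop trailing whitespace and a trailing semicolon before appending.
    return f"{sql.rstrip().rstrip(';')} LIMIT {max_rows}"


def mask_rows(rows: list, mask_columns: list) -> list:
    """Replace the values of the listed columns with a fixed marker."""
    masked = set(mask_columns)
    return [
        {k: ("***MASKED***" if k in masked else v) for k, v in row.items()}
        for row in rows
    ]


limited = inject_limit("SELECT * FROM orders;", 5_000)
safe_rows = mask_rows([{"email": "a@example.com", "total": 5}], ["email"])
```

Note that the regex treats a LIMIT inside a subquery as sufficient, so a production version would more likely inspect the parsed statement than the raw string.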

Example:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")
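The memory and rate_limiter arguments only need to expose the methods named in the parameter descriptions, so simple in-process objects can stand in for the Redis-backed ones during tests. The sketch below assumes a {"role", "content"} dict shape for history entries and a fixed per-user token budget with no time window. Both classes are hypothetical, and none of these details is confirmed by the reference above.

```python
from collections import defaultdict


class InMemoryChatMemory:
    """Implements get_history(session_id) and append(session_id, role, content)."""

    def __init__(self) -> None:
        self._store = defaultdict(list)

    def get_history(self, session_id: str) -> list:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        self._store[session_id].append({"role": role, "content": content})


class InMemoryRateLimiter:
    """Implements check_and_consume(user_id, tokens) and get_remaining(user_id)."""

    def __init__(self, budget: int) -> None:
        self._budget = budget
        self._used = defaultdict(int)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Deny the request if it would push the user over budget.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


memory = InMemoryChatMemory()
memory.append("session-1", "user", "Top 5 products by quantity sold?")
limiter = InMemoryRateLimiter(budget=100)
first_ok = limiter.check_and_consume("alice", 60)
second_ok = limiter.check_and_consume("alice", 60)
```

Objects like these would be passed as memory=... and rate_limiter=... to the constructor. According to the exception documented in this module, a denied request surfaces as RateLimitExceededError.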

run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Run the full pipeline synchronously.

Parameters:
  • user_query (str) – Plain-English question to answer from the database.

  • connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.

  • host / port / database / username / password / driver (str) – Individual connection params used when both connection_string and engine are omitted.

  • engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.

  • schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).

  • run_pandas / run_answer / chart / force_read_only (bool | None) – Override the corresponding pipeline-level defaults for this call.

  • sql_temperature / sql_max_tokens / pandas_temperature / pandas_max_tokens / answer_temperature / answer_max_tokens (int | None) – Per-call LLM setting overrides.

  • max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

  • session_id (str | None) – Conversation session ID used to fetch and save memory context.

Return type:

SQLAnalystResult

Returns:

SQLAnalystResult
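The `<object object>` defaults shown for engine and chart in the signature suggest a sentinel default, presumably so that an explicit None (for example chart=None to skip charts) can be told apart from "argument not passed". A minimal sketch of that pattern, using a hypothetical PipelineSketch class and an assumed sentinel name, could be:

```python
# Unique marker meaning "the caller did not pass this argument".
_UNSET = object()


class PipelineSketch:
    """Hypothetical stand-in showing per-call overrides of a pipeline default."""

    def __init__(self, chart="auto"):
        self.chart = chart

    def resolve_chart(self, chart=_UNSET):
        # Identity check: only the sentinel falls back to the pipeline default,
        # so an explicit None still means "no chart for this call".
        return self.chart if chart is _UNSET else chart


pipeline = PipelineSketch(chart="auto")
default_chart = pipeline.resolve_chart()
no_chart = pipeline.resolve_chart(None)
bar_chart = pipeline.resolve_chart({"chart_type": "bar", "x": "customer", "y": "revenue"})
```

Parameters whose per-call default is None (run_pandas, max_rows, and so on) do not need the sentinel, because None is not a meaningful value for them.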

async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Async variant of run() — identical parameters.

Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.

Return type:

SQLAnalystResult
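Running blocking database calls in a thread executor, as described above, keeps the event loop free while a query executes. The sketch below shows the general technique with the standard library only. sqlite3 stands in for SQLAlchemy, and blocking_query and run_query are hypothetical helpers, so this is not the pipeline's own code.

```python
import asyncio
import sqlite3


def blocking_query(sql: str) -> list:
    """A synchronous DB call; it would block the event loop if run directly."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE t (n INTEGER)")
        conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


async def run_query(sql: str) -> list:
    # asyncio.to_thread runs the function in the default thread pool executor
    # and suspends this coroutine until the result is ready.
    return await asyncio.to_thread(blocking_query, sql)


rows = asyncio.run(run_query("SELECT SUM(n) FROM t"))
```

The connection is opened and closed inside the worker thread, which sidesteps sqlite3's same-thread restriction. A pooled SQLAlchemy engine handles this differently.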

class ractogateway.pipelines.sql_analyst.SQLAnalystResult(**data)[source]

Bases: BaseModel

Result returned by SQLAnalystPipeline.

All fields except user_query have sensible defaults so that a partial result can be returned when safe_mode=True and an error occurs.

Fields

user_query:

The original natural-language question.

schema_used:

The database schema string that was passed to (or fetched for) the LLM.

sql_query:

The generated (and possibly LIMIT-injected) SQL SELECT statement.

columns:

Column names returned by the SQL query.

row_count:

Number of rows in raw_rows.

raw_rows:

All rows from the SQL result as a list of dicts.

pandas_code:

The LLM-generated pandas analysis code (None if run_pandas=False).

pandas_result:

Output of executing pandas_code — DataFrame, scalar, or any value assigned to result inside the code. None if run_pandas=False.

answer:

Rich Markdown answer written by the LLM, including a results table and key insights. None if run_answer=False.

chart_spec:

The ChartSpec dict used to build the Plotly figure. None if no chart was requested.

plotly_figure:

A plotly.graph_objects.Figure object ready to call .show() or .to_html(). None if no chart was requested or plotly is not installed.

usage:

Aggregated token counts for all LLM steps in the pipeline.

error:

Set when safe_mode=True and an exception occurs. None means the pipeline completed successfully.

user_query: str
schema_used: str
sql_query: str
columns: list[str]
row_count: int
raw_rows: list[dict[str, Any]]
pandas_code: str | None
pandas_result: Any | None
answer: str | None
chart_spec: dict[str, Any] | None
plotly_figure: Any | None
usage: PipelineUsage
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

to_csv(path=None)[source]

Export the raw SQL result rows to CSV.

Does not require pandas — uses the standard-library csv module.

Parameters:

path (str | None) – File path to write to. When None the CSV string is returned.

Return type:

str | None

Returns:

str | None – CSV string when path is None; otherwise None (file written).
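The path-or-string contract described above (return the CSV text when path is None, otherwise write the file and return None) is straightforward to reproduce with the standard-library csv module. rows_to_csv is a hypothetical free function written for illustration, and the "\n" line terminator is an assumption; the real method lives on SQLAnalystResult and reads columns and raw_rows from the result.

```python
import csv
import io
from typing import Optional


def rows_to_csv(columns: list, rows: list, path: Optional[str] = None) -> Optional[str]:
    """Serialise a list of row dicts to CSV, to a string or to a file."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    text = buffer.getvalue()
    if path is None:
        return text  # no path given: hand back the CSV string
    # newline="" stops Python from translating the line endings again.
    with open(path, "w", newline="", encoding="utf-8") as fh:
        fh.write(text)
    return None


csv_text = rows_to_csv(["product", "qty"], [{"product": "widget", "qty": 3}])
```

Because only csv and io are involved, an export like this works even when pandas is not installed.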

to_json(path=None, *, indent=2)[source]

Export the raw SQL result rows to JSON.

Parameters:
  • path (str | None) – File path to write to. When None the JSON string is returned.

  • indent (int) – JSON indentation level (default: 2).

Return type:

str | None

Returns:

str | None – JSON string when path is None; otherwise None (file written).

to_excel(path, *, sheet_name='Results')[source]

Export the raw SQL result rows to an Excel file.

Requires pandas and openpyxl:

pip install ractogateway[pipelines-sql] openpyxl

Parameters:
  • path (str) – File path to write to (must end in .xlsx).

  • sheet_name (str) – Excel sheet name (default: "Results").

Return type:

None

ractogateway.pipelines.sql_analyst.clear_schema_cache()[source]

Evict all entries from the in-process schema cache.

Useful in tests or when the database schema changes at runtime.

Return type:

None
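An in-process schema cache with a TTL and an explicit eviction function can be sketched as follows. The cache key, the storage layout, and the helper names (get_schema, clear_cache) are assumptions for illustration; only the TTL semantics (0 disables caching) and the existence of a clear function come from this reference.

```python
import time
from typing import Callable

# key -> (time stored, schema string)
_SCHEMA_CACHE: dict = {}


def get_schema(key: str, fetch: Callable[[], str], ttl: float = 3600.0) -> str:
    """Return a cached schema if it is younger than ttl, else fetch and store it."""
    if ttl > 0:
        hit = _SCHEMA_CACHE.get(key)
        if hit is not None and time.monotonic() - hit[0] < ttl:
            return hit[1]
    schema = fetch()
    if ttl > 0:  # ttl == 0 disables caching entirely
        _SCHEMA_CACHE[key] = (time.monotonic(), schema)
    return schema


def clear_cache() -> None:
    """Evict every cached schema, e.g. after a migration or between tests."""
    _SCHEMA_CACHE.clear()


fetch_calls = []


def fake_introspection() -> str:
    fetch_calls.append(1)
    return "orders(id, customer_id, total)"


get_schema("shop", fake_introspection)   # miss: runs the introspection
get_schema("shop", fake_introspection)   # hit: served from the cache
calls_before_clear = len(fetch_calls)
clear_cache()
get_schema("shop", fake_introspection)   # miss again after eviction
calls_after_clear = len(fetch_calls)
```

In a test suite, calling the eviction function in a fixture teardown keeps one test's schema from leaking into the next.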