ractogateway.pipelines.sql_analyst.pipeline

SQLAnalystPipeline — natural-language → SQL → pandas → answer + chart.

Two classes are exported:

  • SQLAnalystPipeline: run() is sync, arun() is async.

  • AsyncSQLAnalystPipeline: run() is async (no sync variant). Designed for FastAPI and other async frameworks, where you simply await pipeline.run(...).

Per-step model control

You can assign a different LLM kit (provider + model) to each pipeline step:

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),                       # default fallback
    sql_kit=Chat(model="gpt-4o"),                   # SQL generation
    pandas_kit=Chat(model="gpt-3.5-turbo"),         # cheaper for pandas
    answer_kit=Chat(model="gpt-4o"),                # rich markdown answer
)

When a per-step kit is None the default kit is used for that step.
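
The fallback rule can be pictured with a short sketch. The helper resolve_kits and the FakeKit class are hypothetical stand-ins, not part of ractogateway; they only illustrate how a per-step kit left as None resolves to the default kit.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeKit:
    """Hypothetical stand-in for an LLM kit such as Chat(model=...)."""
    model: str


def resolve_kits(
    kit: Any,
    sql_kit: Optional[Any] = None,
    pandas_kit: Optional[Any] = None,
    answer_kit: Optional[Any] = None,
) -> Dict[str, Any]:
    """Return the kit used by each step, falling back to the default kit."""
    return {
        "sql": sql_kit if sql_kit is not None else kit,
        "pandas": pandas_kit if pandas_kit is not None else kit,
        "answer": answer_kit if answer_kit is not None else kit,
    }


# Only the pandas step gets its own kit; the other steps use the default.
kits = resolve_kits(FakeKit("gpt-4o"), pandas_kit=FakeKit("gpt-3.5-turbo"))
print({step: k.model for step, k in kits.items()})
```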

Production features

  • SQL retry loop — on DB execution error the LLM is re-prompted with the error message to auto-fix the SQL (up to max_sql_retries times).

  • Pre-built engine — pass a pre-configured SQLAlchemy Engine (with connection pooling) directly via the engine parameter.

  • Row limit cap: max_rows auto-injects LIMIT to prevent unbounded result sets from overwhelming memory.

  • Schema cache — schema introspection results are cached in-process for schema_cache_ttl seconds to avoid repeated DB round-trips.

  • Table RBAC: allowed_tables hides all other tables from the LLM.

  • Column blocking: blocked_columns removes sensitive columns from the schema shown to the LLM.

  • Data masking: mask_columns replaces values with "***MASKED***" in result rows before passing them to the LLM or returning them to the caller.

  • Custom docs: table_docs / column_docs annotate the schema with business descriptions so the LLM generates more accurate SQL.

  • Conversation memory: memory (any object with get_history / append methods, e.g. RedisChatMemory) together with session_id appends prior Q&A context to each SQL prompt.

  • Rate limiting: rate_limiter (any object with check_and_consume / get_remaining methods, e.g. RedisRateLimiter) together with user_id gates requests.

  • Safe mode: safe_mode=True catches all exceptions and returns a SQLAnalystResult with the error field set instead of raising.

class ractogateway.pipelines.sql_analyst.pipeline.SQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: object

Natural-language to SQL + pandas + Markdown answer + chart pipeline.

Converts a plain-English question into:

  1. A read-only SQL query (LLM step — sql_kit)

  2. Pandas analysis code executed against the SQL result (LLM step — pandas_kit)

  3. A rich Markdown answer with table + insights (LLM step — answer_kit)

  4. An optional Plotly figure built deterministically from a ChartSpec (zero LLM calls — pure dtype heuristics or user-provided spec)

Two variants

  • SQLAnalystPipeline: run() is sync, arun() is async.

  • AsyncSQLAnalystPipeline: run() is async (same as arun()).

type kit:

Any

param kit:

Default LLM kit used for any step that doesn’t have its own kit.

type sql_kit:

Any | None

param sql_kit:

Override kit for SQL generation. Falls back to kit.

type pandas_kit:

Any | None

param pandas_kit:

Override kit for pandas code generation. Falls back to kit.

type answer_kit:

Any | None

param answer_kit:

Override kit for Markdown answer generation. Falls back to kit.

type answer_prompt:

RactoPrompt | None

param answer_prompt:

Override the default system prompt for the answer step. sql_prompt and pandas_prompt do the same for the SQL and pandas steps.

type sql_max_tokens:

int

param sql_max_tokens:

LLM settings for the SQL step: sql_temperature (default: 0.0) and sql_max_tokens (default: 1024).

type pandas_max_tokens:

int

param pandas_max_tokens:

LLM settings for the pandas step: pandas_temperature (default: 0.0) and pandas_max_tokens (default: 2048).

type answer_max_tokens:

int

param answer_max_tokens:

LLM settings for the answer step: answer_temperature (default: 0.3) and answer_max_tokens (default: 2048).

type run_pandas:

bool

param run_pandas:

Run pandas analysis step by default (default: True).

type run_answer:

bool

param run_answer:

Run Markdown answer step by default (default: True).

type chart:

ChartSpec | dict[str, Any] | str | None

param chart:

Default chart behaviour: "auto" (infer from data), a ChartSpec, a plain dict, or None to skip charts. Default: "auto".

type force_read_only:

bool

param force_read_only:

Block any non-SELECT SQL (default: True).
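
A minimal sketch of what a read-only guard of this kind might look like. The function is_read_only and its keyword deny-list are hypothetical; this reference does not show the library's actual validation, which may be stricter (for example, by parsing the SQL).

```python
import re

# Hypothetical deny-list of keywords that modify data or schema.
_WRITE_KEYWORDS = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate|grant|revoke|merge)\b",
    re.IGNORECASE,
)


def is_read_only(sql: str) -> bool:
    """Return True if sql is a single SELECT (or WITH ... SELECT) statement."""
    stripped = sql.strip().rstrip(";").strip()
    if ";" in stripped:
        # More than one statement: reject stacked queries outright.
        return False
    if not re.match(r"^(select|with)\b", stripped, re.IGNORECASE):
        return False
    # Reject queries that contain a write keyword anywhere in the text.
    return _WRITE_KEYWORDS.search(stripped) is None


print(is_read_only("SELECT * FROM orders;"))
print(is_read_only("DROP TABLE orders"))
```

A keyword check like this is deliberately conservative: it also rejects harmless queries that mention a write keyword inside a string literal, so it should be read as an illustration of the idea rather than a complete defence.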

type tracer:

Any | None

param tracer:

Optional RactoTracer instance.

type metrics:

Any | None

param metrics:

Optional GatewayMetricsMiddleware instance.

type engine:

Any | None

param engine:

Optional pre-built SQLAlchemy Engine (e.g. with connection pooling). When provided, connection_string / host / port / etc. params in run() are ignored.

type max_sql_retries:

int

param max_sql_retries:

Number of times to retry SQL generation when a DB execution error occurs. Each retry re-sends the LLM the original question plus the error message so it can self-correct. Default: 2.
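
The retry behaviour described above can be sketched as follows. All names here (generate_sql_with_retries, fake_generate, fake_execute) are hypothetical, and the sketch assumes one initial attempt followed by up to max_sql_retries corrective attempts; the exact prompt wording used by the library is not shown in this reference.

```python
from typing import Callable, List, Optional


def generate_sql_with_retries(
    question: str,
    generate: Callable[[str], str],
    execute: Callable[[str], list],
    max_sql_retries: int = 2,
) -> list:
    """Generate SQL, execute it, and re-prompt with the DB error on failure."""
    prompt = question
    last_error: Optional[Exception] = None
    # One initial attempt plus up to max_sql_retries corrective attempts.
    for _attempt in range(max_sql_retries + 1):
        sql = generate(prompt)
        try:
            return execute(sql)
        except Exception as exc:
            last_error = exc
            # Feed the failing SQL and the error back so the model can fix it.
            prompt = (
                f"{question}\n\nThe previous SQL failed:\n{sql}\n"
                f"Error: {exc}\nReturn a corrected query."
            )
    raise RuntimeError(f"SQL failed after {max_sql_retries} retries") from last_error


calls: List[str] = []


def fake_generate(prompt: str) -> str:
    """Stand-in LLM: the first answer has a typo, the retry is correct."""
    calls.append(prompt)
    return "SELECT 1" if "Error:" in prompt else "SELEC 1"


def fake_execute(sql: str) -> list:
    """Stand-in database: rejects anything that is not a SELECT."""
    if not sql.startswith("SELECT"):
        raise ValueError(f"syntax error near {sql!r}")
    return [(1,)]


rows = generate_sql_with_retries("Return one", fake_generate, fake_execute)
print(rows, len(calls))
```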

type max_rows:

int

param max_rows:

Safety cap on returned rows — auto-injects LIMIT {max_rows} into the SQL if no LIMIT is already present. Set to 0 to disable. Default: 10_000.
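
As a rough illustration of the row cap, the hypothetical helper inject_limit below appends a LIMIT clause only when none is present and the cap is enabled. It is a sketch under simple assumptions (regex detection, LIMIT-style dialects), not the library's implementation.

```python
import re


def inject_limit(sql: str, max_rows: int) -> str:
    """Append LIMIT max_rows unless the query already has one or the cap is 0."""
    if max_rows <= 0:
        # A cap of 0 disables the safety limit.
        return sql
    stripped = sql.strip().rstrip(";").rstrip()
    if re.search(r"\blimit\s+\d+", stripped, re.IGNORECASE):
        # The query already limits its own result set; leave it untouched.
        return sql
    return f"{stripped} LIMIT {max_rows}"


print(inject_limit("SELECT * FROM orders;", 100))
```

A regex check like this also treats a LIMIT inside a subquery as "already present", so a production version would need to inspect only the outermost query.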

type schema_cache_ttl:

float

param schema_cache_ttl:

Seconds to cache the schema introspection result in-process. Set to 0 to disable caching. Default: 3600 (1 hour).
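
An in-process cache with a time-to-live can be sketched in a few lines. The SchemaCache class and its injectable clock are hypothetical and serve only to show the expiry logic, including the "0 disables caching" case.

```python
import time
from typing import Callable, Dict, List, Tuple


class SchemaCache:
    """Hypothetical in-process TTL cache for schema strings."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, key: str, loader: Callable[[], str]) -> str:
        if self._ttl <= 0:
            # Caching disabled: always introspect.
            return loader()
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]
        value = loader()
        self._entries[key] = (now, value)
        return value


# A fake clock makes the expiry visible without sleeping.
now = [0.0]
loads: List[int] = []


def load_schema() -> str:
    loads.append(1)
    return "orders(id, total)"


cache = SchemaCache(ttl=3600.0, clock=lambda: now[0])
cache.get("shop", load_schema)
cache.get("shop", load_schema)  # served from the cache
now[0] = 3601.0
cache.get("shop", load_schema)  # entry expired, loader runs again
print(len(loads))
```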

type allowed_tables:

list[str] | None

param allowed_tables:

Allowlist of table names shown to the LLM. All other tables are hidden, preventing the LLM from generating SQL that touches them.

type blocked_columns:

list[str] | None

param blocked_columns:

Column names to strip from the schema shown to the LLM (case-insensitive). Useful for hiding PII columns like ssn or credit_card_number.
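
The two schema filters (allowed_tables and blocked_columns) can be illustrated together. The function filter_schema and the dict-of-lists schema shape are assumptions made for this sketch; table names are matched exactly here, while column names are compared case-insensitively as described above.

```python
from typing import Dict, Iterable, List, Optional


def filter_schema(
    schema: Dict[str, List[str]],
    allowed_tables: Optional[Iterable[str]] = None,
    blocked_columns: Optional[Iterable[str]] = None,
) -> Dict[str, List[str]]:
    """Drop tables outside the allowlist and strip blocked columns."""
    allowed = None if allowed_tables is None else set(allowed_tables)
    blocked = {c.lower() for c in (blocked_columns or [])}
    return {
        table: [c for c in columns if c.lower() not in blocked]
        for table, columns in schema.items()
        if allowed is None or table in allowed
    }


visible = filter_schema(
    {"customers": ["id", "name", "SSN"], "audit_log": ["id", "event"]},
    allowed_tables=["customers"],
    blocked_columns=["ssn"],
)
print(visible)
```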

type mask_columns:

list[str] | None

param mask_columns:

Column names whose values are replaced with "***MASKED***" in result rows before they are returned or passed to the answer LLM.
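
A small sketch of value masking, assuming result rows are represented as dicts keyed by column name (the actual row representation is not specified in this reference). The helper mask_rows is hypothetical.

```python
from typing import Any, Dict, Iterable, List

MASK = "***MASKED***"


def mask_rows(
    rows: List[Dict[str, Any]], mask_columns: Iterable[str]
) -> List[Dict[str, Any]]:
    """Return copies of rows with values in mask_columns replaced by MASK."""
    masked = set(mask_columns)
    return [
        {col: (MASK if col in masked else value) for col, value in row.items()}
        for row in rows
    ]


rows_in = [{"name": "Ada", "email": "ada@example.com"}]
rows_out = mask_rows(rows_in, ["email", "phone"])
print(rows_out)
```

Building new dicts rather than mutating in place keeps the unmasked rows intact for any step that ran before masking.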

type table_docs:

dict[str, str] | None

param table_docs:

{table_name: description} — appended as inline schema comments so the LLM understands table business meaning.

type column_docs:

dict[str, dict[str, str]] | None

param column_docs:

{table_name: {column_name: description}} — per-column inline comments.
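
To make the shape of table_docs and column_docs concrete, the sketch below renders a schema with the descriptions attached as inline SQL comments. The annotate_schema function and the output layout are hypothetical; the library's real schema text may look different.

```python
from typing import Dict, List, Optional


def annotate_schema(
    schema: Dict[str, List[str]],
    table_docs: Optional[Dict[str, str]] = None,
    column_docs: Optional[Dict[str, Dict[str, str]]] = None,
) -> str:
    """Render a schema as text, appending docs as inline SQL comments."""
    table_docs = table_docs or {}
    column_docs = column_docs or {}
    lines: List[str] = []
    for table, columns in schema.items():
        header = f"TABLE {table}"
        if table in table_docs:
            header += f"  -- {table_docs[table]}"
        lines.append(header)
        for column in columns:
            line = f"  {column}"
            doc = column_docs.get(table, {}).get(column)
            if doc:
                line += f"  -- {doc}"
            lines.append(line)
    return "\n".join(lines)


text = annotate_schema(
    {"orders": ["id", "qty"]},
    table_docs={"orders": "One row per customer order"},
    column_docs={"orders": {"qty": "Units purchased"}},
)
print(text)
```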

type safe_mode:

bool

param safe_mode:

When True, all exceptions are caught and returned as SQLAnalystResult(error=...) instead of being raised. Default: False.
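
The difference between the two modes can be sketched with a stand-in result type. Result and run_safely are hypothetical names, and the error string format is an assumption; only the "catch and return instead of raise" behaviour comes from the description above.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Result:
    """Hypothetical stand-in for SQLAnalystResult."""
    answer: Optional[str] = None
    error: Optional[str] = None


def run_safely(step: Callable[[], str], safe_mode: bool) -> Result:
    """Run step; in safe mode, turn any exception into Result(error=...)."""
    try:
        return Result(answer=step())
    except Exception as exc:
        if not safe_mode:
            raise
        return Result(error=f"{type(exc).__name__}: {exc}")


def failing_step() -> str:
    raise ValueError("relation 'orders' does not exist")


result = run_safely(failing_step, safe_mode=True)
print(result.error)
```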

type memory:

Any | None

param memory:

Optional conversation memory object (e.g. RedisChatMemory). Must implement get_history(session_id) -> list[dict] and append(session_id, role, content).
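
Any object with these two methods should satisfy the interface. The class below is a hypothetical in-memory implementation, useful for tests or single-process use; the {"role", "content"} dict shape is an assumption based on the append signature.

```python
from collections import defaultdict
from typing import Dict, List


class InMemoryChatMemory:
    """Hypothetical process-local memory keyed by session ID."""

    def __init__(self) -> None:
        self._store: Dict[str, List[dict]] = defaultdict(list)

    def get_history(self, session_id: str) -> List[dict]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._store[session_id])

    def append(self, session_id: str, role: str, content: str) -> None:
        self._store[session_id].append({"role": role, "content": content})


memory = InMemoryChatMemory()
memory.append("s1", "user", "Top 5 products by quantity sold?")
memory.append("s1", "assistant", "Widgets lead with 120 units.")
print(memory.get_history("s1"))
```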

type rate_limiter:

Any | None

param rate_limiter:

Optional rate-limiter object (e.g. RedisRateLimiter). Must implement check_and_consume(user_id, tokens) -> bool and get_remaining(user_id) -> int.
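
A minimal in-memory object matching this interface might look like the sketch below. InMemoryRateLimiter is hypothetical and uses a fixed per-user quota with no refill, which is simpler than a real token bucket.

```python
from typing import Dict


class InMemoryRateLimiter:
    """Hypothetical fixed-quota limiter; the quota never refills."""

    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._used: Dict[str, int] = {}

    def check_and_consume(self, user_id: str, tokens: int = 1) -> bool:
        used = self._used.get(user_id, 0)
        if used + tokens > self._capacity:
            # Not enough quota left: reject without consuming anything.
            return False
        self._used[user_id] = used + tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._capacity - self._used.get(user_id, 0)


limiter = InMemoryRateLimiter(capacity=2)
outcomes = [limiter.check_and_consume("alice", 1) for _ in range(3)]
print(outcomes, limiter.get_remaining("alice"))
```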

type user_id:

str | None

param user_id:

Default user identifier used for rate limiting and audit. Can be overridden per-call in run() / arun().

Example:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines import SQLAnalystPipeline, ChartSpec

pipeline = SQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),  # cheaper for pandas
    max_rows=5_000,
    allowed_tables=["orders", "customers", "products"],
    mask_columns=["email", "phone"],
    safe_mode=True,
)
result = pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Pipeline error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()
    print(result.usage.total_tokens)
    result.to_csv("output.csv")

run(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Run the full pipeline synchronously.

Parameters:
  • user_query (str) – Plain-English question to answer from the database.

  • connection_string (str | None) – Full SQLAlchemy URI. Ignored when engine is provided.

  • host, port, database, username, password, driver – Individual connection parameters, used when both connection_string and engine are omitted.

  • engine (Any) – Per-call pre-built SQLAlchemy Engine. Overrides the pipeline-level engine and all connection params.

  • schema (str | None) – Pre-computed schema string. None → fetched automatically (with optional cache).

  • run_pandas, run_answer, chart, force_read_only – Override the corresponding pipeline-level defaults for this call.

  • sql_temperature, sql_max_tokens, pandas_temperature, pandas_max_tokens, answer_temperature, answer_max_tokens – Per-call LLM setting overrides.

  • max_rows (int | None) – Per-call row limit override. Overrides the pipeline-level max_rows.

  • user_id (str | None) – Per-call user ID for rate limiting and audit.

  • session_id (str | None) – Conversation session ID used to fetch and save memory context.

Return type:

SQLAnalystResult

Returns:

SQLAnalystResult
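
When connection_string is omitted, the individual connection parameters have to be combined into a SQLAlchemy-style URL. The helper below is a hypothetical sketch of that assembly using the defaults from the run() signature; how the library actually builds the URL is not shown in this reference.

```python
from typing import Optional
from urllib.parse import quote_plus


def build_connection_string(
    driver: str = "postgresql",
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: str = "localhost",
    port: int = 5432,
    database: Optional[str] = None,
) -> str:
    """Assemble driver://user:pass@host:port/database from its parts."""
    auth = ""
    if username:
        auth = quote_plus(username)
        if password:
            # Percent-encode so characters like '@' do not break the URL.
            auth += ":" + quote_plus(password)
        auth += "@"
    return f"{driver}://{auth}{host}:{port}/{database or ''}"


url = build_connection_string(username="user", password="p@ss", database="shop")
print(url)
```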

async arun(user_query, *, connection_string=None, host='localhost', port=5432, database=None, username=None, password=None, driver='postgresql', engine=<object object>, schema=None, run_pandas=None, run_answer=None, chart=<object object>, force_read_only=None, sql_temperature=None, sql_max_tokens=None, pandas_temperature=None, pandas_max_tokens=None, answer_temperature=None, answer_max_tokens=None, max_rows=None, user_id=None, session_id=None)[source]

Async variant of run() — identical parameters.

Blocking SQLAlchemy calls run in a thread executor. LLM calls use each kit’s async achat() method.

Return type:

SQLAnalystResult
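
The thread-executor pattern mentioned above can be sketched with the standard library alone. Here blocking_query is a hypothetical stand-in for a blocking SQLAlchemy call, and asyncio.to_thread (Python 3.9+) is one way to offload it; the library may use a different executor API.

```python
import asyncio
import time
from typing import List, Tuple


def blocking_query(sql: str) -> List[Tuple[str, int]]:
    """Stand-in for a blocking database call."""
    time.sleep(0.05)
    return [("widget", 3)]


async def fetch_rows(sql: str) -> List[Tuple[str, int]]:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_query, sql)


async def main() -> list:
    # Two queries overlap instead of running back to back.
    return await asyncio.gather(fetch_rows("SELECT 1"), fetch_rows("SELECT 2"))


all_rows = asyncio.run(main())
print(all_rows)
```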

class ractogateway.pipelines.sql_analyst.pipeline.AsyncSQLAnalystPipeline(kit, *, sql_kit=None, pandas_kit=None, answer_kit=None, sql_prompt=None, pandas_prompt=None, answer_prompt=None, sql_temperature=0.0, sql_max_tokens=1024, pandas_temperature=0.0, pandas_max_tokens=2048, answer_temperature=0.3, answer_max_tokens=2048, run_pandas=True, run_answer=True, chart='auto', force_read_only=True, tracer=None, metrics=None, engine=None, max_sql_retries=2, max_rows=10000, schema_cache_ttl=3600.0, schema_include_indexes=True, schema_include_row_counts=False, schema_include_sample_values=False, schema_sample_value_limit=8, allowed_tables=None, blocked_columns=None, mask_columns=None, table_docs=None, column_docs=None, safe_mode=False, analysis_engine='pandas', memory=None, rate_limiter=None, user_id=None)[source]

Bases: SQLAnalystPipeline

Async-first variant of SQLAnalystPipeline.

run() is a coroutine — use await pipeline.run(...) directly. Designed for FastAPI, aiohttp, and other async frameworks.

All constructor parameters and run() parameters are identical to SQLAnalystPipeline.

Example:

from ractogateway.pipelines import AsyncSQLAnalystPipeline
from ractogateway.openai_developer_kit import Chat

pipeline = AsyncSQLAnalystPipeline(
    kit=Chat(model="gpt-4o"),
    pandas_kit=Chat(model="gpt-3.5-turbo"),
    max_rows=5_000,
    safe_mode=True,
)

# In an async context:
result = await pipeline.run(
    user_query="Top 5 products by quantity sold?",
    connection_string="postgresql://user:pass@localhost/shop",
)
if result.error:
    print("Error:", result.error)
else:
    print(result.answer)
    result.plotly_figure.show()

async run(user_query, **kwargs)[source]

Async run() — delegates to SQLAnalystPipeline.arun().

Return type:

SQLAnalystResult
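
The delegation can be pictured as a subclass whose run() simply awaits the inherited arun(). BasePipeline and AsyncFirstPipeline below are hypothetical stand-ins that show only the pattern, not the real classes.

```python
import asyncio


class BasePipeline:
    """Hypothetical stand-in for SQLAnalystPipeline (async entry point only)."""

    async def arun(self, user_query: str, **kwargs) -> str:
        await asyncio.sleep(0)  # placeholder for real async work
        return f"answer to: {user_query}"


class AsyncFirstPipeline(BasePipeline):
    """Hypothetical stand-in for AsyncSQLAnalystPipeline."""

    async def run(self, user_query: str, **kwargs) -> str:
        # run() is itself a coroutine and forwards everything to arun().
        return await self.arun(user_query, **kwargs)


answer = asyncio.run(AsyncFirstPipeline().run("Top 5 products by quantity sold?"))
print(answer)
```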