RAG Pipeline

Pipeline

class ractogateway.rag.pipeline.RactoRAG(vector_store=None, embedder=None, *, store=None, chunker=None, processors=None, llm_kit=None, context_template=None, reader_registry=None, default_prompt=None)[source]

Bases: object

Production-grade RAG pipeline for RactoGateway.

ingest(path, **metadata)[source]

Read, chunk, embed, and store a single file.

Parameters:
  • path (str | Path) – Path to the file to ingest.

  • **metadata (Any) – Extra metadata merged into each chunk’s ChunkMetadata.extra.

Return type:

list[Chunk]

Returns:

list[Chunk] – The chunks that were added to the vector store.

ingest_dir(directory, pattern='**/*', **metadata)[source]

Recursively ingest all supported files in a directory.

Parameters:
  • directory (str | Path) – Root directory to scan.

  • pattern (str) – Glob pattern relative to directory.

  • **metadata (Any) – Extra metadata merged into every chunk.

Return type:

list[Chunk]

Returns:

list[Chunk] – All chunks added across all ingested files.

ingest_text(text, source='manual', **metadata)[source]

Ingest a raw text string directly (no file needed).

Parameters:
  • text (str) – The text content to ingest.

  • source (str) – A label identifying the source (stored in metadata).

  • **metadata (Any) – Extra metadata merged into each chunk.

Return type:

list[Chunk]

async aingest(path, **metadata)[source]

Async variant of ingest().

Return type:

list[Chunk]

async aingest_dir(directory, pattern='**/*', **metadata)[source]

Async variant of ingest_dir().

Return type:

list[Chunk]

async aingest_text(text, source='manual', **metadata)[source]

Async variant of ingest_text().

Return type:

list[Chunk]

retrieve(query, top_k=5, filters=None)[source]

Embed query and retrieve the top-k most relevant chunks.

Parameters:
  • query (str) – Natural-language question or search phrase.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked results (rank 1 = most relevant).

async aretrieve(query, top_k=5, filters=None)[source]

Async variant of retrieve().

Return type:

list[RetrievalResult]

query(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]

Retrieve relevant chunks and generate an answer.

Parameters:
  • question (str) – The user’s question.

  • top_k (int) – Number of context chunks to retrieve.

  • filters (dict[str, Any] | None) – Optional metadata filters for retrieval.

  • prompt (RactoPrompt | None) – Override the default RACTO prompt for generation.

  • temperature (float) – LLM temperature (default 0.0 for factual answers).

  • max_tokens (int) – Maximum tokens in the generated answer.

Return type:

RAGResponse

Returns:

RAGResponse – Contains the generated answer plus the retrieved source chunks.

Raises:

RuntimeError – If no llm_kit was provided.

async aquery(question, top_k=5, filters=None, prompt=None, temperature=0.0, max_tokens=2048)[source]

Async variant of query().

Return type:

RAGResponse

property store: BaseVectorStore

The underlying vector store.

property embedder: BaseEmbedder

The underlying embedder.

count()[source]

Return the total number of indexed chunks.

Return type:

int

clear()[source]

Remove all indexed chunks from the vector store.

Return type:

None

Models

Core document and chunk models for RAG.

Every piece of content in the RAG pipeline is represented as a Document (raw, as loaded from a file) or a Chunk (a processed, embeddable slice of a document). Both are strict Pydantic models with no unvalidated fields.

class ractogateway.rag._models.document.ChunkMetadata(**data)[source]

Bases: BaseModel

Provenance and positional data attached to every chunk.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

source: str
page: int | None
chunk_index: int
total_chunks: int
start_char: int
end_char: int
doc_id: str
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.rag._models.document.Document(**data)[source]

Bases: BaseModel

A raw document loaded from a file or supplied as plain text.

Parameters:
  • content (str) – The full extracted text of the document.

  • source (str) – Absolute file path, URL, or a descriptive label (e.g. "manual").

  • metadata (dict[str, Any]) – Free-form key/value pairs (file size, author, MIME type, …).

  • doc_id (str) – Auto-generated UUID; override only when you need stable IDs.

doc_id: str
content: str
source: str
metadata: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

class ractogateway.rag._models.document.Chunk(**data)[source]

Bases: BaseModel

A single embeddable slice of a document.

Produced by a BaseChunker and enriched with an embedding vector by a BaseEmbedder.

chunk_id: str
doc_id: str
content: str
embedding: list[float] | None
metadata: ChunkMetadata
model_config: ClassVar[ConfigDict] = {}

Retrieval and RAG response models.

class ractogateway.rag._models.retrieval.RetrievalConfig(**data)[source]

Bases: BaseModel

Input parameters for a vector-store search.

query: str
top_k: int
filters: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

class ractogateway.rag._models.retrieval.RetrievalResult(**data)[source]

Bases: BaseModel

A single retrieved chunk together with its relevance score.

chunk: Chunk
score: float
rank: int
model_config: ClassVar[ConfigDict] = {}

class ractogateway.rag._models.retrieval.RAGResponse(**data)[source]

Bases: BaseModel

Combined output from a RAG query (retrieval + generation).

answer: LLMResponse
sources: list[RetrievalResult]
query: str
context_used: str
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Readers

Abstract base class for all file readers.

class ractogateway.rag.readers.base.BaseReader[source]

Bases: ABC

Read content from a file path, raw bytes, or a binary buffer.

Concrete subclasses must implement _read_path() and may override _read_bytes() to support bytes/buffer input. The public read() method handles all type coercion automatically.

abstract property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

read(source)[source]

Load source and return its content as a Document.

Parameters:

source (str | Path | bytes | BinaryIO) –

str or Path

File path read from disk. Both absolute and relative paths are accepted.

bytes

Raw file bytes. Document.source is set to "<bytes>".

binary file-like object

Any object with a .read() -> bytes method — e.g. io.BytesIO, an open binary file handle, a network stream. Document.source is set to "<buffer>".

Return type:

Document
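The type coercion that read() performs can be pictured with the following minimal sketch. It is not the library's implementation: the helper name coerce_source is hypothetical, and using the path string as the label for file inputs is an assumption. Only the "<bytes>" and "<buffer>" labels come from the description above.

```python
import io
from pathlib import Path
from typing import BinaryIO, Tuple, Union

Source = Union[str, Path, bytes, BinaryIO]


def coerce_source(source: Source) -> Tuple[bytes, str]:
    """Return (raw_bytes, source_label) for any supported input type.

    Hypothetical helper; the real BaseReader may dispatch differently.
    """
    if isinstance(source, (str, Path)):
        path = Path(source)
        # Assumption: file inputs are labelled with their path string.
        return path.read_bytes(), str(path)
    if isinstance(source, (bytes, bytearray)):
        return bytes(source), "<bytes>"
    if hasattr(source, "read"):
        # Any binary file-like object: io.BytesIO, open(..., "rb"), a stream.
        return source.read(), "<buffer>"
    raise TypeError(f"Unsupported source type: {type(source).__name__}")


data, label = coerce_source(io.BytesIO(b"hello"))
```

Checking str and Path first matters because a str is also iterable and could otherwise be mistaken for other input kinds in looser checks.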

FileReaderRegistry — auto-detects the right reader for any file extension.

class ractogateway.rag.readers.registry.FileReaderRegistry(readers=None)[source]

Bases: object

Registry that maps file extensions to BaseReader instances.

By default all built-in readers are registered. You can add custom readers with register().

Example:

registry = FileReaderRegistry()
doc = registry.read("report.pdf")

register(reader)[source]

Add reader to the registry for all its supported extensions.

Return type:

None

get_reader(path)[source]

Return the reader for path’s extension.

Raises:

ValueError – If no reader supports the file’s extension.

Return type:

BaseReader

read(path)[source]

Convenience method: detect reader and return a Document.

Return type:

Document

property supported_extensions: frozenset[str]

All extensions currently registered.
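The extension-to-reader mapping can be sketched as follows. This is an illustration under stated assumptions, not the actual FileReaderRegistry: the StubReader and ExtensionRegistry classes are hypothetical, and case-insensitive matching of extensions is an assumption.

```python
from pathlib import Path


class StubReader:
    """Hypothetical stand-in for a BaseReader subclass."""

    def __init__(self, name, extensions):
        self.name = name
        self.supported_extensions = frozenset(extensions)


class ExtensionRegistry:
    """Illustrative registry keyed by lower-case file extension."""

    def __init__(self, readers=None):
        self._by_ext = {}
        for reader in readers or []:
            self.register(reader)

    def register(self, reader):
        # A later registration for the same extension replaces the earlier one.
        for ext in reader.supported_extensions:
            self._by_ext[ext.lower()] = reader

    def get_reader(self, path):
        ext = Path(path).suffix.lower()
        if ext not in self._by_ext:
            raise ValueError(f"No reader registered for extension {ext!r}")
        return self._by_ext[ext]

    @property
    def supported_extensions(self):
        return frozenset(self._by_ext)


registry = ExtensionRegistry(
    [StubReader("pdf", {".pdf"}), StubReader("text", {".txt", ".md"})]
)
```

In this sketch, registry.get_reader("notes.md") returns the text stub, and an unknown extension raises ValueError, mirroring the behaviour documented for get_reader().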

Plain-text reader — handles .txt, .md, .rst, .log and similar files.

class ractogateway.rag.readers.text_reader.TextReader(encoding='utf-8')[source]

Bases: BaseReader

Read any UTF-8 (or latin-1 fallback) plain-text file.

No external dependencies required.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method.

Parameters:

encoding (str) – Primary encoding to try. Falls back to "latin-1" on error.
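The encoding fallback can be illustrated with a short sketch. The function name decode_with_fallback is hypothetical; only the "try the primary encoding, then latin-1" behaviour is taken from the description above.

```python
def decode_with_fallback(raw: bytes, encoding: str = "utf-8") -> str:
    """Decode with the primary encoding, falling back to latin-1 on error."""
    try:
        return raw.decode(encoding)
    except UnicodeDecodeError:
        # latin-1 maps every byte value to a code point, so this cannot fail,
        # although the result may be mojibake if the guess is wrong.
        return raw.decode("latin-1")


utf8_text = decode_with_fallback("café".encode("utf-8"))
latin1_text = decode_with_fallback(b"caf\xe9")
```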

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

PDF reader — uses pypdf (lazy import).

Install with: pip install ractogateway[rag-pdf]

class ractogateway.rag.readers.pdf_reader.PdfReader(extract_images=False)[source]

Bases: BaseReader

Extract text from PDF files using pypdf.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method.

Parameters:

extract_images (bool) – Reserved for future use — image extraction is not yet supported.

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

Word document reader — uses python-docx (lazy import).

Install with: pip install ractogateway[rag-word]

class ractogateway.rag.readers.word_reader.WordReader[source]

Bases: BaseReader

Extract text from Microsoft Word (.docx) files using python-docx.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method.

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

Spreadsheet reader — handles CSV (stdlib) and XLSX (openpyxl, lazy).

Install xlsx support with: pip install ractogateway[rag-excel]

class ractogateway.rag.readers.spreadsheet_reader.SpreadsheetReader(max_rows=None, include_header=True)[source]

Bases: BaseReader

Read CSV and Excel spreadsheets into plain text.

Each row is rendered as a tab-separated line; an optional header row is prepended. Multiple sheets in an XLSX workbook are separated by a --- Sheet: <name> --- divider.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method. When bytes/buffer are provided, XLSX format is detected via the ZIP magic header (PK\x03\x04); everything else is treated as CSV/TSV.

Parameters:
  • max_rows (int | None) – Maximum number of rows to read per sheet (None = all).

  • include_header (bool) – Whether to repeat the header row at the start of each sheet section.
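The format detection and row rendering described above might look like the sketch below. It covers only the CSV branch and uses only the standard library. The function names are hypothetical, and two details are assumptions: that max_rows counts data rows excluding the header, and that CSV input is UTF-8.

```python
import csv
import io

XLSX_MAGIC = b"PK\x03\x04"  # ZIP local-file header; XLSX files are ZIP archives


def sniff_format(raw: bytes) -> str:
    """Classify raw bytes as 'xlsx' (ZIP magic) or 'csv' (everything else)."""
    return "xlsx" if raw.startswith(XLSX_MAGIC) else "csv"


def csv_to_text(raw: bytes, max_rows=None, include_header=True) -> str:
    """Render CSV bytes as tab-separated lines."""
    rows = list(csv.reader(io.StringIO(raw.decode("utf-8"))))
    if not rows:
        return ""
    header, body = rows[0], rows[1:]
    if max_rows is not None:
        body = body[:max_rows]  # assumption: the header is not counted
    lines = ([header] if include_header else []) + body
    return "\n".join("\t".join(row) for row in lines)


rendered = csv_to_text(b"a,b\n1,2\n3,4\n", max_rows=1)
```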

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

Image reader — uses Pillow (lazy import) to extract metadata.

Images are represented as a textual description of their EXIF/metadata, plus an optional prompt to an LLM for visual description. Pixel data is not stored in the Document; use RactoFile for multimodal vision calls.

Install with: pip install ractogateway[rag-image]

class ractogateway.rag.readers.image_reader.ImageReader(include_exif=True)[source]

Bases: BaseReader

Extract metadata from image files and represent them as text Documents.

The resulting Document.content is a human-readable summary of image properties (size, mode, format, EXIF tags). Pass the image to a vision LLM separately using RactoFile for actual visual understanding.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method.

Parameters:

include_exif (bool) – Whether to extract and include EXIF metadata in the content.

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

HTML reader — uses stdlib html.parser (no extra deps).

class ractogateway.rag.readers.html_reader.HtmlReader[source]

Bases: BaseReader

Extract visible text from HTML files using the stdlib HTML parser.

No external dependencies required.

Accepts a file path (str / Path), raw bytes, or any binary file-like object with a .read() method.
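Extracting visible text with the standard-library parser can be sketched as below. This is one plausible approach rather than HtmlReader's actual code: the class name VisibleTextParser, the choice to skip script and style content, and joining fragments with newlines are all assumptions.

```python
from html.parser import HTMLParser


class VisibleTextParser(HTMLParser):
    """Collect text nodes, ignoring the contents of <script> and <style>."""

    SKIP_TAGS = {"script", "style"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-blank text that is outside skipped elements.
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def visible_text(html: str) -> str:
    parser = VisibleTextParser()
    parser.feed(html)
    parser.close()
    return "\n".join(parser.parts)


sample = (
    "<html><head><style>p{color:red}</style></head>"
    "<body><h1>Title</h1><p>Hello <b>world</b></p>"
    "<script>var x = 1;</script></body></html>"
)
text = visible_text(sample)
```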

property supported_extensions: frozenset[str]

Lower-case extensions (with dot) this reader handles, e.g. {".pdf"}.

Chunkers

Abstract base class for text chunkers.

class ractogateway.rag.chunkers.base.BaseChunker[source]

Bases: ABC

Split a Document into a list of Chunk objects.

Each chunk preserves provenance (doc_id, chunk_index, start_char, end_char) in its ChunkMetadata.

abstractmethod chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.

Fixed-size character chunker with configurable overlap.

class ractogateway.rag.chunkers.fixed_chunker.FixedChunker(chunk_size=512, overlap=50)[source]

Bases: BaseChunker

Split text into fixed-size character windows with overlap.

Parameters:
  • chunk_size (int) – Maximum number of characters per chunk.

  • overlap (int) – Number of characters to repeat at the start of the next chunk. Must be less than chunk_size.
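A minimal sketch of fixed-size windows with overlap follows. The function name fixed_windows and the (start_char, end_char, text) tuple layout are hypothetical; the real FixedChunker returns Chunk objects and may handle edge cases differently.

```python
def fixed_windows(text: str, chunk_size: int = 512, overlap: int = 50):
    """Return (start_char, end_char, content) tuples for each window."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and less than chunk_size")
    step = chunk_size - overlap  # how far the window start advances each time
    spans = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        spans.append((start, end, text[start:end]))
        if end == len(text):
            break  # avoid emitting a trailing window that is pure overlap
        start += step
    return spans


windows = fixed_windows("abcdefghij", chunk_size=4, overlap=1)
```

The requirement that overlap be less than chunk_size guarantees a positive step, so the loop always terminates.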

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.

Recursive character text splitter (LangChain-style).

Tries progressively finer separators ("\n\n", "\n", ". ", " " and finally character-by-character) until every piece fits within chunk_size.

class ractogateway.rag.chunkers.recursive_chunker.RecursiveChunker(chunk_size=512, overlap=50, separators=None)[source]

Bases: BaseChunker

Split text recursively using a priority list of separators.

Parameters:
  • chunk_size (int) – Maximum number of characters per chunk.

  • overlap (int) – Number of characters of overlap between consecutive chunks.

  • separators (list[str] | None) – Ordered list of separator strings to try. The first separator that produces pieces within chunk_size is used.
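The recursive strategy can be sketched as follows. This is a simplified illustration, not the RecursiveChunker implementation: it omits overlap, the function name recursive_split is hypothetical, and greedily merging adjacent pieces back together while they fit is an assumption borrowed from common splitters of this kind.

```python
DEFAULT_SEPARATORS = ("\n\n", "\n", ". ", " ")


def recursive_split(text, chunk_size, separators=DEFAULT_SEPARATORS):
    """Split text so every piece has at most chunk_size characters."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # Last resort: hard split character by character.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    sep, finer = separators[0], separators[1:]
    chunks = []
    buffer = ""
    for part in text.split(sep):
        candidate = part if not buffer else buffer + sep + part
        if len(candidate) <= chunk_size:
            buffer = candidate  # keep merging while the piece still fits
            continue
        if buffer:
            chunks.append(buffer)
        if len(part) <= chunk_size:
            buffer = part
        else:
            # This part is too long even alone: retry with finer separators.
            chunks.extend(recursive_split(part, chunk_size, finer))
            buffer = ""
    if buffer:
        chunks.append(buffer)
    return chunks


pieces = recursive_split("aaa bbb ccc\n\nddd eee", chunk_size=7)
```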

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.

Sentence-aware chunker — uses NLTK sent_tokenize (lazy import).

Install with: pip install ractogateway[rag-nlp]

class ractogateway.rag.chunkers.sentence_chunker.SentenceChunker(sentences_per_chunk=5, overlap_sentences=1, language='english')[source]

Bases: BaseChunker

Split text into groups of sentences using NLTK.

Parameters:
  • sentences_per_chunk (int) – Number of sentences per chunk.

  • overlap_sentences (int) – Number of sentences to repeat at the start of the next chunk.

  • language (str) – Language for the NLTK sentence tokenizer (default: "english").
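Grouping sentences with overlap can be sketched without NLTK by starting from an already tokenized list. The function name group_sentences is hypothetical, and the sketch assumes overlap_sentences must be smaller than sentences_per_chunk, by analogy with the character-based chunkers.

```python
def group_sentences(sentences, sentences_per_chunk=5, overlap_sentences=1):
    """Group a pre-split sentence list into overlapping windows."""
    if not 0 <= overlap_sentences < sentences_per_chunk:
        raise ValueError("overlap_sentences must be < sentences_per_chunk")
    step = sentences_per_chunk - overlap_sentences
    groups = []
    for start in range(0, len(sentences), step):
        groups.append(sentences[start:start + sentences_per_chunk])
        if start + sentences_per_chunk >= len(sentences):
            break  # the last window already reaches the end of the text
    return groups


sentences = [f"s{i}" for i in range(1, 8)]  # stand-in for sent_tokenize output
groups = group_sentences(sentences, sentences_per_chunk=3, overlap_sentences=1)
```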

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.

Semantic chunker — splits at embedding-space boundaries.

Uses cosine similarity between adjacent sentence embeddings to detect topic shifts. Requires a BaseEmbedder and NLTK sent_tokenize.

Install with: pip install ractogateway[rag-nlp]

class ractogateway.rag.chunkers.semantic_chunker.SemanticChunker(embedder, threshold=0.5, min_chunk_size=2, language='english')[source]

Bases: BaseChunker

Split documents where the semantic similarity between adjacent sentences drops below a threshold.

Parameters:
  • embedder (BaseEmbedder) – Any BaseEmbedder instance.

  • threshold (float) – Cosine similarity below which a split is inserted (default: 0.5).

  • min_chunk_size (int) – Minimum number of sentences per chunk (prevents ultra-fine splits).

  • language (str) – NLTK sentence tokenizer language.
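The threshold-based splitting can be sketched as below, using precomputed sentence vectors in place of a real embedder. The function names are hypothetical, and the exact way min_chunk_size interacts with split points is an assumption: here a split is suppressed until the current group holds at least min_chunk_size sentences.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 for zero vectors)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_groups(sentences, vectors, threshold=0.5, min_chunk_size=2):
    """Start a new group where adjacent-sentence similarity drops below threshold."""
    groups = []
    current = []
    for i, sentence in enumerate(sentences):
        topic_shift = (
            len(current) >= min_chunk_size
            and cosine(vectors[i - 1], vectors[i]) < threshold
        )
        if topic_shift:
            groups.append(current)
            current = []
        current.append(sentence)
    if current:
        groups.append(current)
    return groups


sentences = ["Cats purr.", "Cats nap.", "Stocks fell.", "Bonds rose."]
vectors = [[1.0, 0.0], [1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]  # toy embeddings
groups = semantic_groups(sentences, vectors)
```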

chunk(document)[source]

Split document into chunks.

Parameters:

document (Document) – The fully-loaded document to split.

Return type:

list[Chunk]

Returns:

list[Chunk] – Ordered list of non-overlapping (or slightly overlapping) chunks.

Processors

Abstract base class for text processors.

class ractogateway.rag.processors.base.BaseProcessor[source]

Bases: ABC

Transform a text string and return the processed result.

Processors are applied to chunk content before embedding. They can normalise whitespace, lemmatize tokens, remove stop words, etc.

Chain multiple processors with ProcessingPipeline.

abstractmethod process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

Text cleaning processor — no extra dependencies.

class ractogateway.rag.processors.cleaner.TextCleaner(normalize_unicode=True, strip_html=True, strip_control_chars=True, collapse_whitespace=True, collapse_blank_lines=True)[source]

Bases: BaseProcessor

Normalise text for embedding and retrieval.

Steps applied (all optional via constructor flags):

  1. Unicode normalisation (NFC)

  2. Strip residual HTML tags

  3. Remove control characters

  4. Collapse multiple spaces to one

  5. Collapse runs of blank lines to at most two newlines

  6. Strip leading/trailing whitespace

Parameters:
  • normalize_unicode (bool) – Apply unicodedata.normalize("NFC", text).

  • strip_html (bool) – Remove <tag> patterns.

  • strip_control_chars (bool) – Remove non-printable control characters.

  • collapse_whitespace (bool) – Collapse sequences of spaces/tabs to a single space.

  • collapse_blank_lines (bool) – Collapse 3+ consecutive newlines to 2.
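The cleaning steps listed above can be approximated with the standard library as follows. The regular expressions are assumptions chosen for illustration (for example, which control characters are removed), and the function name clean_text is hypothetical; TextCleaner itself may use different patterns.

```python
import re
import unicodedata


def clean_text(
    text,
    *,
    normalize_unicode=True,
    strip_html=True,
    strip_control_chars=True,
    collapse_whitespace=True,
    collapse_blank_lines=True,
):
    """Apply the six cleaning steps in the documented order."""
    if normalize_unicode:
        text = unicodedata.normalize("NFC", text)
    if strip_html:
        text = re.sub(r"<[^>]+>", "", text)
    if strip_control_chars:
        # Assumption: keep tab, newline and carriage return; drop the rest.
        text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
    if collapse_whitespace:
        text = re.sub(r"[ \t]+", " ", text)
    if collapse_blank_lines:
        text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


cleaned = clean_text("  <p>Hello,\t  world!</p>\n\n\n\nBye\x00  ")
```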

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

Lemmatization processor — uses NLTK WordNetLemmatizer (lazy import).

Install with: pip install ractogateway[rag-nlp]

Note: Lemmatization changes the surface form of text and can degrade embedding quality for neural models (which were trained on unmodified text). Use this processor only when building keyword-index pipelines or when explicitly required for your retrieval strategy.

class ractogateway.rag.processors.lemmatizer.Lemmatizer(use_pos_tagging=True)[source]

Bases: BaseProcessor

Reduce words to their base (lemma) form using NLTK WordNet.

Parameters:

use_pos_tagging (bool) – If True, use POS tagging to improve lemmatization accuracy. Slightly slower but produces better results.

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

ProcessingPipeline — chain multiple processors sequentially.

class ractogateway.rag.processors.pipeline.ProcessingPipeline(processors)[source]

Bases: BaseProcessor

Apply a sequence of BaseProcessor objects to text.

Example:

pipeline = ProcessingPipeline([TextCleaner(), Lemmatizer()])
processed = pipeline.process("  Hello,   worlds!  ")

Parameters:

processors (list[BaseProcessor]) – Ordered list of processors to apply. Each processor receives the output of the previous one.
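Sequential chaining amounts to a simple fold over the processor list. The sketch below uses plain callables instead of BaseProcessor subclasses, and the function name run_pipeline is hypothetical; it mainly shows that order matters.

```python
def run_pipeline(text, processors):
    """Feed the output of each processor into the next one."""
    for process in processors:
        text = process(text)
    return text


def add_marker(text):
    # Toy processor used only to make the ordering visible.
    return text + "!"


strip_then_mark = run_pipeline("  Hello  ", [str.strip, add_marker])
mark_then_strip = run_pipeline("  Hello  ", [add_marker, str.strip])
```

With stripping first the result is "Hello!", while marking first leaves the inner spaces in place ("Hello  !"), which is why the processors list is documented as ordered.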

process(text)[source]

Process text and return the transformed string.

Parameters:

text (str) – Input text (chunk content or raw document content).

Return type:

str

Returns:

str – Processed text. Must be a non-empty string when input is non-empty.

Embedders

Abstract base class for embedding providers.

class ractogateway.rag.embedders.base.BaseEmbedder[source]

Bases: ABC

Embed a list of texts into dense float vectors.

All embedders implement both sync embed() and async aembed() variants. The dimension of returned vectors is declared via the dimension property (-1 if unknown until the first call).

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 if not known until after the first call.

abstractmethod embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

abstractmethod async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

OpenAI embedding provider.

Install with: pip install ractogateway[openai]

class ractogateway.rag.embedders.openai_embedder.OpenAIEmbedder(model='text-embedding-3-small', *, api_key=None, base_url=None, dimensions=None, batch_size=256)[source]

Bases: BaseEmbedder

Embed texts using the OpenAI Embeddings API.

Parameters:
  • model (str) – OpenAI embedding model (default "text-embedding-3-small").

  • api_key (str | None) – OpenAI API key. Falls back to OPENAI_API_KEY env var.

  • base_url (str | None) – Custom base URL (Azure OpenAI or proxy).

  • dimensions (int | None) – Override output dimensionality (supported for text-embedding-3-*).

  • batch_size (int) – Maximum number of texts per API call.
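How batch_size bounds the number of texts per request can be sketched with a stand-in embedding function. Nothing here calls the OpenAI API: fake_embed is a hypothetical placeholder, and embed_in_batches illustrates the batching idea only.

```python
def batched(texts, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(texts), batch_size):
        yield texts[start:start + batch_size]


def embed_in_batches(texts, embed_batch, batch_size=256):
    """Call embed_batch once per slice and concatenate results in input order."""
    vectors = []
    for batch in batched(texts, batch_size):
        vectors.extend(embed_batch(batch))
    return vectors


call_sizes = []


def fake_embed(batch):
    # Hypothetical embedder: one 1-dimensional vector per text.
    call_sizes.append(len(batch))
    return [[float(len(text))] for text in batch]


vectors = embed_in_batches(["a", "bb", "ccc"], fake_embed, batch_size=2)
```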

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 if not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

Google Gemini embedding provider.

Install with: pip install ractogateway[google]

class ractogateway.rag.embedders.google_embedder.GoogleEmbedder(model='text-embedding-004', *, api_key=None, task_type=None, batch_size=100)[source]

Bases: BaseEmbedder

Embed texts using the Google Gemini Embeddings API.

Parameters:
  • model (str) – Gemini embedding model (default "text-embedding-004").

  • api_key (str | None) – Gemini API key. Falls back to GEMINI_API_KEY env var.

  • task_type (str | None) – Gemini task type hint (e.g. "RETRIEVAL_DOCUMENT", "RETRIEVAL_QUERY"). None lets the API decide.

  • batch_size (int) – Maximum number of texts per API call.

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 if not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

Voyage AI embedding provider (Anthropic-aligned, best for Claude RAG).

Install with: pip install ractogateway[rag-voyage]

class ractogateway.rag.embedders.voyage_embedder.VoyageEmbedder(model='voyage-3', *, api_key=None, input_type='document', batch_size=128)[source]

Bases: BaseEmbedder

Embed texts using the Voyage AI API.

Voyage AI embeddings are optimised for Anthropic Claude RAG pipelines and are the recommended choice when using Claude as the generation LLM.

Parameters:
  • model (str) – Voyage model name (default "voyage-3").

  • api_key (str | None) – Voyage API key. Falls back to VOYAGE_API_KEY env var.

  • input_type (str | None) – "query" for queries, "document" for documents to index. Using the correct type improves retrieval quality.

  • batch_size (int) – Maximum texts per API call.

property dimension: int

Dimensionality of the embedding vectors.

Returns -1 if not known until after the first call.

embed(texts)[source]

Embed texts synchronously.

Parameters:

texts (list[str]) – Non-empty list of strings to embed.

Return type:

list[list[float]]

Returns:

list[list[float]] – One embedding vector per input text, in the same order.

async aembed(texts)[source]

Async variant of embed().

Return type:

list[list[float]]

PageIndexRAG — Vectorless BM25 Pipeline

class ractogateway.rag.page_index.pipeline.PageIndexRAG(llm_kit=None, *, processors=None, reader_registry=None, context_template="Use the following retrieved page excerpts to answer the user's question.\\nIf the excerpts do not contain enough information, say so clearly.\\n\\n--- CONTEXT ---\\n{context}\\n--- END CONTEXT ---\\n\\nQuestion: {question}", default_prompt=None, page_size=1000, page_overlap=100, k1=1.5, b=0.75, top_keywords=20, ocr_backend=None, ocr_fallback=True, min_ocr_confidence=0.0)[source]

Bases: object

Vectorless RAG pipeline that indexes documents at the page level.

Parameters:
  • llm_kit (Any) – Any RactoGateway developer kit (OpenAI, Anthropic, Google, Ollama, HuggingFace). Required only for query() / aquery(). Pass None to use the pipeline in retrieve-only mode.

  • processors (Sequence[BaseProcessor] | None) – Text processors applied to each page before indexing. Defaults to [TextCleaner()].

  • reader_registry (FileReaderRegistry | None) – File reader registry used to load non-PDF documents. Defaults to a FileReaderRegistry with all built-in readers registered.

  • context_template (str) – Template string with single-brace {context} and {question} placeholders (as in the default shown in the signature), used when building the LLM prompt.

  • default_prompt (RactoPrompt | None) – RactoPrompt used for generation. Defaults to a built-in factual Q&A prompt.

  • page_size (int) – Maximum character length of each page window for non-PDF files (default 1000).

  • page_overlap (int) – Character overlap between consecutive windows (default 100).

  • k1 (float) – BM25 term-frequency saturation parameter (default 1.5).

  • b (float) – BM25 length-normalisation parameter (default 0.75).

  • top_keywords (int) – Number of top TF-weighted keywords to extract per page for the decision index (default 20).
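The role of k1 and b is easiest to see in a small BM25 scorer. The sketch below is a generic Okapi BM25 with a non-negative IDF variant, not PageIndexRAG's code: the whitespace tokenizer, the IDF formula and the function name bm25_scores are all assumptions, and the decision-index stage is left out.

```python
import math
from collections import Counter


def bm25_scores(query, pages, k1=1.5, b=0.75):
    """Return one BM25 score per page for a whitespace-tokenized query."""
    tokenized = [page.lower().split() for page in pages]
    n_pages = len(tokenized)
    avg_len = sum(len(t) for t in tokenized) / n_pages if n_pages else 0.0
    # Document frequency: in how many pages each term appears.
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))

    scores = []
    for tokens in tokenized:
        tf = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            idf = math.log(
                1 + (n_pages - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5)
            )
            # b controls length normalisation, k1 controls TF saturation.
            norm = k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * tf[term] * (k1 + 1) / (tf[term] + norm)
        scores.append(score)
    return scores


pages = ["the cat sat on the mat", "dogs chase cats", "the quick brown fox"]
scores = bm25_scores("cat", pages)
```

Raising k1 lets repeated terms keep adding to the score for longer, and setting b to 0 disables length normalisation entirely.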

retrieve(query, top_k=5)[source]

Retrieve the most relevant pages for query.

Uses two-stage retrieval: decision index (candidate selection) → BM25 scoring (ranking).

Parameters:
  • query (str) – Natural-language question or keyword string.

  • top_k (int) – Maximum number of results to return.

Return type:

list[PageIndexResult]

Returns:

list[PageIndexResult] – Pages ranked by BM25 score (most relevant first).

async aretrieve(query, top_k=5)[source]

Async variant of retrieve().

Return type:

list[PageIndexResult]

ingest(path, **metadata)[source]

Read a file and add its pages to the index.

PDFs are split page-by-page; all other file types are split into fixed-size character windows.

Parameters:
  • path (str) – Absolute or relative path to the file.

  • **metadata (Any) – Arbitrary key/value pairs stored in PageEntry.extra.

Return type:

list[PageEntry]

Returns:

list[PageEntry] – All page entries created from this file.

async aingest(path, **metadata)[source]

Async variant of ingest().

Return type:

list[PageEntry]

ingest_text(text, source='manual', **metadata)[source]

Index raw text directly (no file I/O).

Parameters:
  • text (str) – Plain text to index.

  • source (str) – Descriptive label stored in PageEntry.source.

  • **metadata (Any) – Arbitrary key/value pairs stored in PageEntry.extra.

Return type:

list[PageEntry]

async aingest_text(text, source='manual', **metadata)[source]

Async variant of ingest_text().

Return type:

list[PageEntry]

ingest_dir(directory, pattern='**/*', *, on_progress=None, **metadata)[source]

Ingest all files matching pattern inside directory.

Files that cannot be read are logged and skipped; the rest are indexed normally.

Parameters:
  • directory (str) – Root directory to search.

  • pattern (str) – Glob pattern relative to directory (default "**/*").

  • on_progress (Callable[[int, int], None] | None) – Optional callback (done, total) -> None called after each file is processed (or skipped). Useful for progress bars.

  • **metadata (Any) – Forwarded to every ingest() call.

Return type:

list[PageEntry]

async aingest_dir(directory, pattern='**/*', *, max_concurrent=4, on_progress=None, **metadata)[source]

Async parallel variant of ingest_dir().

Parameters:
  • directory (str) – Root directory to search.

  • pattern (str) – Glob pattern relative to directory (default "**/*").

  • max_concurrent (int) – Maximum number of files ingested concurrently (default 4).

  • on_progress (Callable[[int, int], None] | None) – Optional callback (done, total) -> None called after each file finishes (thread-safe; called from the event loop).

  • **metadata (Any) – Forwarded to every aingest() call.

Return type:

list[PageEntry]
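Bounded concurrency with a progress callback can be sketched with asyncio.Semaphore. This shows the general pattern only: gather_limited and fake_ingest are hypothetical names, and unlike the documented method the sketch does not log and skip files that fail.

```python
import asyncio


async def gather_limited(items, worker, max_concurrent=4, on_progress=None):
    """Run worker(item) for every item with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    total = len(items)
    done = 0

    async def run(item):
        nonlocal done
        async with semaphore:
            result = await worker(item)
        done += 1
        if on_progress is not None:
            on_progress(done, total)  # invoked on the event loop thread
        return result

    # gather returns results in the order of the inputs, not completion order.
    return await asyncio.gather(*(run(item) for item in items))


async def fake_ingest(path):
    # Stand-in for real file ingestion.
    await asyncio.sleep(0)
    return path.upper()


progress = []
results = asyncio.run(
    gather_limited(
        ["a.txt", "b.txt", "c.txt"],
        fake_ingest,
        max_concurrent=2,
        on_progress=lambda d, t: progress.append((d, t)),
    )
)
```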

add_document(path, **metadata)[source]

Alias for ingest().

Return type:

list[PageEntry]

add_texts(texts, source='manual', **metadata)[source]

Ingest a list of text strings.

Return type:

list[PageEntry]

search(query, *, top_k=5, prompt=None, temperature=0.0, max_tokens=2048)[source]

Alias for query().

Return type:

PageIndexResponse

query(question, *, top_k=5, prompt=None, temperature=0.0, max_tokens=2048)[source]

Retrieve relevant pages and generate an answer with the LLM kit.

Parameters:
  • question (str) – Natural-language question to answer.

  • top_k (int) – Number of pages to retrieve.

  • prompt (RactoPrompt | None) – Override the kit’s default prompt for this call.

  • temperature (float) – Sampling temperature for generation.

  • max_tokens (int) – Maximum generation tokens.

Return type:

PageIndexResponse

Returns:

PageIndexResponse – Contains the generated answer, ranked sources, and the context string that was supplied to the model.

Raises:

ValueError – If no llm_kit was provided and generation is requested.
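How the context string handed to the model might be assembled is sketched below using the default template text from the constructor signature. Rendering with str.format and joining excerpts with blank lines are assumptions, and build_prompt is a hypothetical helper rather than part of the PageIndexRAG API.

```python
DEFAULT_TEMPLATE = (
    "Use the following retrieved page excerpts to answer the user's question.\n"
    "If the excerpts do not contain enough information, say so clearly.\n\n"
    "--- CONTEXT ---\n{context}\n--- END CONTEXT ---\n\n"
    "Question: {question}"
)


def build_prompt(question, excerpts, template=DEFAULT_TEMPLATE):
    """Fill the {context} and {question} placeholders of the template."""
    context = "\n\n".join(excerpts)  # assumption: blank line between pages
    return template.format(context=context, question=question)


prompt = build_prompt("What is BM25?", ["Page 1 text", "Page 2 text"])
```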

async aquery(question, *, top_k=5, prompt=None, temperature=0.0, max_tokens=2048)[source]

Async variant of query().

Return type:

PageIndexResponse

remove_document(doc_id)[source]

Remove all pages belonging to doc_id from the index.

Parameters:

doc_id (str) – The doc_id value from any PageEntry returned during ingestion.

Return type:

int

Returns:

int – Number of page entries removed.

clear()[source]

Remove all indexed entries and reset the pipeline to empty state.

Return type:

None

save(path)[source]

Serialise the full index to a JSON file.

The saved file contains all PageEntry records, BM25 term weights, and deduplication hashes. Reload with load().

Parameters:

path (str) – Destination file path (will be created or overwritten).

Return type:

None

classmethod load(path, **kwargs)[source]

Load a previously saved index from path.

Parameters:
  • path (str) – JSON file written by save().

  • **kwargs (Any) – Forwarded to the constructor (e.g. llm_kit=kit).

Return type:

PageIndexRAG

Returns:

PageIndexRAG – A new instance with the index fully restored.

property entry_count: int

Total number of indexed page entries.

property document_count: int

Number of distinct documents ingested.

PageIndex Models

Pydantic models for the PageIndexRAG pipeline.

class ractogateway.rag.page_index._models.PageEntry(**data)[source]

Bases: BaseModel

A single page (or fixed-size window) extracted from a document.

Produced by PageIndexRAG during ingestion and stored in the in-process index.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

entry_id: str
page_number: int | None
content: str
source: str
section_title: str | None
keywords: list[str]
doc_id: str
char_count: int
extra: dict[str, Any]
ocr_applied: bool
ocr_confidence: float | None
content_hash: str | None
property text: str

Alias for content.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.rag.page_index._models.PageIndexResult(**data)[source]

Bases: BaseModel

A single retrieved page together with its BM25 relevance score.

entry: PageEntry
score: float
rank: int
matched_terms: list[str]
property content: str

Alias for entry.content.

property text: str

Alias for entry.content.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.rag.page_index._models.PageIndexResponse(**data)[source]

Bases: BaseModel

Full response from PageIndexRAG.query() / PageIndexRAG.aquery().

answer: LLMResponse | None
sources: list[PageIndexResult]
query: str
context_used: str
property results: list[PageIndexResult]

Alias for sources.

property pages: list[PageIndexResult]

Alias for sources.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

PageIndex BM25 Engine

Pure-Python BM25 index and decision-tree inverted index.

No external dependencies required — everything is implemented with the Python standard library.

Two components work together for two-stage retrieval:

  1. _DecisionIndex: an inverted keyword index that maps content terms to page entry IDs. Given a tokenised query, it returns the union of candidate entry IDs with one index lookup per query term, i.e. O(|query terms|) lookups. This is the “decision tree” routing layer.

  2. BM25Index: Okapi BM25 (k1=1.5, b=0.75), which scores the candidates returned by the decision index. Only those candidates are scored, so the full corpus is not re-ranked on each query.
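
The two stages above can be sketched in a few lines of standard-library Python. This is an illustration under stated assumptions, not the library's code: the tokeniser, the class name TinyTwoStageIndex, and the exact IDF variant (the common log(1 + (N - df + 0.5) / (df + 0.5)) form) are all assumptions.

```python
import math
import re
from collections import Counter, defaultdict


def tokenize(text: str) -> list[str]:
    # Assumed tokeniser: lowercase alphanumeric runs.
    return re.findall(r"[a-z0-9]+", text.lower())


class TinyTwoStageIndex:
    """Hypothetical sketch: inverted index for routing, BM25 for scoring."""

    def __init__(self, k1: float = 1.5, b: float = 0.75) -> None:
        self.k1, self.b = k1, b
        self.term_freqs: dict[str, Counter] = {}
        self.postings: dict[str, set[str]] = defaultdict(set)

    def add(self, entry_id: str, text: str) -> None:
        counts = Counter(tokenize(text))
        self.term_freqs[entry_id] = counts
        for term in counts:
            self.postings[term].add(entry_id)

    def candidates(self, query: str) -> set[str]:
        # Stage 1: union of posting sets, one lookup per distinct query term.
        found: set[str] = set()
        for term in set(tokenize(query)):
            found |= self.postings.get(term, set())
        return found

    def score(self, query: str) -> list[tuple[str, float, list[str]]]:
        # Stage 2: BM25 over the candidates only.
        n_docs = len(self.term_freqs)
        if n_docs == 0:
            return []
        avg_len = sum(sum(c.values()) for c in self.term_freqs.values()) / n_docs
        terms = sorted(set(tokenize(query)))
        ranked = []
        for entry_id in self.candidates(query):
            counts = self.term_freqs[entry_id]
            doc_len = sum(counts.values())
            total, matched = 0.0, []
            for term in terms:
                tf = counts.get(term, 0)
                if tf == 0:
                    continue
                df = len(self.postings[term])
                idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
                norm = tf + self.k1 * (1 - self.b + self.b * doc_len / avg_len)
                total += idf * tf * (self.k1 + 1) / norm
                matched.append(term)
            ranked.append((entry_id, total, matched))
        # Highest score first; ties broken by entry_id for a stable order.
        ranked.sort(key=lambda r: (-r[1], r[0]))
        return ranked
```

Entries that share no term with the query never reach the scoring loop, which is the point of the routing stage.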

ractogateway.rag.page_index._bm25.extract_keywords(text, top_n=20)[source]

Return the top-n most frequent content tokens from text.

Return type:

list[str]
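
As a rough illustration of frequency-based keyword extraction, the sketch below counts tokens and keeps the most common ones. The function name top_keywords, the stop-word list, and the tokenisation rule are assumptions; the source does not say how the library defines a "content token".

```python
import re
from collections import Counter

# Hypothetical stop-word list; the library's own list is not documented here.
STOP_WORDS = frozenset(
    {"a", "an", "and", "the", "of", "to", "in", "is", "for", "on", "with"}
)


def top_keywords(text: str, top_n: int = 20) -> list[str]:
    """Return the top_n most frequent non-stop-word tokens in text."""
    tokens = [
        tok
        for tok in re.findall(r"[a-z0-9]+", text.lower())
        if tok not in STOP_WORDS and len(tok) > 1
    ]
    # most_common orders by count; equal counts keep first-seen order.
    return [term for term, _ in Counter(tokens).most_common(top_n)]
```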

class ractogateway.rag.page_index._bm25.BM25Index(k1=1.5, b=0.75)[source]

Bases: object

Okapi BM25 scorer over a corpus of PageEntry texts.

Parameters:
  • k1 (float) – Term-frequency saturation parameter (default 1.5).

  • b (float) – Length normalisation parameter (default 0.75).

add(entry_id, text)[source]

Tokenise text and add the entry to the index.

Return type:

None

remove(entry_id)[source]

Remove entry_id from the index.

Return type:

None

clear()[source]

Remove all entries from the index.

Return type:

None

score(query, candidate_ids=None)[source]

Score candidates against query and return ranked results.

Parameters:
  • query (str) – Raw query string.

  • candidate_ids (set[str] | None) – Subset of entry IDs to score. When None the entire corpus is scored (full-scan fallback).

Return type:

list[tuple[str, float, list[str]]]

Returns:

list of (entry_id, bm25_score, matched_terms) – Sorted descending by score, ties broken by entry_id for stability.

property entry_count: int

Number of entries currently in the index.

Stores

Abstract base class for vector stores.

class ractogateway.rag.stores.base.BaseVectorStore[source]

Bases: ABC

Persist and search embedding vectors.

All vector stores share the same interface: add(), search(), delete(), clear(), and count(). The underlying storage backend is determined by the concrete subclass.
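
To make the five-method contract concrete, here is a self-contained sketch that mirrors it with stand-in types. StoreContract and DictStore are hypothetical names, chunks are modelled as plain dicts with "id" and "embedding" keys, and results are plain dicts; the real Chunk and RetrievalResult classes are not reproduced here. Ranking uses a raw dot product purely for brevity.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class StoreContract(ABC):
    """Stand-in for the five-method interface; not the library's class."""

    @abstractmethod
    def add(self, chunks: list[dict[str, Any]]) -> None: ...

    @abstractmethod
    def search(self, embedding: list[float], top_k: int = 5,
               filters: Optional[dict[str, Any]] = None) -> list[dict[str, Any]]: ...

    @abstractmethod
    def delete(self, chunk_ids: list[str]) -> None: ...

    @abstractmethod
    def clear(self) -> None: ...

    @abstractmethod
    def count(self) -> int: ...


class DictStore(StoreContract):
    """Toy backend that keeps chunks in a dict keyed by chunk id."""

    def __init__(self) -> None:
        self._chunks: dict[str, dict[str, Any]] = {}

    def add(self, chunks: list[dict[str, Any]]) -> None:
        # Validate the whole batch first so a bad chunk adds nothing.
        if any(c.get("embedding") is None for c in chunks):
            raise ValueError("every chunk needs a non-None embedding")
        for c in chunks:
            self._chunks[c["id"]] = c

    def search(self, embedding, top_k=5, filters=None):
        # filters is accepted but ignored in this sketch.
        def dot(c: dict[str, Any]) -> float:
            return sum(x * y for x, y in zip(embedding, c["embedding"]))

        ordered = sorted(self._chunks.values(), key=lambda c: (-dot(c), c["id"]))
        # rank 1 is the most similar chunk.
        return [
            {"rank": i, "chunk": c, "score": dot(c)}
            for i, c in enumerate(ordered[:top_k], start=1)
        ]

    def delete(self, chunk_ids: list[str]) -> None:
        for cid in chunk_ids:
            self._chunks.pop(cid, None)

    def clear(self) -> None:
        self._chunks.clear()

    def count(self) -> int:
        return len(self._chunks)
```

Because every method is abstract, a subclass that omits any of the five cannot be instantiated, which is how the shared interface is enforced.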

abstractmethod add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

abstractmethod search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

abstractmethod delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

abstractmethod clear()[source]

Remove all chunks from the store.

Return type:

None

abstractmethod count()[source]

Return the total number of indexed chunks.

Return type:

int

In-memory vector store — pure Python, zero extra dependencies.

Uses brute-force cosine similarity over a list of stored vectors. Suitable for development, testing, and small corpora (< 10k chunks).

class ractogateway.rag.stores.in_memory_store.InMemoryVectorStore(similarity='cosine')[source]

Bases: BaseVectorStore

Pure-Python brute-force vector store — no extra dependencies.

This store keeps all chunks and their embeddings in memory. It is not suitable for production-scale corpora but requires no installation.

Parameters:

similarity (str) – Similarity function to use. Currently only "cosine" is supported.
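
Brute-force cosine search, as described above, amounts to scoring the query against every stored vector and sorting. The sketch below shows the idea with plain lists; cosine_similarity and brute_force_search are hypothetical helper names, not part of the documented API, and the zero-vector handling is an assumption.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between a and b; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_search(
    query: list[float],
    vectors: dict[str, list[float]],
    top_k: int = 5,
) -> list[tuple[str, float]]:
    """Score every stored vector against query and keep the top_k."""
    scored = [(cid, cosine_similarity(query, vec)) for cid, vec in vectors.items()]
    # Most similar first; ties broken by id so the order is deterministic.
    scored.sort(key=lambda pair: (-pair[1], pair[0]))
    return scored[:top_k]
```

Every query touches every vector, so cost grows linearly with corpus size, which is why this approach suits small corpora.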

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

ChromaDB vector store (lazy import).

Install with: pip install ractogateway[rag-chroma]

class ractogateway.rag.stores.chroma_store.ChromaStore(collection='ractogateway', *, path=None, host=None, port=8000, distance_function='cosine')[source]

Bases: BaseVectorStore

Vector store backed by ChromaDB.

Supports in-process mode (persistent when path is set, ephemeral when path is None) and HTTP-client mode (host plus port).

Parameters:
  • collection (str) – Name of the ChromaDB collection.

  • path (str | None) – Persist directory for a local persistent client. None = ephemeral.

  • host (str | None) – ChromaDB server host (enables HTTP client mode).

  • port (int) – ChromaDB server port (default 8000).

  • distance_function (str) – "cosine", "l2", or "ip" (inner product).

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

FAISS vector store (lazy import).

Install with: pip install ractogateway[rag-faiss]

class ractogateway.rag.stores.faiss_store.FAISSStore(dimension=None, index_type='flat_ip')[source]

Bases: BaseVectorStore

Vector store backed by Facebook AI Similarity Search (FAISS).

Stores embeddings in a flat L2 or inner-product index; inner product is equivalent to cosine similarity when the vectors are L2-normalised. All data is in-memory; call save() / load() to persist.

Parameters:
  • dimension (int | None) – Embedding dimension. Inferred from the first add() call if None.

  • index_type (str) – "flat_l2" or "flat_ip" (inner product / cosine when normalised).
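
The note on "flat_ip" above relies on a standard identity: the inner product of two L2-normalised vectors equals their cosine similarity. The short check below demonstrates this with plain Python. The helper names are hypothetical, and whether FAISSStore normalises embeddings for you is not stated in this reference, so treat normalising before add() and search() as an assumption to verify.

```python
import math


def l2_normalise(v: list[float]) -> list[float]:
    """Scale v to unit length; a zero vector cannot be normalised."""
    norm = math.sqrt(sum(x * x for x in v))
    if norm == 0.0:
        raise ValueError("cannot normalise a zero vector")
    return [x / norm for x in v]


def inner_product(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def cosine(a: list[float], b: list[float]) -> float:
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )
```

For a = [3, 4] and b = [4, 3] the raw inner product is 24, while the inner product of the normalised vectors matches the cosine, 24/25.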

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

save(path)[source]

Persist the FAISS index to path.index and chunks to path.chunks.

Return type:

None

load(path)[source]

Load a previously saved index from path.

Return type:

None

Pinecone vector store (lazy import).

Install with: pip install ractogateway[rag-pinecone]

class ractogateway.rag.stores.pinecone_store.PineconeStore(index_name, *, api_key=None, namespace='', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Pinecone cloud.

Parameters:
  • index_name (str) – Name of the Pinecone index (must already exist).

  • api_key (str | None) – Pinecone API key. Falls back to PINECONE_API_KEY env var.

  • namespace (str) – Pinecone namespace for logical data isolation.

  • environment – Deprecated Pinecone environment string (for legacy pod-based indexes).

  • batch_size (int) – Number of vectors per upsert batch.
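
The batch_size parameter implies that vectors are sent in fixed-size groups rather than one request per vector. A generic sketch of that slicing follows; batched is a hypothetical helper, and the actual upsert call to Pinecone is deliberately left out.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int = 100) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of items, each at most batch_size long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
```

With 250 items and the default batch size, this yields two full batches and one final batch of 50.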

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

Qdrant vector store (lazy import).

Install with: pip install ractogateway[rag-qdrant]

class ractogateway.rag.stores.qdrant_store.QdrantStore(collection='ractogateway', *, url=None, api_key=None, distance='cosine', dimension=None, batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Qdrant.

Parameters:
  • collection (str) – Qdrant collection name.

  • url (str | None) – Qdrant server URL. None = in-memory.

  • api_key (str | None) – Qdrant cloud API key (optional).

  • distance (str) – "cosine", "euclid", or "dot".

  • dimension (int | None) – Vector dimension. Inferred on first add if None.

  • batch_size (int) – Points per upsert batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

Weaviate vector store (lazy import).

Install with: pip install ractogateway[rag-weaviate]

class ractogateway.rag.stores.weaviate_store.WeaviateStore(class_name='RactoChunk', *, url=None, api_key=None, additional_headers=None, distance_metric='cosine', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Weaviate.

Supports embedded (local, no server needed), local server, and Weaviate Cloud (WCS) connections.

Parameters:
  • class_name (str) – Weaviate class (collection) name.

  • url (str | None) – Weaviate server URL. None = use embedded Weaviate.

  • api_key (str | None) – Weaviate Cloud API key.

  • additional_headers (dict[str, str] | None) – Extra HTTP headers (e.g. for OpenAI API key pass-through to Weaviate).

  • distance_metric (str) – "cosine" or "l2-squared".

  • batch_size (int) – Objects per batch import.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

Milvus / Zilliz vector store (lazy import).

Install with: pip install ractogateway[rag-milvus]

class ractogateway.rag.stores.milvus_store.MilvusStore(collection='ractogateway', *, host='localhost', port=19530, uri=None, token=None, dimension=None, metric_type='IP', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by Milvus or Zilliz Cloud.

Parameters:
  • collection (str) – Milvus collection name.

  • host (str) – Milvus server host (default "localhost").

  • port (int) – Milvus server port (default 19530).

  • uri (str | None) – Zilliz Cloud URI (overrides host/port when set).

  • token (str | None) – Zilliz Cloud API token.

  • dimension (int | None) – Embedding dimension. Inferred on first add.

  • metric_type (str) – "IP" (inner product / cosine) or "L2".

  • batch_size (int) – Vectors per insert batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int

PostgreSQL + pgvector store (lazy import).

Install with: pip install ractogateway[rag-pgvector]

class ractogateway.rag.stores.pgvector_store.PGVectorStore(dsn, *, table='rag_chunks', dimension=None, distance='cosine', batch_size=100)[source]

Bases: BaseVectorStore

Vector store backed by PostgreSQL with the pgvector extension.

Parameters:
  • dsn (str) – PostgreSQL connection string (e.g. "postgresql://user:pass@localhost/mydb").

  • table (str) – Table name (default "rag_chunks").

  • dimension (int | None) – Embedding dimension. Inferred on first add.

  • distance (str) – "cosine", "l2", or "inner".

  • batch_size (int) – Rows per INSERT batch.

add(chunks)[source]

Add chunks (with embeddings) to the store.

Parameters:

chunks (list[Chunk]) – Chunks to index. Each chunk must have a non-None embedding.

Raises:

ValueError – If any chunk has embedding=None.

Return type:

None

search(embedding, top_k=5, filters=None)[source]

Search for the top_k most similar chunks.

Parameters:
  • embedding (list[float]) – Query embedding vector.

  • top_k (int) – Number of results to return.

  • filters (dict[str, Any] | None) – Optional metadata filters (store-specific format).

Return type:

list[RetrievalResult]

Returns:

list[RetrievalResult] – Ranked list of results (rank 1 = most similar).

delete(chunk_ids)[source]

Remove chunks with the given IDs from the store.

Return type:

None

clear()[source]

Remove all chunks from the store.

Return type:

None

count()[source]

Return the total number of indexed chunks.

Return type:

int