Source code for ractogateway.rag.pipeline

"""RactoRAG — the unified RAG pipeline.

Combines file reading, text processing, chunking, embedding, vector storage,
retrieval, and LLM-based generation into a single, ergonomic interface.

Example::

    from ractogateway import openai_developer_kit as opd
    from ractogateway.rag.pipeline import RactoRAG
    from ractogateway.rag.embedders.openai_embedder import OpenAIEmbedder
    from ractogateway.rag.stores.chroma_store import ChromaStore

    kit = opd.OpenAIDeveloperKit(model="gpt-4o")
    rag = RactoRAG(
        vector_store=ChromaStore(collection="docs"),
        embedder=OpenAIEmbedder(),
        llm_kit=kit,
    )
    rag.ingest("report.pdf")
    response = rag.query("What were the key findings?")
    print(response.answer.content)
"""

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Any

from ractogateway._models.chat import ChatConfig
from ractogateway.adapters.base import LLMResponse
from ractogateway.prompts.engine import RactoPrompt
from ractogateway.rag._models.document import Chunk, Document
from ractogateway.rag._models.retrieval import RAGResponse, RetrievalResult
from ractogateway.rag.chunkers.base import BaseChunker
from ractogateway.rag.chunkers.recursive_chunker import RecursiveChunker
from ractogateway.rag.embedders.base import BaseEmbedder
from ractogateway.rag.processors.base import BaseProcessor
from ractogateway.rag.processors.cleaner import TextCleaner
from ractogateway.rag.readers.registry import FileReaderRegistry
from ractogateway.rag.stores.base import BaseVectorStore

# Default user-message template for RAG generation.  It is rendered with
# ``str.format``, so the ``{context}`` and ``{question}`` placeholders must
# be kept intact.
_DEFAULT_CONTEXT_TEMPLATE = (
    "Use the following retrieved context to answer the user's question.\n"
    "If the context does not contain enough information, say so clearly.\n"
    "\n"
    "--- CONTEXT ---\n"
    "{context}\n"
    "--- END CONTEXT ---\n"
    "\n"
    "Question: {question}"
)

# Built-in RACTO prompt used for answer generation when the caller supplies
# neither a ``default_prompt`` at construction time nor a per-call ``prompt``.
# It instructs the model to answer strictly from the retrieved context.
_DEFAULT_RAG_PROMPT = RactoPrompt(
    role="You are a precise, factual question-answering assistant.",
    aim="Answer the user's question accurately based solely on the provided context.",
    constraints=[
        "Do not invent facts not present in the context.",
        "If the context is insufficient, explicitly state that.",
        "Cite the source document when possible.",
    ],
    tone="Clear, concise, and professional.",
    output_format="A direct answer to the question, optionally with bullet points.",
)


[docs] class RactoRAG: """Production-grade RAG pipeline for RactoGateway. Parameters ---------- vector_store: Any :class:`~ractogateway.rag.stores.base.BaseVectorStore` instance. embedder: Any :class:`~ractogateway.rag.embedders.base.BaseEmbedder` instance. chunker: How to split documents. Defaults to :class:`~ractogateway.rag.chunkers.recursive_chunker.RecursiveChunker` with ``chunk_size=512, overlap=50``. processors: List of text processors applied to each chunk before embedding. Defaults to ``[TextCleaner()]``. llm_kit: Any developer kit (``OpenAIDeveloperKit``, ``GoogleDeveloperKit``, or ``AnthropicDeveloperKit``). Required for :meth:`query` / :meth:`aquery`. context_template: Template string for injecting retrieved context into the LLM prompt. Must contain ``{context}`` and ``{question}`` placeholders. reader_registry: Custom :class:`~ractogateway.rag.readers.registry.FileReaderRegistry`. Defaults to a registry with all built-in readers. default_prompt: RACTO prompt used for generation. Falls back to a built-in RAG prompt. """ def __init__( self, vector_store: BaseVectorStore | None = None, embedder: BaseEmbedder | None = None, *, store: BaseVectorStore | None = None, chunker: BaseChunker | None = None, processors: list[BaseProcessor] | None = None, llm_kit: Any | None = None, context_template: str | None = None, reader_registry: FileReaderRegistry | None = None, default_prompt: RactoPrompt | None = None, ) -> None: if vector_store is not None and store is not None: raise TypeError( "Pass only one of 'vector_store' or legacy alias 'store', not both." ) resolved_store = vector_store if vector_store is not None else store if resolved_store is None: raise TypeError( "Missing required vector store. Pass 'vector_store=...' " "(or legacy alias 'store=...')." ) if embedder is None: raise TypeError("Missing required embedder. 
Pass 'embedder=...'.") self._store = resolved_store self._embedder = embedder self._chunker = chunker or RecursiveChunker(chunk_size=512, overlap=50) self._processors: list[BaseProcessor] = ( processors if processors is not None else [TextCleaner()] ) self._llm_kit = llm_kit self._context_template = context_template or _DEFAULT_CONTEXT_TEMPLATE self._reader_registry = reader_registry or FileReaderRegistry() self._default_prompt = default_prompt or _DEFAULT_RAG_PROMPT # ================================================================== # Ingest # ==================================================================
[docs] def ingest(self, path: str | Path, **metadata: Any) -> list[Chunk]: """Read, chunk, embed, and store a single file. Parameters ---------- path: Path to the file to ingest. **metadata: Extra metadata merged into each chunk's ``ChunkMetadata.extra``. Returns ------- list[Chunk] The chunks that were added to the vector store. """ doc = self._reader_registry.read(path) if metadata: doc.metadata.update(metadata) return self._process_document(doc)
[docs] def ingest_dir( self, directory: str | Path, pattern: str = "**/*", **metadata: Any, ) -> list[Chunk]: """Recursively ingest all supported files in a directory. Parameters ---------- directory: Root directory to scan. pattern: Glob pattern relative to *directory*. **metadata: Extra metadata merged into every chunk. Returns ------- list[Chunk] All chunks added across all ingested files. """ root = Path(directory) all_chunks: list[Chunk] = [] for file_path in root.glob(pattern): if not file_path.is_file(): continue if file_path.suffix.lower() not in self._reader_registry.supported_extensions: continue try: chunks = self.ingest(file_path, **metadata) all_chunks.extend(chunks) except Exception as exc: # Log and continue rather than aborting the entire batch import warnings warnings.warn(f"Failed to ingest {file_path}: {exc}", stacklevel=2) return all_chunks
[docs] def ingest_text( self, text: str, source: str = "manual", **metadata: Any, ) -> list[Chunk]: """Ingest a raw text string directly (no file needed). Parameters ---------- text: The text content to ingest. source: A label identifying the source (stored in metadata). **metadata: Extra metadata merged into each chunk. """ doc = Document(content=text, source=source, metadata=metadata) return self._process_document(doc)
[docs] async def aingest(self, path: str | Path, **metadata: Any) -> list[Chunk]: """Async variant of :meth:`ingest`.""" doc = self._reader_registry.read(path) if metadata: doc.metadata.update(metadata) return await self._aprocess_document(doc)
[docs] async def aingest_dir( self, directory: str | Path, pattern: str = "**/*", **metadata: Any, ) -> list[Chunk]: """Async variant of :meth:`ingest_dir`.""" root = Path(directory) tasks = [] for file_path in root.glob(pattern): if not file_path.is_file(): continue if file_path.suffix.lower() not in self._reader_registry.supported_extensions: continue tasks.append(self.aingest(file_path, **metadata)) results = await asyncio.gather(*tasks, return_exceptions=True) all_chunks: list[Chunk] = [] for r in results: if isinstance(r, list): all_chunks.extend(r) return all_chunks
[docs] async def aingest_text( self, text: str, source: str = "manual", **metadata: Any, ) -> list[Chunk]: """Async variant of :meth:`ingest_text`.""" doc = Document(content=text, source=source, metadata=metadata) return await self._aprocess_document(doc)
# ================================================================== # Retrieve # ==================================================================
[docs] def retrieve( self, query: str, top_k: int = 5, filters: dict[str, Any] | None = None, ) -> list[RetrievalResult]: """Embed *query* and retrieve the top-k most relevant chunks. Parameters ---------- query: Natural-language question or search phrase. top_k: Number of results to return. filters: Optional metadata filters (store-specific format). Returns ------- list[RetrievalResult] Ranked results (rank 1 = most relevant). """ embeddings = self._embedder.embed([query]) return self._store.search(embeddings[0], top_k=top_k, filters=filters)
[docs] async def aretrieve( self, query: str, top_k: int = 5, filters: dict[str, Any] | None = None, ) -> list[RetrievalResult]: """Async variant of :meth:`retrieve`.""" embeddings = await self._embedder.aembed([query]) return self._store.search(embeddings[0], top_k=top_k, filters=filters)
# ================================================================== # RAG Query (retrieve + generate) # ==================================================================
[docs] def query( self, question: str, top_k: int = 5, filters: dict[str, Any] | None = None, prompt: RactoPrompt | None = None, temperature: float = 0.0, max_tokens: int = 2048, ) -> RAGResponse: """Retrieve relevant chunks and generate an answer. Parameters ---------- question: The user's question. top_k: Number of context chunks to retrieve. filters: Optional metadata filters for retrieval. prompt: Override the default RACTO prompt for generation. temperature: LLM temperature (default ``0.0`` for factual answers). max_tokens: Maximum tokens in the generated answer. Returns ------- RAGResponse Contains the generated answer plus the retrieved source chunks. Raises ------ RuntimeError If no ``llm_kit`` was provided. """ self._require_llm_kit() sources = self.retrieve(question, top_k=top_k, filters=filters) context = self._build_context(sources) user_message = self._context_template.format(context=context, question=question) config = ChatConfig( user_message=user_message, prompt=prompt or self._default_prompt, temperature=temperature, max_tokens=max_tokens, ) llm_kit = self._require_llm_kit() answer: LLMResponse = llm_kit.chat(config) return RAGResponse( answer=answer, sources=sources, query=question, context_used=context, )
[docs] async def aquery( self, question: str, top_k: int = 5, filters: dict[str, Any] | None = None, prompt: RactoPrompt | None = None, temperature: float = 0.0, max_tokens: int = 2048, ) -> RAGResponse: """Async variant of :meth:`query`.""" self._require_llm_kit() sources = await self.aretrieve(question, top_k=top_k, filters=filters) context = self._build_context(sources) user_message = self._context_template.format(context=context, question=question) config = ChatConfig( user_message=user_message, prompt=prompt or self._default_prompt, temperature=temperature, max_tokens=max_tokens, ) llm_kit = self._require_llm_kit() answer: LLMResponse = await llm_kit.achat(config) return RAGResponse( answer=answer, sources=sources, query=question, context_used=context, )
# ================================================================== # Convenience properties # ================================================================== @property def store(self) -> BaseVectorStore: """The underlying vector store.""" return self._store @property def embedder(self) -> BaseEmbedder: """The underlying embedder.""" return self._embedder
[docs] def count(self) -> int: """Return the total number of indexed chunks.""" return self._store.count()
[docs] def clear(self) -> None: """Remove all indexed chunks from the vector store.""" self._store.clear()
# ================================================================== # Internal helpers # ================================================================== def _process_document(self, doc: Document) -> list[Chunk]: """Chunk → process → embed → store.""" chunks = self._chunker.chunk(doc) chunks = self._apply_processors(chunks) chunks = self._embed_chunks(chunks) self._store.add(chunks) return chunks async def _aprocess_document(self, doc: Document) -> list[Chunk]: """Async chunk → process → embed → store.""" chunks = self._chunker.chunk(doc) chunks = self._apply_processors(chunks) chunks = await self._aembed_chunks(chunks) self._store.add(chunks) return chunks def _apply_processors(self, chunks: list[Chunk]) -> list[Chunk]: if not self._processors: return chunks processed: list[Chunk] = [] for chunk in chunks: text = chunk.content for proc in self._processors: text = proc.process(text) processed.append(chunk.model_copy(update={"content": text})) return processed def _embed_chunks(self, chunks: list[Chunk]) -> list[Chunk]: texts = [c.content for c in chunks] embeddings = self._embedder.embed(texts) return [ chunk.model_copy(update={"embedding": emb}) for chunk, emb in zip(chunks, embeddings, strict=False) ] async def _aembed_chunks(self, chunks: list[Chunk]) -> list[Chunk]: texts = [c.content for c in chunks] embeddings = await self._embedder.aembed(texts) return [ chunk.model_copy(update={"embedding": emb}) for chunk, emb in zip(chunks, embeddings, strict=False) ] def _build_context(self, results: list[RetrievalResult]) -> str: parts: list[str] = [] for i, result in enumerate(results, start=1): source = result.chunk.metadata.source page = result.chunk.metadata.page loc = f" (page {page})" if page else "" parts.append(f"[{i}] Source: {source}{loc}\n{result.chunk.content}") return "\n\n".join(parts) def _require_llm_kit(self) -> Any: if self._llm_kit is None: raise RuntimeError( "RactoRAG.query() requires an llm_kit. 
" "Pass one at construction time: RactoRAG(..., llm_kit=kit)" ) return self._llm_kit