# Source code for ractogateway.pipelines.video_processor._rag

"""RactoRAG integration for VideoProcessorPipeline.

Stores extracted video content (visual analyses + transcript segments) into
the existing RactoRAG pipeline for subsequent Q&A retrieval.
"""

from __future__ import annotations

from collections.abc import Iterator
from typing import Any

from ._models import TranscriptSegment, VideoProcessorResult, VideoSection

# ---------------------------------------------------------------------------
# Lazy import guard
# ---------------------------------------------------------------------------


def _require_ractorag() -> Any:
    try:
        from ractogateway.rag.pipeline import RactoRAG
        return RactoRAG
    except ImportError as exc:
        raise ImportError(
            "RactoRAG is required for RAG storage. "
            "Install with: pip install ractogateway[rag]"
        ) from exc


# ---------------------------------------------------------------------------
# Document helpers
# ---------------------------------------------------------------------------


def _section_to_text(section: VideoSection, video_path: str) -> str:
    """Convert a VideoSection into a plain-text document for RAG ingestion."""
    parts: list[str] = [
        f"[Video: {video_path}]",
        f"[Time: {section.timestamp_start:.1f}s - {section.timestamp_end:.1f}s]",
    ]
    if section.visual_content:
        parts.append(f"Visual content:\n{section.visual_content}")
    if section.audio_content:
        parts.append(f"Audio / transcript:\n{section.audio_content}")
    return "\n\n".join(parts)


def _transcript_to_text(seg: TranscriptSegment, video_path: str) -> str:
    """Convert a single TranscriptSegment to a plain-text RAG document."""
    return (
        f"[Video: {video_path}]\n"
        f"[Time: {seg.start:.1f}s - {seg.end:.1f}s]\n"
        f"Transcript: {seg.text}"
    )


# ---------------------------------------------------------------------------
# Storage functions
# ---------------------------------------------------------------------------


def _iter_rag_documents(
    result: VideoProcessorResult,
) -> Iterator[tuple[str, dict[str, Any]]]:
    """Lazily yield ``(text, metadata)`` pairs for every document to ingest.

    Documents are produced in a fixed order:

    1. one per entry in ``result.sections`` (merged visual + audio);
    2. one per entry in ``result.transcript`` whose ``[start, end]`` span is
       not fully contained in any section's time span;
    3. a single summary document, only when ``result.summary`` is truthy.

    Being a generator, each document is built only when the consumer asks for
    it, so text rendering and storage stay interleaved.
    """
    video_path = result.video_path

    # Merged sections (visual + audio) — best for Q&A.
    for section in result.sections:
        text = _section_to_text(section, video_path)
        yield text, {
            "source": video_path,
            "type": "video_section",
            "timestamp_start": section.timestamp_start,
            "timestamp_end": section.timestamp_end,
            "frame_ids": section.frame_ids,
        }

    # Raw transcript segments for audio-only retrieval.
    for seg in result.transcript:
        # Skip segments already covered by a section to avoid duplicates.
        covered = any(
            sec.timestamp_start <= seg.start and sec.timestamp_end >= seg.end
            for sec in result.sections
        )
        if covered:
            continue
        text = _transcript_to_text(seg, video_path)
        yield text, {
            "source": video_path,
            "type": "transcript_segment",
            "timestamp_start": seg.start,
            "timestamp_end": seg.end,
        }

    # The summary goes in as a single high-level document.
    if result.summary:
        summary_text = (
            f"[Video: {video_path}]\n"
            f"[Type: summary]\n\n"
            f"{result.summary}"
        )
        yield summary_text, {
            "source": video_path,
            "type": "video_summary",
        }


def _resolve_add_method(rag_pipeline: Any, preferred: str, fallback: str) -> Any:
    """Return ``rag_pipeline.<preferred>`` if available, else ``<fallback>``.

    The method is looked up by name instead of calling it inside a
    ``try/except AttributeError``. That way an ``AttributeError`` raised
    *inside* the pipeline's own method is not mistaken for a missing method.

    Raises
    ------
    AttributeError
        If the pipeline has neither attribute.
    """
    method = getattr(rag_pipeline, preferred, None)
    if method is None:
        method = getattr(rag_pipeline, fallback)
    return method


def store_result_in_rag(
    result: VideoProcessorResult,
    rag_pipeline: Any,
) -> int:
    """Ingest all video sections and transcript segments into *rag_pipeline*.

    Parameters
    ----------
    result:
        The completed :class:`VideoProcessorResult` to store.
    rag_pipeline:
        A :class:`ractogateway.rag.pipeline.RactoRAG` instance (or any object
        exposing ``add_text(text, metadata=...)`` or, failing that,
        ``add_document(text, metadata=...)``).

    Returns
    -------
    int
        Number of chunks successfully stored.

    Raises
    ------
    ImportError
        If RactoRAG cannot be imported (raised by the import guard).
    AttributeError
        If there is something to store and *rag_pipeline* has neither
        ``add_text`` nor ``add_document``.
    """
    _require_ractorag()  # import guard / helpful error

    chunks_stored = 0
    add_fn = None
    for text, metadata in _iter_rag_documents(result):
        if add_fn is None:
            # Resolved on first use so an empty result never touches the
            # pipeline's interface.
            add_fn = _resolve_add_method(rag_pipeline, "add_text", "add_document")
        add_fn(text, metadata=metadata)
        chunks_stored += 1
    return chunks_stored


async def store_result_in_rag_async(
    result: VideoProcessorResult,
    rag_pipeline: Any,
) -> int:
    """Async variant of :func:`store_result_in_rag`.

    When *rag_pipeline* exposes ``aadd_text`` (preferred) or
    ``aadd_document``, every document is awaited through that method, in the
    same order as the sync implementation. Otherwise this falls back to
    calling :func:`store_result_in_rag` directly; that call is synchronous
    and runs inline inside this coroutine.

    Returns
    -------
    int
        Number of chunks successfully stored.
    """
    has_async_interface = hasattr(rag_pipeline, "aadd_text") or hasattr(
        rag_pipeline, "aadd_document"
    )
    if not has_async_interface:
        # Sync fallback
        return store_result_in_rag(result, rag_pipeline)

    # NOTE: this path does not run the RactoRAG import guard; only the sync
    # implementation does.
    chunks_stored = 0
    add_fn = None
    for text, metadata in _iter_rag_documents(result):
        if add_fn is None:
            add_fn = _resolve_add_method(rag_pipeline, "aadd_text", "aadd_document")
        await add_fn(text, metadata=metadata)
        chunks_stored += 1
    return chunks_stored