Source code for ractogateway.pipelines.video_processor.pipeline

"""VideoProcessorPipeline — main entry point.

Processes a video through six stages:
  1. Load         — resolve source (path / URL / YouTube / bytes / frame list)
  2. Extract      — sample frames with OpenCV, deduplicate by similarity
  3. Transcribe   — extract audio, transcribe with chosen backend
  4. Analyse      — pass frames to vision LLM (individual or grid mode)
  5. Summarise    — produce comprehensive Markdown summary
  6. RAG store    — optionally index everything via RactoRAG

Usage::

    from ractogateway.openai_developer_kit import Chat
    from ractogateway.pipelines.video_processor import (
        VideoProcessorPipeline,
        DeduplicationMethod,
        TranscriberBackend,
    )

    kit = Chat(api_key="...", model="gpt-4o")

    pipeline = VideoProcessorPipeline(
        kit=kit,
        fps=1.0,
        similarity_threshold=85.0,
        transcriber=TranscriberBackend.FASTER_WHISPER,
        transcriber_model="base",
        analyze_frames=True,
        generate_summary=True,
        safe_mode=True,
    )

    result = pipeline.run("lecture.mp4")
    print(result.summary)
    print(result.get_transcript_text())

    # YouTube / URL / bytes also accepted:
    result = pipeline.run("https://www.youtube.com/watch?v=...")
    result = pipeline.run("https://example.com/video.mp4")
    result = pipeline.run(video_bytes)           # bytes
    result = pipeline.run(["frame1.jpg", ...])   # pre-extracted frames
"""

from __future__ import annotations

import asyncio
import contextlib
import re
import traceback as _tb
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Any

from ._models import (
    DeduplicationMethod,
    FrameAnalysisMode,
    StageError,
    TranscriberBackend,
    VideoProcessingMode,
    VideoProcessorResult,
    VideoProcessorUsage,
    VideoRateLimitExceededError,
)

# Sentinel for "not supplied by caller"
_UNSET = object()


def _make_stage_error(stage: str, exc: BaseException) -> StageError:
    """Capture a stage failure, including its traceback, as a ``StageError``.

    Parameters
    ----------
    stage:
        Name of the pipeline stage that failed (e.g. ``"load"``).
    exc:
        The exception describing the failure.  It does not have to be the
        exception currently being handled, nor to have been raised at all.

    Returns
    -------
    StageError
        Record holding the stage name, the exception class name, its message
        and the formatted traceback text.
    """
    # Format from *exc* itself rather than via ``format_exc()``: the latter
    # reads the exception currently being handled and yields "NoneType: None"
    # when this helper is called outside an ``except`` block (for instance
    # with an exception that was constructed but not yet raised).
    formatted = "".join(
        _tb.format_exception(type(exc), exc, exc.__traceback__)
    )
    return StageError(
        stage=stage,
        error_type=type(exc).__name__,
        message=str(exc),
        traceback=formatted,
    )


class VideoProcessorPipeline:
    """Synchronous + asynchronous video processing pipeline.

    Parameters
    ----------
    kit:
        A RactoGateway developer kit (``Chat``) used for both frame analysis
        and summary generation unless *analysis_kit* or *summary_kit* are
        provided.
    analysis_kit:
        Optional separate kit for vision/frame analysis (e.g. a
        vision-specific model). Falls back to *kit* when not supplied.
    summary_kit:
        Optional separate kit for summary generation (e.g. a larger model).
        Falls back to *kit* when not supplied.
    transcriber:
        Which audio transcription backend to use.
    transcriber_model:
        Model name / size for the chosen backend.
    transcriber_api_key:
        API key for cloud transcription backends (or read from env vars).
    transcriber_base_url:
        Base URL for self-hosted endpoints (Ollama etc.).
    fps:
        Video frames to sample per second.
    similarity_threshold:
        Frames with similarity >= this % to the previous kept frame are
        discarded. E.g. ``90.0`` keeps frames that differ by more than 10 %.
    dedup_method:
        :attr:`DeduplicationMethod.PHASH` (fast, default) or
        :attr:`DeduplicationMethod.SSIM` (more accurate).
    max_frames:
        Hard cap on the number of kept frames (``None`` = no cap).
    frame_format:
        ``"JPEG"`` (smaller, lossy) or ``"PNG"`` (lossless).
    frame_analysis_mode:
        :attr:`FrameAnalysisMode.INDIVIDUAL` (one LLM call per frame,
        default) or :attr:`FrameAnalysisMode.GRID` (stitch into a collage).
    grid_size:
        Frames per grid collage (only used in GRID mode).
    batch_size:
        Concurrent LLM calls per batch during frame analysis.
    max_workers:
        Thread-pool size for concurrent LLM calls.
    max_process_workers:
        Process-pool size for CPU-bound frame extraction / hashing.
    language:
        BCP-47 language code for transcription (``None`` = auto-detect).
    transcribe_audio:
        Whether to extract and transcribe the audio track.
    analyze_frames:
        Whether to pass frames to the vision LLM.
    generate_summary:
        Whether to generate a comprehensive summary at the end.
    processing_mode:
        :attr:`VideoProcessingMode.ACTIVE` (default) applies no time window.
        Any other mode (e.g. :attr:`VideoProcessingMode.PASSIVE`) restricts
        processing to a window around *focus_time_seconds*.
    focus_time_seconds:
        Centre of the processing window. Required whenever
        *processing_mode* is not ``ACTIVE``; ignored otherwise.
    window_seconds:
        Half-width of the processing window: the window spans
        ``focus_time_seconds - window_seconds`` (clamped at 0) to
        ``focus_time_seconds + window_seconds``. Must be > 0 when a window
        is used.
    rag_pipeline:
        An optional :class:`ractogateway.rag.pipeline.RactoRAG` instance.
        When supplied and *store_in_rag* is ``True`` (or per-call), all
        extracted content is indexed for retrieval.
    safe_mode:
        Catch all exceptions and return them in ``result.error`` instead of
        raising.
    tracer:
        Optional :class:`ractogateway.telemetry.RactoTracer` for OTEL
        tracing.
    metrics:
        Optional :class:`ractogateway.telemetry.GatewayMetricsMiddleware`.
    rate_limiter:
        Duck-typed rate limiter with ``check_and_consume(user_id, tokens)``
        and ``get_remaining(user_id)`` methods.
    user_id:
        Default user identifier passed to the rate limiter.
    """

    def __init__(
        self,
        kit: Any,
        *,
        analysis_kit: Any = None,
        summary_kit: Any = None,
        # Transcription
        transcriber: TranscriberBackend = TranscriberBackend.FASTER_WHISPER,
        transcriber_model: str = "base",
        transcriber_api_key: str | None = None,
        transcriber_base_url: str | None = None,
        # Frame extraction
        fps: float = 1.0,
        similarity_threshold: float = 90.0,
        dedup_method: DeduplicationMethod = DeduplicationMethod.PHASH,
        max_frames: int | None = None,
        frame_format: str = "JPEG",
        # Analysis
        frame_analysis_mode: FrameAnalysisMode = FrameAnalysisMode.INDIVIDUAL,
        grid_size: int = 4,
        batch_size: int = 10,
        max_workers: int = 4,
        max_process_workers: int = 4,
        language: str | None = None,
        # Feature flags
        transcribe_audio: bool = True,
        analyze_frames: bool = True,
        generate_summary: bool = True,
        # Processing scope
        processing_mode: VideoProcessingMode = VideoProcessingMode.ACTIVE,
        focus_time_seconds: float | None = None,
        window_seconds: float = 5.0,
        # Integrations
        rag_pipeline: Any = None,
        # Safety & observability
        safe_mode: bool = False,
        tracer: Any = None,
        metrics: Any = None,
        rate_limiter: Any = None,
        user_id: str = "default",
    ) -> None:
        self._kit = kit
        # A falsy dedicated kit (``None``) falls back to the shared *kit*.
        self._analysis_kit = analysis_kit or kit
        self._summary_kit = summary_kit or kit
        # Enum constructors accept either a member or its raw value, so this
        # both normalises and validates the argument.
        self._transcriber_backend = TranscriberBackend(transcriber)
        self._transcriber_model = transcriber_model
        self._transcriber_api_key = transcriber_api_key
        self._transcriber_base_url = transcriber_base_url
        self._fps = fps
        self._similarity_threshold = similarity_threshold
        self._dedup_method = DeduplicationMethod(dedup_method)
        self._max_frames = max_frames
        self._frame_format = frame_format
        self._frame_analysis_mode = FrameAnalysisMode(frame_analysis_mode)
        self._grid_size = grid_size
        self._batch_size = batch_size
        self._max_workers = max_workers
        self._max_process_workers = max_process_workers
        self._language = language
        self._transcribe_audio = transcribe_audio
        self._analyze_frames = analyze_frames
        self._generate_summary = generate_summary
        self._processing_mode = VideoProcessingMode(processing_mode)
        self._focus_time_seconds = focus_time_seconds
        self._window_seconds = window_seconds
        self._rag_pipeline = rag_pipeline
        self._safe_mode = safe_mode
        self._tracer = tracer
        self._metrics = metrics
        self._rate_limiter = rate_limiter
        self._user_id = user_id

        # Build transcriber once (loads model lazily)
        from ._transcriber import get_transcriber  # noqa: PLC0415

        self._transcriber_obj = get_transcriber(
            self._transcriber_backend,
            self._transcriber_model,
            self._transcriber_api_key,
            self._transcriber_base_url,
        )

    # ── Public sync API ──────────────────────────────────────────────────────
def run(
    self,
    source: str | Path | bytes | list,
    *,
    # Per-call overrides
    fps: Any = _UNSET,
    similarity_threshold: Any = _UNSET,
    dedup_method: Any = _UNSET,
    max_frames: Any = _UNSET,
    analyze_frames: Any = _UNSET,
    frame_analysis_mode: Any = _UNSET,
    grid_size: Any = _UNSET,
    batch_size: Any = _UNSET,
    transcribe_audio: Any = _UNSET,
    language: Any = _UNSET,
    generate_summary: Any = _UNSET,
    processing_mode: Any = _UNSET,
    focus_time_seconds: Any = _UNSET,
    window_seconds: Any = _UNSET,
    store_in_rag: bool = False,
    user_id: Any = _UNSET,
) -> VideoProcessorResult:
    """Process *source* and return a :class:`VideoProcessorResult`.

    All keyword arguments override the constructor defaults for this call
    only; an argument left at its sentinel default keeps the constructor
    value.

    With ``safe_mode=True`` a failure in a fatal stage (config, rate limit,
    load, extract, deduplicate) is captured into ``result.failed_stage`` /
    ``result.stage_errors`` and a partial result is returned.  Failures in
    non-fatal stages (transcription, analysis, section building, summary,
    RAG storage) are recorded in ``result.stage_errors`` and the pipeline
    continues with whatever data is available.

    With ``safe_mode=False`` any stage failure is raised to the caller.

    Parameters
    ----------
    source:
        Video path, URL, raw bytes, or a list of pre-extracted frame paths.
    store_in_rag:
        Index the result in the configured RAG pipeline, if one was given.

    Returns
    -------
    VideoProcessorResult
        The full or (in safe mode) partial processing result.
    """
    # Spell the overrides out instead of passing ``locals()``: that would
    # also capture ``self`` and ``source`` and silently change as soon as a
    # local variable is introduced above the call.
    overrides: dict[str, Any] = {
        "fps": fps,
        "similarity_threshold": similarity_threshold,
        "dedup_method": dedup_method,
        "max_frames": max_frames,
        "analyze_frames": analyze_frames,
        "frame_analysis_mode": frame_analysis_mode,
        "grid_size": grid_size,
        "batch_size": batch_size,
        "transcribe_audio": transcribe_audio,
        "language": language,
        "generate_summary": generate_summary,
        "processing_mode": processing_mode,
        "focus_time_seconds": focus_time_seconds,
        "window_seconds": window_seconds,
        "store_in_rag": store_in_rag,
        "user_id": user_id,
    }
    return self._run_pipeline(source, overrides)
async def arun(
    self,
    source: str | Path | bytes | list,
    *,
    fps: Any = _UNSET,
    similarity_threshold: Any = _UNSET,
    dedup_method: Any = _UNSET,
    max_frames: Any = _UNSET,
    analyze_frames: Any = _UNSET,
    frame_analysis_mode: Any = _UNSET,
    grid_size: Any = _UNSET,
    batch_size: Any = _UNSET,
    transcribe_audio: Any = _UNSET,
    language: Any = _UNSET,
    generate_summary: Any = _UNSET,
    processing_mode: Any = _UNSET,
    focus_time_seconds: Any = _UNSET,
    window_seconds: Any = _UNSET,
    store_in_rag: bool = False,
    user_id: Any = _UNSET,
) -> VideoProcessorResult:
    """Async variant of :meth:`run`.

    Accepts the same *source* and per-call keyword overrides; an argument
    left at its sentinel default keeps the constructor value.
    """
    # Spell the overrides out instead of passing ``locals()``: that would
    # also capture ``self`` and ``source`` and silently change as soon as a
    # local variable is introduced above the call.
    overrides: dict[str, Any] = {
        "fps": fps,
        "similarity_threshold": similarity_threshold,
        "dedup_method": dedup_method,
        "max_frames": max_frames,
        "analyze_frames": analyze_frames,
        "frame_analysis_mode": frame_analysis_mode,
        "grid_size": grid_size,
        "batch_size": batch_size,
        "transcribe_audio": transcribe_audio,
        "language": language,
        "generate_summary": generate_summary,
        "processing_mode": processing_mode,
        "focus_time_seconds": focus_time_seconds,
        "window_seconds": window_seconds,
        "store_in_rag": store_in_rag,
        "user_id": user_id,
    }
    return await self._arun_pipeline(source, overrides)
# ── Config resolution ────────────────────────────────────────────────────
def _resolve(self, kwargs: dict[str, Any], name: str, default: Any) -> Any:
    """Return the per-call value for *name*, or *default* when not supplied."""
    supplied = kwargs.get(name, _UNSET)
    if supplied is _UNSET:
        return default
    return supplied


def _resolved_config(self, kwargs: dict[str, Any]) -> dict[str, Any]:
    """Merge per-call overrides with constructor defaults into one config dict.

    Enum-valued settings are passed through their enum constructor so that
    the resulting dict always holds enum members.
    """
    # (key, constructor default, optional coercion) in output order.
    plan = (
        ("fps", self._fps, None),
        ("similarity_threshold", self._similarity_threshold, None),
        ("dedup_method", self._dedup_method, DeduplicationMethod),
        ("max_frames", self._max_frames, None),
        ("analyze_frames", self._analyze_frames, None),
        ("frame_analysis_mode", self._frame_analysis_mode, FrameAnalysisMode),
        ("grid_size", self._grid_size, None),
        ("batch_size", self._batch_size, None),
        ("transcribe_audio", self._transcribe_audio, None),
        ("language", self._language, None),
        ("generate_summary", self._generate_summary, None),
        ("processing_mode", self._processing_mode, VideoProcessingMode),
        ("focus_time_seconds", self._focus_time_seconds, None),
        ("window_seconds", self._window_seconds, None),
    )
    config: dict[str, Any] = {}
    for key, fallback, coerce in plan:
        chosen = self._resolve(kwargs, key, fallback)
        config[key] = chosen if coerce is None else coerce(chosen)
    # ``store_in_rag`` has no constructor default; it is opt-in per call.
    config["store_in_rag"] = kwargs.get("store_in_rag", False)
    config["user_id"] = self._resolve(kwargs, "user_id", self._user_id)
    return config


# ── Rate limiter ─────────────────────────────────────────────────────────
def _check_rate_limit(self, user_id: str, tokens: int = 1) -> None:
    """Consume *tokens* for *user_id*; raise when the limiter refuses.

    Does nothing when no rate limiter was configured.
    """
    limiter = self._rate_limiter
    if limiter is None:
        return
    if limiter.check_and_consume(user_id, tokens):
        return
    remaining = limiter.get_remaining(user_id)
    raise VideoRateLimitExceededError(
        f"Rate limit exceeded for user '{user_id}'. "
        f"Remaining: {remaining}"
    )


# ── Source label helper ──────────────────────────────────────────────────
@staticmethod
def _source_label(source: str | Path | bytes | list) -> str:
    """Return a short printable label describing *source*."""
    if isinstance(source, list):
        return f"<{len(source)} pre-extracted frames>"
    if isinstance(source, (bytes, bytearray)):
        return "<bytes>"
    return str(source)
[docs] @staticmethod def parse_timestamp(value: float | int | str) -> float: """Parse timestamp values like ``130``, ``"02:10"``, ``"2 mins 10 sec"``.""" if isinstance(value, (int, float)): if float(value) < 0: raise ValueError("timestamp must be >= 0.") return float(value) text = value.strip().lower() if not text: raise ValueError("timestamp string cannot be empty.") # HH:MM:SS or MM:SS if ":" in text: parts = text.split(":") if len(parts) == 2: minutes, seconds = parts return float(minutes) * 60.0 + float(seconds) if len(parts) == 3: hours, minutes, seconds = parts return float(hours) * 3600.0 + float(minutes) * 60.0 + float(seconds) raise ValueError(f"Unsupported timestamp format: {value!r}") # Human-readable units: "2 mins 10 sec", "1h 3m", "90s" total = 0.0 for num, unit in re.findall( r"(\d+(?:\.\d+)?)\s*(h|hr|hrs|hour|hours|m|min|mins|minute|minutes|s|sec|secs|second|seconds)", text, ): val = float(num) if unit.startswith("h"): total += val * 3600.0 elif unit.startswith("m"): total += val * 60.0 else: total += val if total > 0: return total # Plain numeric string -> seconds return float(text)
def _resolve_window(self, cfg: dict[str, Any]) -> tuple[float | None, float | None]:
    """Compute the ``(start, end)`` processing window in seconds.

    Returns ``(None, None)`` when the processing mode is ``ACTIVE``.
    Otherwise the window is centred on ``cfg["focus_time_seconds"]`` and
    extends ``cfg["window_seconds"]`` to each side, with the start clamped
    at 0.

    Raises
    ------
    ValueError
        If no focus time is configured, the focus time is negative, or the
        window size is not positive.
    """
    mode = cfg["processing_mode"]
    if mode == VideoProcessingMode.ACTIVE:
        return None, None
    focus = cfg["focus_time_seconds"]
    if focus is None:
        raise ValueError(
            "focus_time_seconds is required when processing_mode='passive'."
        )
    focus_s = self.parse_timestamp(focus)
    if focus_s < 0:
        raise ValueError("focus_time_seconds must be >= 0.")
    window = float(cfg["window_seconds"])
    if window <= 0:
        raise ValueError("window_seconds must be > 0.")
    start = max(0.0, focus_s - window)
    end = focus_s + window
    return start, end


@staticmethod
def _shift_result_timestamps(
    result: VideoProcessorResult,
    *,
    offset_seconds: float,
) -> VideoProcessorResult:
    """Shift frame/transcript/section timestamps by an absolute offset.

    Returns *result* itself when the offset is zero; otherwise a copy whose
    frames, transcript segments and sections are shifted copies.
    """
    if offset_seconds == 0:
        return result
    frames = [
        f.model_copy(update={"timestamp": f.timestamp + offset_seconds})
        for f in result.frames
    ]
    transcript = [
        seg.model_copy(
            update={
                "start": seg.start + offset_seconds,
                "end": seg.end + offset_seconds,
            }
        )
        for seg in result.transcript
    ]
    sections = [
        sec.model_copy(
            update={
                "timestamp_start": sec.timestamp_start + offset_seconds,
                "timestamp_end": sec.timestamp_end + offset_seconds,
            }
        )
        for sec in result.sections
    ]
    return result.model_copy(
        update={
            "frames": frames,
            "transcript": transcript,
            "sections": sections,
        }
    )


# ── Sync pipeline execution ──────────────────────────────────────────────
def _fatal_stage_result(
    self,
    label: str,
    usage: VideoProcessorUsage,
    stage_errors: list[StageError],
    err: StageError,
) -> VideoProcessorResult:
    """Return a partial result when a fatal stage fails in safe_mode."""
    # Build a new list so the caller's ``stage_errors`` is left untouched.
    all_errors = [*stage_errors, err]
    return VideoProcessorResult(
        video_path=label,
        usage=usage,
        failed_stage=err.stage,
        stage_errors=all_errors,
        error=str(err),
    )


def _run_pipeline(
    self,
    source: str | Path | bytes | list,
    call_kwargs: dict[str, Any],
) -> VideoProcessorResult:
    """Run every pipeline stage synchronously and assemble the result.

    Stages, in order: config/window resolution, rate-limit check, source
    loading, frame extraction, deduplication, transcription, frame analysis,
    section building, summary generation and optional RAG storage.

    Failures up to and including deduplication are fatal: in safe mode a
    partial result is returned via :meth:`_fatal_stage_result`, otherwise
    the error is raised.  Failures in later stages are appended to
    ``stage_errors``; they are re-raised as ``RuntimeError`` unless safe
    mode is on.

    Parameters
    ----------
    source:
        The video source handed to :meth:`run`.
    call_kwargs:
        Per-call overrides, resolved through :meth:`_resolved_config`.
    """
    from ._analyzer import analyze_frames_sync  # noqa: PLC0415
    from ._extractor import (  # noqa: PLC0415
        deduplicate_frames,
        extract_frames,
        extract_frames_window,
        load_frames_from_paths,
    )
    from ._loader import resolve_video_source  # noqa: PLC0415
    from ._rag import store_result_in_rag  # noqa: PLC0415
    from ._summarizer import build_sections, generate_summary_sync  # noqa: PLC0415
    from ._transcriber import align_frames_to_transcript  # noqa: PLC0415

    cfg = self._resolved_config(call_kwargs)
    label = self._source_label(source)
    usage = VideoProcessorUsage()
    stage_errors: list[StageError] = []
    processing_mode: VideoProcessingMode = cfg["processing_mode"]
    window_start_seconds: float | None = None
    window_end_seconds: float | None = None

    try:
        window_start_seconds, window_end_seconds = self._resolve_window(cfg)
    except Exception as exc:
        err = _make_stage_error("config", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    try:
        self._check_rate_limit(cfg["user_id"])
    except Exception as exc:
        err = _make_stage_error("rate_limit", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    # ── 1. Resolve source ── FATAL: nothing to process without a source ──
    try:
        video_path, frame_paths = resolve_video_source(source)
    except Exception as exc:
        err = _make_stage_error("load", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    # A frame list carries no timeline to cut a window from, so windowed
    # (passive) processing is rejected for it.
    if (
        processing_mode == VideoProcessingMode.PASSIVE
        and frame_paths is not None
    ):
        exc = ValueError(
            "processing_mode='passive' does not support pre-extracted frame lists. "
            "Provide a video file/URL/bytes source."
        )
        err = _make_stage_error("load", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise exc

    # ── 2. Extract frames ── FATAL: no frames = pipeline is meaningless ──
    try:
        if frame_paths is not None:
            raw_frames = load_frames_from_paths(
                frame_paths, frame_format=self._frame_format
            )
        elif window_start_seconds is not None:
            raw_frames = extract_frames_window(
                video_path,  # type: ignore[arg-type]
                fps=cfg["fps"],
                max_frames=cfg["max_frames"],
                frame_format=self._frame_format,
                start_time_seconds=window_start_seconds,
                end_time_seconds=window_end_seconds,
            )
        else:
            raw_frames = extract_frames(
                video_path,  # type: ignore[arg-type]
                fps=cfg["fps"],
                max_frames=cfg["max_frames"],
                frame_format=self._frame_format,
                max_process_workers=self._max_process_workers,
            )
    except Exception as exc:
        err = _make_stage_error("extract", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise RuntimeError(f"[Stage: extract] {err.error_type}: {err.message}") from exc
    usage.frames_extracted = len(raw_frames)

    # ── 3. Deduplicate ── FATAL: corrupted frame list is unrecoverable ───
    try:
        frames = deduplicate_frames(
            raw_frames,
            similarity_threshold=cfg["similarity_threshold"],
            method=cfg["dedup_method"],
        )
    except Exception as exc:
        err = _make_stage_error("deduplicate", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise RuntimeError(
            f"[Stage: deduplicate] {err.error_type}: {err.message}"
        ) from exc
    usage.frames_kept = sum(1 for f in frames if f.kept)
    usage.frames_discarded = sum(1 for f in frames if not f.kept)

    # ── 4. Transcription ── NON-FATAL: video still useful without audio ──
    transcript: list = []
    # Skipped for pre-extracted frame lists, where there is no video file.
    if cfg["transcribe_audio"] and video_path is not None:
        try:
            with ThreadPoolExecutor(max_workers=1) as pool:
                fut = pool.submit(
                    self._run_transcription,
                    video_path,
                    cfg["language"],
                    usage,
                    window_start_seconds,
                    window_end_seconds,
                    # Offset applied to segment times; 0 when unwindowed.
                    window_start_seconds or 0.0,
                )
                transcript = fut.result()
            transcript = align_frames_to_transcript(frames, transcript)
        except Exception as exc:
            err = _make_stage_error("transcribe", exc)
            stage_errors.append(err)
            if not self._safe_mode:
                raise RuntimeError(
                    f"[Stage: transcribe] {err.error_type}: {err.message}"
                ) from exc

    # ── 5. Frame analysis ── NON-FATAL: frames still kept, just no LLM text
    if cfg["analyze_frames"]:
        try:
            frames = analyze_frames_sync(
                frames,
                self._analysis_kit,
                mode=cfg["frame_analysis_mode"],
                batch_size=cfg["batch_size"],
                max_workers=self._max_workers,
                grid_size=cfg["grid_size"],
                usage=usage,
            )
        except Exception as exc:
            err = _make_stage_error("analyze", exc)
            stage_errors.append(err)
            if not self._safe_mode:
                raise RuntimeError(
                    f"[Stage: analyze] {err.error_type}: {err.message}"
                ) from exc

    # ── 6. Build sections ── NON-FATAL: result still has frames + transcript
    sections: list = []
    try:
        sections = build_sections(frames, transcript)
    except Exception as exc:
        err = _make_stage_error("build_sections", exc)
        stage_errors.append(err)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: build_sections] {err.error_type}: {err.message}"
            ) from exc

    # ── 7. Summary ── NON-FATAL: rest of result is still valid ───────────
    summary: str | None = None
    if cfg["generate_summary"]:
        try:
            summary = generate_summary_sync(
                sections, transcript, self._summary_kit, usage
            )
        except Exception as exc:
            err = _make_stage_error("summarize", exc)
            stage_errors.append(err)
            if not self._safe_mode:
                raise RuntimeError(
                    f"[Stage: summarize] {err.error_type}: {err.message}"
                ) from exc

    # ── 8. Build result ──────────────────────────────────────────────────
    # ``error`` mirrors the first recorded non-fatal failure, if any.
    top_error = str(stage_errors[0]) if stage_errors else None
    result = VideoProcessorResult(
        video_path=label,
        frames=frames,
        transcript=transcript,
        sections=sections,
        summary=summary,
        usage=usage,
        stage_errors=stage_errors,
        error=top_error,
        processing_mode=processing_mode,
        window_start_seconds=window_start_seconds,
        window_end_seconds=window_end_seconds,
    )

    # ── 9. RAG storage ── NON-FATAL: indexing failure must not lose result
    if cfg["store_in_rag"] and self._rag_pipeline is not None:
        try:
            count = store_result_in_rag(result, self._rag_pipeline)
            result = result.model_copy(
                update={"rag_stored": True, "rag_chunk_count": count}
            )
        except Exception as exc:
            err = _make_stage_error("rag_store", exc)
            if self._safe_mode:
                result = result.model_copy(
                    update={
                        "stage_errors": [*result.stage_errors, err],
                        "error": result.error or str(err),
                    }
                )
            else:
                raise RuntimeError(
                    f"[Stage: rag_store] {err.error_type}: {err.message}"
                ) from exc
    return result


def _run_transcription(
    self,
    video_path: Path,
    language: str | None,
    usage: VideoProcessorUsage,
    start_time_seconds: float | None = None,
    end_time_seconds: float | None = None,
    time_offset_seconds: float = 0.0,
) -> list:
    """Run audio extraction + transcription in a thread.

    Records the audio duration on *usage* when it can be determined and,
    when *time_offset_seconds* is non-zero, returns copies of the segments
    with ``start``/``end`` shifted by that offset.
    """
    from ._transcriber import extract_audio, get_audio_duration  # noqa: PLC0415

    audio_path = extract_audio(
        video_path,
        start_time_seconds=start_time_seconds,
        end_time_seconds=end_time_seconds,
    )
    # Best effort: a failed duration probe must not abort transcription.
    with contextlib.suppress(Exception):
        usage.audio_duration_seconds = get_audio_duration(audio_path)
    segments = self._transcriber_obj.transcribe(audio_path, language)
    if time_offset_seconds:
        segments = [
            seg.model_copy(
                update={
                    "start": seg.start + time_offset_seconds,
                    "end": seg.end + time_offset_seconds,
                }
            )
            for seg in segments
        ]
    return segments


# ── Async pipeline execution ─────────────────────────────────────────────
@staticmethod
def _build_qa_context(
    result: VideoProcessorResult,
    *,
    max_context_chars: int,
) -> str:
    """Render *result* as a timestamped text context for question answering.

    Uses the sections when present, else the transcript, else the analyses
    of kept frames.  Text beyond *max_context_chars* is cut off and a
    truncation marker is appended.
    """
    lines: list[str] = []
    if result.sections:
        for sec in result.sections:
            lines.append(
                f"[{sec.timestamp_start:.1f}s - {sec.timestamp_end:.1f}s]"
            )
            if sec.visual_content:
                lines.append(f"VISUAL: {sec.visual_content}")
            if sec.audio_content:
                lines.append(f"AUDIO: {sec.audio_content}")
            lines.append("")
    elif result.transcript:
        for seg in result.transcript:
            lines.append(f"[{seg.start:.1f}s - {seg.end:.1f}s] {seg.text}")
    else:
        for frame in result.frames:
            if frame.kept and frame.analysis:
                lines.append(f"[{frame.timestamp:.1f}s] {frame.analysis}")
    context = "\n".join(lines).strip()
    if len(context) > max_context_chars:
        # The marker is appended after the cut, so the returned string can
        # be longer than ``max_context_chars``.
        context = context[:max_context_chars] + "\n\n[context truncated]"
    return context


def _qa_answer_sync(
    self,
    *,
    question: str,
    context: str,
) -> tuple[str, dict[str, int]]:
    """Ask the summary kit to answer *question* from *context*.

    Returns the stripped answer text and the usage mapping reported by the
    kit (an empty dict when none is reported).
    """
    from ractogateway._models.chat import ChatConfig  # noqa: PLC0415
    from ractogateway.prompts.engine import RactoPrompt  # noqa: PLC0415

    prompt = RactoPrompt(
        role="video question-answering specialist",
        aim=(
            "Answer the user question using only the provided video timeline context. "
            "Always anchor claims to timestamp evidence."
        ),
        constraints=[
            "Do not fabricate details not present in context.",
            "Cite timestamp evidence when available.",
            "If context is insufficient, explicitly say so.",
        ],
        tone="Professional and precise.",
        output_format="Markdown with sections: Answer, Evidence, Confidence",
        context=context,
    )
    try:
        resp = self._summary_kit.chat(
            ChatConfig(
                user_message=question,
                prompt=prompt,
                temperature=0.0,
                max_tokens=900,
            )
        )
    except TypeError:
        # Fallback for kits whose ``chat`` rejects a ChatConfig argument.
        # NOTE(review): this call does not pass *question* — confirm that
        # is intended.
        resp = self._summary_kit.chat(prompt=prompt)
    return (resp.content or "").strip(), (resp.usage or {})


async def _qa_answer_async(
    self,
    *,
    question: str,
    context: str,
) -> tuple[str, dict[str, int]]:
    """Async counterpart of :meth:`_qa_answer_sync`, using ``achat``.

    Returns the stripped answer text and the usage mapping reported by the
    kit (an empty dict when none is reported).
    """
    from ractogateway._models.chat import ChatConfig  # noqa: PLC0415
    from ractogateway.prompts.engine import RactoPrompt  # noqa: PLC0415

    prompt = RactoPrompt(
        role="video question-answering specialist",
        aim=(
            "Answer the user question using only the provided video timeline context. "
            "Always anchor claims to timestamp evidence."
        ),
        constraints=[
            "Do not fabricate details not present in context.",
            "Cite timestamp evidence when available.",
            "If context is insufficient, explicitly say so.",
        ],
        tone="Professional and precise.",
        output_format="Markdown with sections: Answer, Evidence, Confidence",
        context=context,
    )
    try:
        resp = await self._summary_kit.achat(
            ChatConfig(
                user_message=question,
                prompt=prompt,
                temperature=0.0,
                max_tokens=900,
            )
        )
    except TypeError:
        # Fallback for kits whose ``achat`` rejects a ChatConfig argument.
        # NOTE(review): this call does not pass *question* — confirm that
        # is intended.
        resp = await self._summary_kit.achat(prompt=prompt)
    return (resp.content or "").strip(), (resp.usage or {})
def answer_question(
    self,
    source: str | Path | bytes | list,
    *,
    question: str,
    processing_mode: VideoProcessingMode | str = VideoProcessingMode.ACTIVE,
    focus_time: float | int | str | None = None,
    window_seconds: float = 5.0,
    max_context_chars: int = 40_000,
    **run_kwargs: Any,
) -> VideoProcessorResult:
    """Run the pipeline on *source*, then answer *question* from its output.

    Frame analysis and transcription default to on and summary generation
    to off unless overridden through *run_kwargs*.  The returned result
    carries the question and, when one could be produced, the answer.

    Raises
    ------
    ValueError
        If *question* is blank.
    RuntimeError
        If answering fails and safe mode is off.
    """
    if not question.strip():
        raise ValueError("question cannot be empty.")

    resolved_mode = VideoProcessingMode(processing_mode)
    # Caller-supplied run options win over these three defaults ...
    options: dict[str, Any] = {
        "analyze_frames": True,
        "transcribe_audio": True,
        "generate_summary": False,
        **run_kwargs,
    }
    # ... but the explicit scope parameters always take precedence.
    options["processing_mode"] = resolved_mode
    options["window_seconds"] = window_seconds
    if focus_time is not None:
        options["focus_time_seconds"] = self.parse_timestamp(focus_time)

    outcome = self.run(source, **options)
    if outcome.failed_stage is not None:
        # A fatal stage failed: nothing to answer from.
        return outcome.model_copy(update={"question": question})

    qa_context = self._build_qa_context(
        outcome, max_context_chars=max_context_chars
    )
    if not qa_context:
        empty_update = {
            "question": question,
            "answer": "No analyzable context was extracted from this video run.",
        }
        return outcome.model_copy(update=empty_update)

    try:
        reply, token_usage = self._qa_answer_sync(
            question=question, context=qa_context
        )
        # QA tokens are accounted under the summary counters.
        prior = outcome.usage
        input_total = prior.summary_input_tokens + int(
            token_usage.get("prompt_tokens", 0)
        )
        output_total = prior.summary_output_tokens + int(
            token_usage.get("completion_tokens", 0)
        )
        merged_usage = prior.model_copy(
            update={
                "summary_input_tokens": input_total,
                "summary_output_tokens": output_total,
            }
        )
        answered_update = {
            "question": question,
            "answer": reply,
            "usage": merged_usage,
        }
        return outcome.model_copy(update=answered_update)
    except Exception as exc:
        failure = _make_stage_error("answer", exc)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: answer] {failure.error_type}: {failure.message}"
            ) from exc
        failed_update = {
            "question": question,
            "stage_errors": [*outcome.stage_errors, failure],
            "error": outcome.error or str(failure),
        }
        return outcome.model_copy(update=failed_update)
async def aanswer_question(
    self,
    source: str | Path | bytes | list,
    *,
    question: str,
    processing_mode: VideoProcessingMode | str = VideoProcessingMode.ACTIVE,
    focus_time: float | int | str | None = None,
    window_seconds: float = 5.0,
    max_context_chars: int = 40_000,
    **run_kwargs: Any,
) -> VideoProcessorResult:
    """Async variant of :meth:`answer_question`.

    Frame analysis and transcription default to on and summary generation
    to off unless overridden through *run_kwargs*.  The returned result
    carries the question and, when one could be produced, the answer.

    Raises
    ------
    ValueError
        If *question* is blank.
    RuntimeError
        If answering fails and safe mode is off.
    """
    if not question.strip():
        raise ValueError("question cannot be empty.")

    resolved_mode = VideoProcessingMode(processing_mode)
    # Caller-supplied run options win over these three defaults ...
    options: dict[str, Any] = {
        "analyze_frames": True,
        "transcribe_audio": True,
        "generate_summary": False,
        **run_kwargs,
    }
    # ... but the explicit scope parameters always take precedence.
    options["processing_mode"] = resolved_mode
    options["window_seconds"] = window_seconds
    if focus_time is not None:
        options["focus_time_seconds"] = self.parse_timestamp(focus_time)

    outcome = await self.arun(source, **options)
    if outcome.failed_stage is not None:
        # A fatal stage failed: nothing to answer from.
        return outcome.model_copy(update={"question": question})

    qa_context = self._build_qa_context(
        outcome, max_context_chars=max_context_chars
    )
    if not qa_context:
        empty_update = {
            "question": question,
            "answer": "No analyzable context was extracted from this video run.",
        }
        return outcome.model_copy(update=empty_update)

    try:
        reply, token_usage = await self._qa_answer_async(
            question=question, context=qa_context
        )
        # QA tokens are accounted under the summary counters.
        prior = outcome.usage
        input_total = prior.summary_input_tokens + int(
            token_usage.get("prompt_tokens", 0)
        )
        output_total = prior.summary_output_tokens + int(
            token_usage.get("completion_tokens", 0)
        )
        merged_usage = prior.model_copy(
            update={
                "summary_input_tokens": input_total,
                "summary_output_tokens": output_total,
            }
        )
        answered_update = {
            "question": question,
            "answer": reply,
            "usage": merged_usage,
        }
        return outcome.model_copy(update=answered_update)
    except Exception as exc:
        failure = _make_stage_error("answer", exc)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: answer] {failure.error_type}: {failure.message}"
            ) from exc
        failed_update = {
            "question": question,
            "stage_errors": [*outcome.stage_errors, failure],
            "error": outcome.error or str(failure),
        }
        return outcome.model_copy(update=failed_update)
async def _arun_pipeline(
    self,
    source: str | Path | bytes | list,
    call_kwargs: dict[str, Any],
) -> VideoProcessorResult:
    """Run every pipeline stage asynchronously and assemble the result.

    Stages, in order: resolve the processing window, check the rate
    limit, resolve the source, extract frames, deduplicate frames, run
    transcription and frame analysis concurrently, build sections,
    optionally generate a summary, build the result object and optionally
    store it in the RAG pipeline.

    Args:
        source: The video source handed to ``resolve_video_source``; also
            used to derive the label stored as ``video_path`` on the
            result.
        call_kwargs: Per-call options; merged into the effective
            configuration by ``self._resolved_config``.

    Returns:
        A ``VideoProcessorResult``. In safe mode a failure in the config,
        rate-limit, load, extract or deduplicate stage returns early with
        the value of ``self._fatal_stage_result(...)``; a failure in the
        transcribe, analyze, build_sections, summarize or rag_store stage
        is appended to ``stage_errors`` and processing continues.

    Raises:
        Exception: When safe mode is off, errors from the config,
            rate-limit and load stages propagate unchanged.
        RuntimeError: When safe mode is off, errors from every later
            stage are re-raised wrapped with a ``[Stage: <name>]``
            prefix and the original exception chained as the cause.
    """
    from ._analyzer import analyze_frames_async  # noqa: PLC0415
    from ._extractor import (  # noqa: PLC0415
        deduplicate_frames_fast,
        extract_frames,
        extract_frames_window,
        load_frames_from_paths,
    )
    from ._loader import resolve_video_source  # noqa: PLC0415
    from ._rag import store_result_in_rag_async  # noqa: PLC0415
    from ._summarizer import build_sections, generate_summary_async  # noqa: PLC0415
    from ._transcriber import align_frames_to_transcript  # noqa: PLC0415

    cfg = self._resolved_config(call_kwargs)
    label = self._source_label(source)
    # One usage object is shared by all stages; it is passed into the
    # transcription, analysis and summary calls below.
    usage = VideoProcessorUsage()
    stage_errors: list[StageError] = []
    # Python 3.10+: get_running_loop() is safe inside a coroutine; no deprecation.
    loop = asyncio.get_running_loop()
    processing_mode: VideoProcessingMode = cfg["processing_mode"]
    window_start_seconds: float | None = None
    window_end_seconds: float | None = None

    # ── 0a. Resolve the processing window ── FATAL ────────────────────────
    # Outside safe mode the original exception is re-raised as-is (no
    # RuntimeError wrapper), unlike the later stages.
    try:
        window_start_seconds, window_end_seconds = self._resolve_window(cfg)
    except Exception as exc:
        err = _make_stage_error("config", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    # ── 0b. Rate limit ── FATAL ───────────────────────────────────────────
    # Bare ``raise`` keeps the original exception type visible to callers.
    try:
        self._check_rate_limit(cfg["user_id"])
    except Exception as exc:
        err = _make_stage_error("rate_limit", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    # ── 1. Resolve source — non-blocking via to_thread ── FATAL ──────────
    try:
        video_path, frame_paths = await asyncio.to_thread(
            resolve_video_source, source
        )
    except Exception as exc:
        err = _make_stage_error("load", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise

    # ── 2. Extract frames — CPU-bound in thread pool ── FATAL ─────────────
    def _extract() -> list:
        # Pre-extracted frame lists take priority over video decoding.
        if frame_paths is not None:
            if processing_mode == VideoProcessingMode.PASSIVE:
                raise ValueError(
                    "processing_mode='passive' does not support pre-extracted frame lists. "
                    "Provide a video file/URL/bytes source."
                )
            return load_frames_from_paths(
                frame_paths, frame_format=self._frame_format
            )
        # A resolved window start selects windowed extraction.
        if window_start_seconds is not None:
            return extract_frames_window(
                video_path,  # type: ignore[arg-type]
                fps=cfg["fps"],
                max_frames=cfg["max_frames"],
                frame_format=self._frame_format,
                start_time_seconds=window_start_seconds,
                end_time_seconds=window_end_seconds,
            )
        return extract_frames(
            video_path,  # type: ignore[arg-type]
            fps=cfg["fps"],
            max_frames=cfg["max_frames"],
            frame_format=self._frame_format,
            max_process_workers=self._max_process_workers,
        )

    try:
        # A dedicated executor is created for this single call and shut
        # down when the ``with`` block exits.
        with ThreadPoolExecutor(max_workers=self._max_process_workers) as pool:
            raw_frames = await loop.run_in_executor(pool, _extract)
    except Exception as exc:
        err = _make_stage_error("extract", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise RuntimeError(f"[Stage: extract] {err.error_type}: {err.message}") from exc
    usage.frames_extracted = len(raw_frames)

    # ── 3. Deduplicate — parallel pHash pre-computation ── FATAL ──────────
    # deduplicate_frames_fast computes all hashes in parallel via
    # ThreadPoolExecutor then does O(1) XOR+popcount comparisons.
    # Falls back to sequential SSIM path when method=SSIM.
    try:
        frames = await asyncio.to_thread(
            deduplicate_frames_fast,
            raw_frames,
            similarity_threshold=cfg["similarity_threshold"],
            method=cfg["dedup_method"],
            max_hash_workers=self._max_process_workers,
        )
    except Exception as exc:
        err = _make_stage_error("deduplicate", exc)
        if self._safe_mode:
            return self._fatal_stage_result(label, usage, stage_errors, err)
        raise RuntimeError(
            f"[Stage: deduplicate] {err.error_type}: {err.message}"
        ) from exc
    # The returned list still contains discarded frames; ``kept`` marks
    # which ones survived deduplication.
    usage.frames_kept = sum(1 for f in frames if f.kept)
    usage.frames_discarded = sum(1 for f in frames if not f.kept)

    # ── 4+5. Transcription || Frame analysis — RUN CONCURRENTLY ───────────
    # Transcription (blocking I/O in a thread) and LLM analysis (async
    # network I/O via Semaphore-gated asyncio.gather) are independent after
    # deduplication completes. Running them in parallel halves wall-clock
    # time for typical lecture videos where each takes 1-10+ minutes.
    async def _transcribe_stage() -> list:
        # Skipped when disabled or when there is no video file (for
        # example a pre-extracted frame list).
        if not cfg["transcribe_audio"] or video_path is None:
            return []
        return await asyncio.to_thread(
            self._run_transcription,
            video_path,
            cfg["language"],
            usage,
            window_start_seconds,
            window_end_seconds,
            # NOTE(review): presumably a timestamp offset for windowed
            # runs — confirm against _run_transcription.
            window_start_seconds or 0.0,
        )

    async def _analyze_stage(current_frames: list) -> list:
        # With analysis disabled the frames pass through untouched.
        if not cfg["analyze_frames"]:
            return current_frames
        return await analyze_frames_async(
            current_frames,
            self._analysis_kit,
            mode=cfg["frame_analysis_mode"],
            batch_size=cfg["batch_size"],
            max_workers=self._max_workers,
            grid_size=cfg["grid_size"],
            usage=usage,
        )

    # return_exceptions=True hands failures back as values so that one
    # stage failing does not discard the other stage's result.
    transcript_result, frames_result = await asyncio.gather(
        _transcribe_stage(),
        _analyze_stage(frames),
        return_exceptions=True,
    )

    # Handle transcription result (NON-FATAL)
    transcript: list = []
    if isinstance(transcript_result, BaseException):
        err = _make_stage_error("transcribe", transcript_result)
        stage_errors.append(err)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: transcribe] {err.error_type}: {err.message}"
            ) from transcript_result
    else:
        transcript = transcript_result
        # NOTE(review): alignment receives the deduplicated frame list
        # from before ``frames`` is replaced by the analysis output below —
        # confirm this ordering is intended.
        transcript = align_frames_to_transcript(frames, transcript)

    # Handle analysis result (NON-FATAL)
    if isinstance(frames_result, BaseException):
        err = _make_stage_error("analyze", frames_result)
        stage_errors.append(err)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: analyze] {err.error_type}: {err.message}"
            ) from frames_result
    else:
        frames = frames_result

    # ── 6. Build sections — NON-FATAL ─────────────────────────────────────
    sections: list = []
    try:
        sections = build_sections(frames, transcript)
    except Exception as exc:
        err = _make_stage_error("build_sections", exc)
        stage_errors.append(err)
        if not self._safe_mode:
            raise RuntimeError(
                f"[Stage: build_sections] {err.error_type}: {err.message}"
            ) from exc

    # ── 7. Summary — NON-FATAL ────────────────────────────────────────────
    summary: str | None = None
    if cfg["generate_summary"]:
        try:
            summary = await generate_summary_async(
                sections, transcript, self._summary_kit, usage
            )
        except Exception as exc:
            err = _make_stage_error("summarize", exc)
            stage_errors.append(err)
            if not self._safe_mode:
                raise RuntimeError(
                    f"[Stage: summarize] {err.error_type}: {err.message}"
                ) from exc

    # ── 8. Build result ───────────────────────────────────────────────────
    # The first recorded stage error doubles as the top-level error text.
    top_error = str(stage_errors[0]) if stage_errors else None
    result = VideoProcessorResult(
        video_path=label,
        frames=frames,
        transcript=transcript,
        sections=sections,
        summary=summary,
        usage=usage,
        stage_errors=stage_errors,
        error=top_error,
        processing_mode=processing_mode,
        window_start_seconds=window_start_seconds,
        window_end_seconds=window_end_seconds,
    )

    # ── 9. RAG storage — NON-FATAL ────────────────────────────────────────
    if cfg["store_in_rag"] and self._rag_pipeline is not None:
        try:
            count = await store_result_in_rag_async(result, self._rag_pipeline)
            result = result.model_copy(
                update={"rag_stored": True, "rag_chunk_count": count}
            )
        except Exception as exc:
            err = _make_stage_error("rag_store", exc)
            if self._safe_mode:
                # Keep an earlier top-level error if one is already set.
                result = result.model_copy(
                    update={
                        "stage_errors": [*result.stage_errors, err],
                        "error": result.error or str(err),
                    }
                )
            else:
                raise RuntimeError(
                    f"[Stage: rag_store] {err.error_type}: {err.message}"
                ) from exc
    return result
# --------------------------------------------------------------------------- # Async-only variant (for FastAPI / async servers) # ---------------------------------------------------------------------------
class AsyncVideoProcessorPipeline:
    """Coroutine-only facade over :class:`VideoProcessorPipeline`.

    Intended for async servers such as FastAPI where a synchronous
    ``run()`` should not be part of the public surface. The constructor
    accepts exactly the arguments of :class:`VideoProcessorPipeline` and
    every call is delegated to a wrapped instance of it.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Build the wrapped pipeline from the given arguments."""
        delegate = VideoProcessorPipeline(*args, **kwargs)
        self._inner = delegate

    async def run(
        self,
        source: str | Path | bytes | list,
        **kwargs: Any,
    ) -> VideoProcessorResult:
        """Process *source* by awaiting the wrapped pipeline's ``arun``."""
        outcome = await self._inner.arun(source, **kwargs)
        return outcome

    async def answer_question(
        self,
        source: str | Path | bytes | list,
        *,
        question: str,
        processing_mode: VideoProcessingMode | str = VideoProcessingMode.ACTIVE,
        focus_time: float | int | str | None = None,
        window_seconds: float = 5.0,
        max_context_chars: int = 40_000,
        **run_kwargs: Any,
    ) -> VideoProcessorResult:
        """Answer *question* about *source*.

        Forwards every argument unchanged to
        :meth:`VideoProcessorPipeline.aanswer_question` on the wrapped
        pipeline and returns its result.
        """
        # The named options cannot also appear in run_kwargs, so the two
        # mappings never collide when unpacked together.
        forwarded: dict[str, Any] = {
            "question": question,
            "processing_mode": processing_mode,
            "focus_time": focus_time,
            "window_seconds": window_seconds,
            "max_context_chars": max_context_chars,
        }
        return await self._inner.aanswer_question(
            source, **forwarded, **run_kwargs
        )

    @staticmethod
    def parse_timestamp(value: float | int | str) -> float:
        """Return ``VideoProcessorPipeline.parse_timestamp(value)``."""
        parser = VideoProcessorPipeline.parse_timestamp
        return parser(value)