ractogateway.pipelines.video_processor.pipeline

VideoProcessorPipeline — main entry point.

Processes a video through six stages (the last is optional):
  1. Load — resolve source (path / URL / YouTube / bytes / frame list)

  2. Extract — sample frames with OpenCV, deduplicate by similarity

  3. Transcribe — extract audio, transcribe with chosen backend

  4. Analyse — pass frames to vision LLM (individual or grid mode)

  5. Summarise — produce comprehensive Markdown summary

  6. RAG store — optionally index everything via RactoRAG
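
The deduplication in stage 2 can be pictured with a small sketch. This is not the library's implementation: frames are represented by hypothetical 64-bit integer hashes, similarity is defined here as the percentage of matching bits, and each frame is compared only with the most recently kept one. The function name and the hash values are assumptions made for illustration.

```python
def dedupe_by_similarity(hashes, threshold=90.0, bits=64):
    """Return indices of frames to keep.

    A frame is dropped when its similarity (percentage of identical bits)
    to the previously *kept* frame is >= threshold.
    """
    kept = []
    last = None
    for idx, h in enumerate(hashes):
        if last is not None:
            differing = bin(h ^ last).count("1")  # Hamming distance
            similarity = 100.0 * (bits - differing) / bits
            if similarity >= threshold:
                continue  # too similar to the last kept frame
        kept.append(idx)
        last = h
    return kept


frames = [
    0x0000000000000000,  # first frame is always kept
    0x0000000000000001,  # 1 bit differs -> 98.4 % similar -> dropped
    0x00000000000000FF,  # 8 bits differ -> 87.5 % similar -> kept
    0x00000000000001FF,  # 1 bit differs from the last kept frame -> dropped
]
kept_indices = dedupe_by_similarity(frames, threshold=90.0)
```

Raising the threshold keeps more frames, because fewer pairs count as near-duplicates.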

Usage:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    DeduplicationMethod,
    TranscriberBackend,
)

kit = Chat(api_key="...", model="gpt-4o")

pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    analyze_frames=True,
    generate_summary=True,
    safe_mode=True,
)

result = pipeline.run("lecture.mp4")
print(result.summary)
print(result.get_transcript_text())

# YouTube URLs, direct URLs, raw bytes, and pre-extracted frame lists are also accepted:
result = pipeline.run("https://www.youtube.com/watch?v=...")
result = pipeline.run("https://example.com/video.mp4")
result = pipeline.run(video_bytes)           # bytes, e.g. open("lecture.mp4", "rb").read()
result = pipeline.run(["frame1.jpg", ...])   # pre-extracted frames

class ractogateway.pipelines.video_processor.pipeline.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')

Bases: object

Synchronous + asynchronous video processing pipeline.

Parameters:
  • kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit is provided.

  • analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.

  • summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.

  • transcriber (TranscriberBackend) – Which audio transcription backend to use.

  • transcriber_model (str) – Model name / size for the chosen backend.

  • transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).

  • transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).

  • fps (float) – Video frames to sample per second.

  • similarity_threshold (float) – Similarity threshold as a percentage. A frame whose similarity to the previously kept frame is >= this value is discarded. E.g. 90.0 keeps only frames that differ from the previously kept frame by more than 10 %.

  • dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).

  • max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).

  • frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).

  • frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).

  • grid_size (int) – Frames per grid collage (only used in GRID mode).

  • batch_size (int) – Concurrent LLM calls per batch during frame analysis.

  • max_workers (int) – Thread-pool size for concurrent LLM calls.

  • max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.

  • language (str | None) – BCP-47 language code for transcription (None = auto-detect).

  • transcribe_audio (bool) – Whether to extract and transcribe the audio track.

  • analyze_frames (bool) – Whether to pass frames to the vision LLM.

  • generate_summary (bool) – Whether to generate a comprehensive summary at the end.

  • rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag=True is passed to run() or arun(), all extracted content is indexed for retrieval.

  • safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.
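
Because rate_limiter is duck-typed, any object with the two methods named above should work. A minimal in-memory sketch follows. The return types (a bool from check_and_consume, an int from get_remaining), the fixed per-user budget, and the class name are assumptions, since the parameter description does not specify them.

```python
from collections import defaultdict


class SimpleTokenBudget:
    """Hypothetical in-memory limiter: each user gets a fixed token budget."""

    def __init__(self, budget_per_user: int = 1000) -> None:
        self._budget = budget_per_user
        self._used = defaultdict(int)  # user_id -> tokens consumed so far

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Assumed contract: return True and record usage if the request fits,
        # otherwise return False and leave the budget untouched.
        if self._used[user_id] + tokens > self._budget:
            return False
        self._used[user_id] += tokens
        return True

    def get_remaining(self, user_id: str) -> int:
        return self._budget - self._used[user_id]


limiter = SimpleTokenBudget(budget_per_user=100)
first = limiter.check_and_consume("default", 60)   # fits within the budget
second = limiter.check_and_consume("default", 60)  # would exceed the budget
remaining = limiter.get_remaining("default")
```

Such an object would be passed as rate_limiter=limiter when constructing the pipeline, with user_id selecting whose budget is charged.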

run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)

Process source and return a VideoProcessorResult.

All keyword arguments override the constructor defaults for this call only; arguments left at the <object object> sentinel default fall back to the constructor values. With safe_mode=True, fatal stage errors are captured into result.failed_stage / result.stage_errors and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors so that the pipeline continues with whatever data is available.

Return type:

VideoProcessorResult
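
The <object object> defaults in the signature are how a plain object() sentinel is rendered. A sentinel lets a method distinguish "argument not passed" from an explicit None. The following is a minimal sketch of that pattern using a hypothetical stand-in class, not the real pipeline; the names _UNSET and MiniPipeline are assumptions.

```python
_UNSET = object()  # sentinel: "caller did not pass this argument"


class MiniPipeline:
    """Hypothetical stand-in showing per-call overrides of constructor defaults."""

    def __init__(self, *, fps=1.0, max_frames=None):
        self.fps = fps
        self.max_frames = max_frames

    def run(self, source, *, fps=_UNSET, max_frames=_UNSET):
        # Fall back to the constructor value only when nothing was passed.
        # An explicit None is a real value and is respected.
        eff_fps = self.fps if fps is _UNSET else fps
        eff_max = self.max_frames if max_frames is _UNSET else max_frames
        return {"source": source, "fps": eff_fps, "max_frames": eff_max}


p = MiniPipeline(fps=1.0, max_frames=50)
default_call = p.run("lecture.mp4")
override_call = p.run("lecture.mp4", fps=2.0, max_frames=None)
unchanged_fps = p.fps  # the override applied to one call only
```

Using None as the "not passed" marker would not work here, because None is itself a meaningful value for parameters such as max_frames.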

async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)

Async variant of run().

Return type:

VideoProcessorResult

static parse_timestamp(value)

Parse timestamp values such as 130, "02:10", or "2 mins 10 sec" into seconds.

Return type:

float
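
To make the accepted formats concrete, here is a rough re-implementation sketch. It is not the library's code: the function name, the regular expressions, and the handling of hours are assumptions, and the real method may accept more (or fewer) formats.

```python
import re

_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0}


def parse_timestamp_sketch(value):
    """Convert 130, "02:10", or "2 mins 10 sec" to seconds (float)."""
    if isinstance(value, (int, float)):
        return float(value)

    text = str(value).strip().lower()

    # Clock style: "SS", "MM:SS", or "HH:MM:SS"
    if re.fullmatch(r"\d+(?::\d+){0,2}", text):
        seconds = 0.0
        for part in text.split(":"):
            seconds = seconds * 60 + float(part)
        return seconds

    # Worded style: "<number> <unit>" pairs; the unit is identified
    # by its first letter (h, m, or s).
    pairs = re.findall(r"(\d+(?:\.\d+)?)\s*([hms])[a-z]*", text)
    if not pairs:
        raise ValueError(f"Unrecognised timestamp: {value!r}")
    return sum(float(num) * _UNIT_SECONDS[unit] for num, unit in pairs)
```

All three example inputs from the description map to the same value of 130 seconds under this sketch.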

answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Process the video, then answer a user question from the extracted timeline context.

Return type:

VideoProcessorResult
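
The focus_time, window_seconds, and max_context_chars parameters are not described further here. One plausible reading, offered purely as an assumption, is that timeline entries within window_seconds of the focus time are selected and the resulting text is capped at max_context_chars. The sketch below illustrates that reading with a hypothetical list of (timestamp, text) tuples; it does not reflect the library's internals.

```python
def build_context(timeline, *, focus_time=None, window_seconds=5.0,
                  max_context_chars=40000):
    """Join timeline text, optionally restricted to a window around focus_time.

    timeline: iterable of (timestamp_seconds, text) tuples (hypothetical shape).
    """
    if focus_time is not None:
        lo, hi = focus_time - window_seconds, focus_time + window_seconds
        timeline = [(t, text) for t, text in timeline if lo <= t <= hi]

    lines = [f"[{t:.1f}s] {text}" for t, text in timeline]
    context = "\n".join(lines)
    # Hard cap so the prompt stays within the character budget.
    return context[:max_context_chars]


timeline = [
    (120.0, "Speaker introduces gradient descent."),
    (128.0, "Slide shows the update rule."),
    (131.0, "Speaker explains the learning rate."),
    (200.0, "Q&A begins."),
]
context = build_context(timeline, focus_time=130.0, window_seconds=5.0)
```

With a focus time of 130 seconds and a 5-second window, only the entries at 128 s and 131 s survive in this sketch.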

async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Async variant of answer_question().

Return type:

VideoProcessorResult

class ractogateway.pipelines.video_processor.pipeline.AsyncVideoProcessorPipeline(*args, **kwargs)

Bases: object

Async-only variant of VideoProcessorPipeline.

Exposes only async methods (run() and answer_question()), which suits FastAPI endpoints where you do not want a sync run() in the public API.

All constructor parameters are identical to VideoProcessorPipeline.

async run(source, **kwargs)

Async-only process entrypoint.

Return type:

VideoProcessorResult
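
Since run() is a coroutine, several sources can be awaited concurrently with asyncio.gather. The sketch below uses a hypothetical StubPipeline in place of AsyncVideoProcessorPipeline so that it runs without the library or any video files; only the calling pattern is the point.

```python
import asyncio


class StubPipeline:
    """Hypothetical stand-in exposing the same `async run(source)` shape."""

    async def run(self, source, **kwargs):
        await asyncio.sleep(0)  # yield control, as real I/O would
        return {"source": source, "summary": f"summary of {source}"}


async def process_all(pipeline, sources):
    # Schedule one run() per source and wait for all of them.
    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*(pipeline.run(s) for s in sources))


results = asyncio.run(process_all(StubPipeline(), ["a.mp4", "b.mp4"]))
```

With the real class, StubPipeline() would be replaced by an AsyncVideoProcessorPipeline instance. Whether concurrent runs on a single instance are safe is not stated in this reference, so that should be checked before relying on it.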

async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)

Async-only variant of VideoProcessorPipeline.aanswer_question().

Return type:

VideoProcessorResult

static parse_timestamp(value)

Delegate to VideoProcessorPipeline.parse_timestamp().

Return type:

float