ractogateway.pipelines.video_processor

VideoProcessorPipeline — process tutorial/lecture videos for RAG & Q&A.

Quick start:

from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    TranscriberBackend,
    DeduplicationMethod,
)

kit = Chat(api_key="sk-...", model="gpt-4o")

pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    analyze_frames=True,
    generate_summary=True,
)

# Accepts: local path, URL, YouTube link, bytes buffer, or pre-extracted frames
result = pipeline.run("lecture.mp4")
print(result.summary)
result.to_markdown("report.md")

Install:

pip install ractogateway[pipelines-video]           # core (OpenCV, pHash, ffmpeg)
pip install ractogateway[pipelines-video-whisper]   # + faster-whisper
pip install ractogateway[pipelines-video-full]      # all of the above
pip install ractogateway[pipelines-video-yt]        # + yt-dlp (YouTube support)

class ractogateway.pipelines.video_processor.AsyncVideoProcessorPipeline(*args, **kwargs)[source]

Bases: object

Async-only variant of VideoProcessorPipeline.

Exposes only coroutine entry points (run() and answer_question()), which suits FastAPI endpoints where you do not want a sync run() in the public API.

All constructor parameters are identical to VideoProcessorPipeline.

async run(source, **kwargs)[source]

Async-only process entrypoint.

Return type:

VideoProcessorResult

async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]

Async-only variant of VideoProcessorPipeline.aanswer_question().

Return type:

VideoProcessorResult

static parse_timestamp(value)[source]

Delegate to VideoProcessorPipeline.parse_timestamp().

Return type:

float

class ractogateway.pipelines.video_processor.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]

Bases: object

Synchronous + asynchronous video processing pipeline.

Parameters:
  • kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit is provided.

  • analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.

  • summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.

  • transcriber (TranscriberBackend) – Which audio transcription backend to use.

  • transcriber_model (str) – Model name / size for the chosen backend.

  • transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).

  • transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).

  • fps (float) – Video frames to sample per second.

  • similarity_threshold (float) – Frames with similarity >= this % to the previous kept frame are discarded. E.g. 90.0 keeps frames that differ by more than 10 %.

  • dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).

  • max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).

  • frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).

  • frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).

  • grid_size (int) – Frames per grid collage (only used in GRID mode).

  • batch_size (int) – Concurrent LLM calls per batch during frame analysis.

  • max_workers (int) – Thread-pool size for concurrent LLM calls.

  • max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.

  • language (str | None) – BCP-47 language code for transcription (None = auto-detect).

  • transcribe_audio (bool) – Whether to extract and transcribe the audio track.

  • analyze_frames (bool) – Whether to pass frames to the vision LLM.

  • generate_summary (bool) – Whether to generate a comprehensive summary at the end.

  • rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag is True (for example, passed per call to run()), all extracted content is indexed for retrieval.

  • safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.

  • tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.

  • metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.

  • rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.

  • user_id (str) – Default user identifier passed to the rate limiter.
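
The rate_limiter parameter is duck-typed, so any object with the two named methods should work. The sketch below is a hypothetical in-memory token bucket, not part of ractogateway. The class name, the constructor arguments, and the return types (bool for check_and_consume, int for get_remaining) are assumptions, since the reference only gives the method names and arguments.

```python
import time


class TokenBucketLimiter:
    """Hypothetical in-memory limiter exposing the two duck-typed methods."""

    def __init__(self, capacity: int, refill_per_second: float) -> None:
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        # user_id -> (tokens currently available, time of last refill)
        self._buckets: dict = {}

    def _refill(self, user_id: str) -> float:
        # Top up the bucket in proportion to the time elapsed since the last call.
        now = time.monotonic()
        tokens, last = self._buckets.get(user_id, (float(self.capacity), now))
        tokens = min(float(self.capacity), tokens + (now - last) * self.refill_per_second)
        self._buckets[user_id] = (tokens, now)
        return tokens

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        # Assumed contract: True when the request is allowed, False when denied.
        available = self._refill(user_id)
        if tokens > available:
            return False
        self._buckets[user_id] = (available - tokens, self._buckets[user_id][1])
        return True

    def get_remaining(self, user_id: str) -> int:
        # Assumed contract: whole tokens still available to this user.
        return int(self._refill(user_id))
```

An instance such as TokenBucketLimiter(capacity=100, refill_per_second=1.0) could then be passed as rate_limiter=..., assuming the pipeline only calls these two methods.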

run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]

Process source and return a VideoProcessorResult.

All keyword arguments override the constructor defaults for this call only. With safe_mode=True, fatal stage errors are captured into result.failed_stage and result.stage_errors, and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors, so the pipeline continues with whatever data is available.

Return type:

VideoProcessorResult
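
The <object object> defaults in the signature above are consistent with a sentinel-default pattern, where a unique marker object means "argument not supplied". The sketch below illustrates that general pattern with a hypothetical stand-in class. It is not the library's source, and the names _UNSET and PipelineSketch are assumptions.

```python
_UNSET = object()  # unique marker meaning "argument not supplied"


class PipelineSketch:
    """Hypothetical stand-in, not the real VideoProcessorPipeline."""

    def __init__(self, fps: float = 1.0, max_frames=None) -> None:
        self.fps = fps
        self.max_frames = max_frames

    def run(self, source, *, fps=_UNSET, max_frames=_UNSET) -> dict:
        # Fall back to the constructor value only when the caller passed nothing.
        # Comparing by identity lets an explicit None count as a real override.
        effective_fps = self.fps if fps is _UNSET else fps
        effective_max = self.max_frames if max_frames is _UNSET else max_frames
        return {"source": source, "fps": effective_fps, "max_frames": effective_max}
```

A plain None default could not distinguish "not supplied" from "explicitly no cap" for a parameter such as max_frames, which is the usual reason for choosing a sentinel.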

async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]

Async variant of run().

Return type:

VideoProcessorResult

static parse_timestamp(value)[source]

Parse timestamp values like 130, "02:10", "2 mins 10 sec".

Return type:

float
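
To make the three accepted shapes concrete, here is a minimal sketch of a parser that handles 130, "02:10", and "2 mins 10 sec". It is an illustration written for this page, not the library's implementation. The function name and the regular expression are assumptions, and the real method may accept more or fewer formats.

```python
import re

_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0}
# A number followed by a unit word starting with h, m, or s ("2 mins", "10 sec").
_UNIT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(h|m|s)[a-z]*")


def parse_timestamp_sketch(value) -> float:
    """Convert a number, a clock string, or a unit phrase to seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    text = str(value).strip().lower()
    if ":" in text:
        # Clock notation (MM:SS or HH:MM:SS), folded left to right in base 60.
        seconds = 0.0
        for part in text.split(":"):
            seconds = seconds * 60 + float(part)
        return seconds
    matches = _UNIT_PATTERN.findall(text)
    if matches:
        return sum(float(number) * _UNIT_SECONDS[unit] for number, unit in matches)
    # Bare numeric string such as "130".
    return float(text)
```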

answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]

Process video then answer a user question from extracted timeline context.

Return type:

VideoProcessorResult

async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]

Async variant of answer_question().

Return type:

VideoProcessorResult

class ractogateway.pipelines.video_processor.DeduplicationMethod(*values)[source]

Bases: str, Enum

Frame similarity algorithm used for deduplication.

PHASH = 'phash'
SSIM = 'ssim'

class ractogateway.pipelines.video_processor.FrameAnalysisMode(*values)[source]

Bases: str, Enum

How frames are sent to the vision LLM.

INDIVIDUAL = 'individual'
GRID = 'grid'

class ractogateway.pipelines.video_processor.VideoProcessingMode(*values)[source]

Bases: str, Enum

How much of the video should be processed.

ACTIVE = 'active'
PASSIVE = 'passive'

class ractogateway.pipelines.video_processor.TranscriberBackend(*values)[source]

Bases: str, Enum

Audio transcription backend.

FASTER_WHISPER = 'faster-whisper'
OPENAI_WHISPER = 'openai-whisper'
HUGGINGFACE_LOCAL = 'huggingface-local'
OPENAI_API = 'openai-api'
GOOGLE_API = 'google-api'
HUGGINGFACE_API = 'huggingface-api'
GROQ_API = 'groq-api'
DEEPGRAM_API = 'deepgram-api'
OLLAMA = 'ollama'
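
Because the enums on this page derive from both str and Enum, standard Python semantics suggest members can be looked up by value and compared with plain strings. The sketch below redeclares a two-member stand-in (BackendSketch is a hypothetical name) so it runs without the package installed.

```python
from enum import Enum


class BackendSketch(str, Enum):
    """Stand-in mirroring two TranscriberBackend members from this page."""

    FASTER_WHISPER = "faster-whisper"
    GROQ_API = "groq-api"


def backend_from_config(raw: str) -> BackendSketch:
    # Lookup by value; an unknown string raises ValueError.
    return BackendSketch(raw)


chosen = backend_from_config("groq-api")
```

Lookup by value is convenient when the backend name comes from a config file or an environment variable rather than from code.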
class ractogateway.pipelines.video_processor.VideoConfig(**data)[source]

Bases: BaseModel

All tunable hyperparameters for VideoProcessorPipeline.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

fps: float

Frames to sample per second of video.

similarity_threshold: float

Discard a frame whose similarity to the previous kept frame is >= this %. Lower = keep more frames. Range 0-100.
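
The threshold rule can be sketched in pure Python. This example uses a simple average hash as a stand-in for pHash (real pHash uses a DCT, and the library relies on OpenCV), and it treats frames as small grayscale grids. All function names here are hypothetical.

```python
def average_hash(pixels):
    """One bit per pixel: True where the pixel is brighter than the frame mean."""
    flat = [p for row in pixels for p in row]
    mean = sum(flat) / len(flat)
    return [p > mean for p in flat]


def similarity_percent(hash_a, hash_b) -> float:
    """Share of matching bits, expressed on the 0-100 scale used by the threshold."""
    differing = sum(a != b for a, b in zip(hash_a, hash_b))
    return 100.0 * (1 - differing / len(hash_a))


def dedupe(frames, threshold: float = 90.0):
    """Return indices of kept frames, comparing each frame to the previous kept one."""
    kept = []
    last_hash = None
    for index, pixels in enumerate(frames):
        current = average_hash(pixels)
        # Discard when similarity >= threshold; keep otherwise.
        if last_hash is None or similarity_percent(last_hash, current) < threshold:
            kept.append(index)
            last_hash = current
    return kept
```

Note that the comparison is against the previous kept frame, not the previous extracted frame, so a slow drift across many near-identical frames still eventually produces a new kept frame.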

max_frames: int | None

Hard cap on frames kept (None = no cap).

dedup_method: DeduplicationMethod

Algorithm used to compare frame similarity.

frame_format: str

Image format for kept frames: 'JPEG' (smaller) or 'PNG' (lossless).

analyze_frames: bool

Pass kept frames to the vision LLM for content extraction.

frame_analysis_mode: FrameAnalysisMode

Individual = one LLM call per frame; Grid = stitch frames into a collage.

grid_size: int

Number of frames per grid collage (used when frame_analysis_mode='grid').
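
As a rough illustration of what grid_size implies, the sketch below groups kept frame IDs into collages of at most grid_size frames. It assumes one vision call per collage, which the reference does not state explicitly. The helper name is hypothetical.

```python
def chunk_frames(frame_ids, grid_size: int):
    """Split frame IDs into consecutive groups of at most grid_size frames."""
    if grid_size < 1:
        raise ValueError("grid_size must be at least 1")
    return [frame_ids[i:i + grid_size] for i in range(0, len(frame_ids), grid_size)]


# Ten kept frames with grid_size=4 give three collages, the last one partial.
collages = chunk_frames(list(range(10)), 4)
```

Under that assumption, grid mode would need ceil(n / grid_size) vision calls for n kept frames, versus n calls in individual mode.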

batch_size: int

How many frames to submit to the LLM concurrently per batch.

max_workers: int

Thread-pool size for concurrent LLM frame analysis calls.
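
One plausible way batch_size and max_workers interact is shown below: frames are submitted one batch at a time to a shared thread pool. This is a sketch under that assumption, not the library's scheduler. The analyze callable stands in for a vision-LLM call.

```python
from concurrent.futures import ThreadPoolExecutor


def analyze_in_batches(frames, analyze, batch_size: int = 10, max_workers: int = 4):
    """Run analyze(frame) for every frame, one batch at a time, preserving order."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(frames), batch_size):
            batch = frames[start:start + batch_size]
            # Executor.map yields results in input order, and consuming it
            # here finishes the whole batch before the next one is submitted.
            results.extend(pool.map(analyze, batch))
    return results
```

With this shape, max_workers bounds how many calls are in flight at once, while batch_size bounds how many are queued before the pipeline waits for results.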

max_process_workers: int

Process-pool size for CPU-bound frame extraction / hashing.

transcribe_audio: bool

Extract and transcribe the video’s audio track.

transcriber_backend: TranscriberBackend

Which transcription engine to use.

transcriber_model: str

Model name / size — interpretation is backend-specific.

Examples:

  • faster-whisper / openai-whisper: "tiny", "base", "small", "medium", "large-v3"

  • huggingface-local / huggingface-api: HF model ID, e.g. "openai/whisper-large-v3"

  • openai-api: "whisper-1"

  • google-api: "long", "short", "latest_long"

  • groq-api: "whisper-large-v3", "whisper-large-v3-turbo"

  • deepgram-api: "nova-3", "nova-2", "enhanced", "base"

  • ollama: model name on the server, e.g. "whisper"

transcriber_api_key: str | None

API key for cloud transcription backends (falls back to env vars).

transcriber_base_url: str | None

Base URL for self-hosted / Ollama transcription endpoints.

language: str | None

BCP-47 language code (e.g. ‘en’, ‘fr’). None = auto-detect.

generate_summary: bool

Generate a comprehensive textual summary at the end.

store_in_rag: bool

Push all extracted content into the supplied rag_pipeline for Q&A.

processing_mode: VideoProcessingMode

active processes full video; passive processes only a time window.

focus_time_seconds: float | None

Center timestamp in seconds for passive mode (e.g. 130 for 02:10).

window_seconds: float

Passive-mode half-window size in seconds (focus ± window_seconds).
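
The half-window arithmetic works out as follows. Clamping the start at zero and the end at the video duration are assumptions added for this sketch. The reference only states focus ± window_seconds.

```python
def passive_window(focus_time_seconds: float, window_seconds: float = 5.0, duration=None):
    """Return (start, end) in seconds for a window centered on the focus time."""
    start = max(0.0, focus_time_seconds - window_seconds)  # assumed clamp at 0
    end = focus_time_seconds + window_seconds
    if duration is not None:
        end = min(end, duration)  # assumed clamp at the end of the video
    return start, end


# A focus time of 130 s (02:10) with the default half-window covers 125 s to 135 s.
window = passive_window(130.0)
```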

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.video_processor.FrameEntry(**data)[source]

Bases: BaseModel

One video frame, after extraction and optional analysis.

frame_id: int

Zero-based sequential frame identifier.

timestamp: float

Position in the video in seconds.

similarity_to_prev: float | None

Similarity percentage to the previous kept frame (None for first frame).

kept: bool

False if discarded by the deduplication step.

analysis: str | None

LLM-generated description of visual content (whiteboard, screen, etc.).

image_data: bytes | None

Raw image bytes for kept + analyzed frames.

image_format: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.video_processor.StageError(**data)[source]

Bases: BaseModel

Structured record of a failure in one pipeline stage.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

stage: str

Name of the pipeline stage that failed (e.g. ‘extract’, ‘transcribe’).

error_type: str

Exception class name (e.g. ‘ImportError’, ‘RuntimeError’).

message: str

str(exc) — the error message.

traceback: str | None

Full Python traceback as a string (available in safe_mode).
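
The four fields map naturally onto a caught exception. The sketch below uses a plain dataclass as a stand-in for the Pydantic model and shows one way such a record could be built. The helper name and the include_traceback flag are hypothetical.

```python
import traceback as tb_module
from dataclasses import dataclass
from typing import Optional


@dataclass
class StageErrorSketch:
    """Dataclass stand-in with the same four fields as StageError."""

    stage: str
    error_type: str
    message: str
    traceback: Optional[str] = None


def capture_stage_error(stage: str, exc: BaseException, include_traceback: bool = True):
    """Turn a caught exception into a structured per-stage record."""
    formatted = None
    if include_traceback:
        formatted = "".join(
            tb_module.format_exception(type(exc), exc, exc.__traceback__)
        )
    return StageErrorSketch(
        stage=stage,
        error_type=type(exc).__name__,  # e.g. 'RuntimeError'
        message=str(exc),
        traceback=formatted,
    )
```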

class ractogateway.pipelines.video_processor.TranscriptSegment(**data)[source]

Bases: BaseModel

A time-bounded transcription segment aligned to frame IDs.

start: float

Segment start time in seconds.

end: float

Segment end time in seconds.

text: str

Transcribed text for this segment.

frame_ids: list[int]

IDs of kept frames whose timestamps fall within [start, end].
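
The alignment rule for frame_ids is a closed-interval membership test. A minimal sketch, assuming kept frames are available as (frame_id, timestamp) pairs; the helper name is hypothetical:

```python
def frames_in_segment(kept_frames, start: float, end: float):
    """Return IDs of kept frames whose timestamps fall within [start, end]."""
    return [frame_id for frame_id, timestamp in kept_frames if start <= timestamp <= end]


# (frame_id, timestamp in seconds) for frames that survived deduplication.
kept = [(0, 0.0), (3, 3.0), (7, 7.5), (9, 12.0)]
ids = frames_in_segment(kept, 3.0, 7.5)
```

Both endpoints are inclusive, so a frame sitting exactly on a segment boundary can belong to two adjacent segments.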

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.video_processor.VideoSection(**data)[source]

Bases: BaseModel

A merged time section combining visual analysis + audio transcript.

timestamp_start: float
timestamp_end: float
frame_ids: list[int]
visual_content: str

Combined LLM analyses for all frames in this section.

audio_content: str

Concatenated transcript text for this section’s time range.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.pipelines.video_processor.VideoProcessorResult(**data)[source]

Bases: BaseModel

Full output of a VideoProcessorPipeline run.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

video_path: str

Original source identifier (path, URL, or ‘<bytes>’ for buffer input).

frames: list[FrameEntry]

All extracted frames (kept and discarded).

transcript: list[TranscriptSegment]

Audio transcript segmented by timestamp.

sections: list[VideoSection]

Merged visual + audio sections ordered by time.

summary: str | None

Comprehensive LLM-generated summary of the entire video.

rag_stored: bool
rag_chunk_count: int
usage: VideoProcessorUsage
error: str | None

Short description of the first fatal error (backward-compatible).

failed_stage: str | None

Name of the stage that caused a fatal pipeline abort, if any.

stage_errors: list[StageError]

All per-stage errors collected during the run (fatal + non-fatal).

processing_mode: VideoProcessingMode

Whether this run processed full video (active) or a window (passive).

window_start_seconds: float | None

Passive-mode window start timestamp in source-video seconds.

window_end_seconds: float | None

Passive-mode window end timestamp in source-video seconds.

question: str | None

Optional user question answered from this run.

answer: str | None

Answer generated for question, when question-answer mode is used.

property has_errors: bool

True if any stage encountered an error.

property is_failed: bool

True if the pipeline aborted early due to a fatal stage error.

get_transcript_text()[source]

Full transcript as a single string.

Return type:

str

get_all_visual_content()[source]

All frame analyses concatenated in timestamp order.

Return type:

str

to_json(path=None, *, indent=2)[source]

Serialise result to JSON. Returns JSON string if path is None.

Return type:

str | None

to_markdown(path=None)[source]

Build a structured Markdown report. Returns string if path is None.

Return type:

str | None

class ractogateway.pipelines.video_processor.VideoProcessorUsage(**data)[source]

Bases: BaseModel

Accounting of tokens and frame counts across the full pipeline.

frames_extracted: int
frames_kept: int
frames_discarded: int
analysis_input_tokens: int
analysis_output_tokens: int
summary_input_tokens: int
summary_output_tokens: int
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

audio_duration_seconds: float
property total_analysis_tokens: int
property total_summary_tokens: int
property total_tokens: int
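
The three total_* properties are undocumented here. The sketch below shows the most likely reading of their names, assuming each total is the sum of its input and output counts and total_tokens is the sum of both. It uses a dataclass stand-in rather than the real model.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Stand-in holding the four token counters from VideoProcessorUsage."""

    analysis_input_tokens: int = 0
    analysis_output_tokens: int = 0
    summary_input_tokens: int = 0
    summary_output_tokens: int = 0

    @property
    def total_analysis_tokens(self) -> int:
        # Assumed: input + output for the frame-analysis stage.
        return self.analysis_input_tokens + self.analysis_output_tokens

    @property
    def total_summary_tokens(self) -> int:
        # Assumed: input + output for the summary stage.
        return self.summary_input_tokens + self.summary_output_tokens

    @property
    def total_tokens(self) -> int:
        # Assumed: everything spent across both LLM stages.
        return self.total_analysis_tokens + self.total_summary_tokens


usage = UsageSketch(1200, 300, 800, 150)
```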
exception ractogateway.pipelines.video_processor.VideoRateLimitExceededError[source]

Bases: RuntimeError

Raised when a rate_limiter denies a VideoProcessorPipeline request.