ractogateway.pipelines.video_processor
VideoProcessorPipeline — process tutorial/lecture videos for RAG & Q&A.
Quick start:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    TranscriberBackend,
    DeduplicationMethod,
)
kit = Chat(api_key="sk-...", model="gpt-4o")
pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    analyze_frames=True,
    generate_summary=True,
)
# Accepts: local path, URL, YouTube link, bytes buffer, or pre-extracted frames
result = pipeline.run("lecture.mp4")
print(result.summary)
result.to_markdown("report.md")
Install:
pip install ractogateway[pipelines-video] # core (OpenCV, pHash, ffmpeg)
pip install ractogateway[pipelines-video-whisper] # + faster-whisper
pip install ractogateway[pipelines-video-full] # all of the above
pip install ractogateway[pipelines-video-yt] # + yt-dlp (YouTube support)
- class ractogateway.pipelines.video_processor.AsyncVideoProcessorPipeline(*args, **kwargs)[source]
Bases: object
Async-only variant of VideoProcessorPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where you do not want a sync run() in the public API.
All constructor parameters are identical to VideoProcessorPipeline.
- async run(source, **kwargs)[source]
Async-only process entrypoint.
- Return type:
VideoProcessorResult
- async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Async-only variant of VideoProcessorPipeline.aanswer_question().
- Return type:
VideoProcessorResult
- class ractogateway.pipelines.video_processor.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')[source]
Bases: object
Synchronous + asynchronous video processing pipeline.
- Parameters:
  - kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit are provided.
  - analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.
  - summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.
  - transcriber (TranscriberBackend) – Which audio transcription backend to use.
  - transcriber_model (str) – Model name / size for the chosen backend.
  - transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).
  - transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).
  - fps (float) – Video frames to sample per second.
  - similarity_threshold (float) – Frames with similarity >= this % to the previous kept frame are discarded. E.g. 90.0 keeps frames that differ by more than 10 %.
  - dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).
  - max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).
  - frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).
  - frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).
  - grid_size (int) – Frames per grid collage (only used in GRID mode).
  - batch_size (int) – Concurrent LLM calls per batch during frame analysis.
  - max_workers (int) – Thread-pool size for concurrent LLM calls.
  - max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.
  - language (str | None) – BCP-47 language code for transcription (None = auto-detect).
  - transcribe_audio (bool) – Whether to extract and transcribe the audio track.
  - analyze_frames (bool) – Whether to pass frames to the vision LLM.
  - generate_summary (bool) – Whether to generate a comprehensive summary at the end.
  - rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag is True (or per-call), all extracted content is indexed for retrieval.
  - safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.
  - tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.
  - metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
  - rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
  - user_id (str) – Default user identifier passed to the rate limiter.
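The similarity_threshold rule described above (discard a frame whose similarity to the previous kept frame is >= the threshold) can be illustrated with a small standalone sketch. It assumes frames have already been reduced to 64-bit integer hashes and defines similarity as the percentage of matching bits; both the hash width and the similarity formula are assumptions for illustration, not necessarily what the library computes. The names similarity_percent and deduplicate are hypothetical.

```python
from typing import Iterable, List, Optional, Tuple

HASH_BITS = 64  # assumed hash width for this sketch


def similarity_percent(hash_a: int, hash_b: int, bits: int = HASH_BITS) -> float:
    """Percentage of matching bits between two integer hashes."""
    differing = bin(hash_a ^ hash_b).count("1")
    return 100.0 * (bits - differing) / bits


def deduplicate(
    hashes: Iterable[int], similarity_threshold: float = 90.0
) -> List[Tuple[int, bool]]:
    """Return (index, kept) pairs, comparing each frame to the previous kept frame."""
    decisions: List[Tuple[int, bool]] = []
    last_kept: Optional[int] = None
    for index, frame_hash in enumerate(hashes):
        if last_kept is None:
            kept = True  # the first frame has nothing to be compared against
        else:
            kept = similarity_percent(frame_hash, last_kept) < similarity_threshold
        if kept:
            last_kept = frame_hash
        decisions.append((index, kept))
    return decisions
```

Note that the comparison is always against the last kept frame, not the immediately preceding one, so a slow drift across many near-identical frames still eventually produces a new kept frame.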
- run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]
Process source and return a VideoProcessorResult.
All keyword arguments override the constructor defaults for this call only. In safe_mode=True, fatal stage errors are captured into result.failed_stage / result.stage_errors and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors so the pipeline continues with whatever data is available.
- Return type:
VideoProcessorResult
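The `<object object>` defaults in the run() signature are how a plain object() sentinel renders in generated documentation. A sentinel is most likely used here so that "argument not passed" can be told apart from an explicit None (which is a meaningful value for parameters such as max_frames). The following is a minimal sketch of that pattern with a toy class; _UNSET and PipelineSketch are hypothetical names, and the real resolution logic may differ.

```python
_UNSET = object()  # hypothetical sentinel; shows up as <object object> in signatures


class PipelineSketch:
    """Toy stand-in, not the real VideoProcessorPipeline."""

    def __init__(self, fps: float = 1.0, max_frames=None):
        self.fps = fps
        self.max_frames = max_frames

    def run(self, source, *, fps=_UNSET, max_frames=_UNSET):
        # Use the per-call value when one was passed, otherwise the constructor default.
        effective_fps = self.fps if fps is _UNSET else fps
        effective_max = self.max_frames if max_frames is _UNSET else max_frames
        return {"source": source, "fps": effective_fps, "max_frames": effective_max}
```

With this pattern, `run("a.mp4", max_frames=None)` removes the cap for one call even if the constructor set one, and the constructor defaults stay unchanged afterwards.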
- async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)[source]
Async variant of run().
- Return type:
VideoProcessorResult
- static parse_timestamp(value)[source]
Parse timestamp values like 130, "02:10", "2 mins 10 sec".
- Return type:
- answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Process video then answer a user question from extracted timeline context.
- Return type:
VideoProcessorResult
- async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)[source]
Async variant of answer_question().
- Return type:
VideoProcessorResult
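To make the input forms accepted by parse_timestamp() concrete (130, "02:10", "2 mins 10 sec"), here is a standalone sketch of one way such values could be normalised to seconds. It is not the library's implementation: the function name parse_timestamp_sketch, the float return type, and the accepted unit spellings are all assumptions.

```python
import re
from typing import Union

_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0}


def parse_timestamp_sketch(value: Union[int, float, str]) -> float:
    """Convert a number, a clock string, or a worded duration to seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    text = value.strip().lower()
    # Plain numeric string such as "130" or "12.5".
    try:
        return float(text)
    except ValueError:
        pass
    # Clock form: "MM:SS" or "HH:MM:SS".
    if re.fullmatch(r"\d+(:\d+){1,2}(\.\d+)?", text):
        seconds = 0.0
        for part in text.split(":"):
            seconds = seconds * 60.0 + float(part)
        return seconds
    # Worded form: a number followed by a unit starting with h, m, or s.
    matches = re.findall(r"(\d+(?:\.\d+)?)\s*(h|m|s)[a-z]*", text)
    if not matches:
        raise ValueError(f"Unrecognised timestamp: {value!r}")
    return sum(float(number) * _UNIT_SECONDS[unit] for number, unit in matches)
```

All three example inputs from the docstring resolve to the same position, 130 seconds, which is the kind of value focus_time_seconds expects.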
- class ractogateway.pipelines.video_processor.DeduplicationMethod(*values)[source]
Frame similarity algorithm used for deduplication.
- PHASH = 'phash'
- SSIM = 'ssim'
- class ractogateway.pipelines.video_processor.FrameAnalysisMode(*values)[source]
How frames are sent to the vision LLM.
- INDIVIDUAL = 'individual'
- GRID = 'grid'
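The practical difference between the two modes is the number of vision LLM requests. The sketch below estimates that count, assuming INDIVIDUAL sends one request per kept frame and GRID sends one request per collage of grid_size frames; the one-request-per-collage behaviour and the function name estimate_llm_calls are assumptions.

```python
import math


def estimate_llm_calls(kept_frames: int, mode: str = "individual", grid_size: int = 4) -> int:
    """Rough number of vision requests for a given analysis mode."""
    if kept_frames < 0 or grid_size < 1:
        raise ValueError("kept_frames must be >= 0 and grid_size >= 1")
    if mode == "individual":
        return kept_frames
    if mode == "grid":
        # The last collage may be only partially filled.
        return math.ceil(kept_frames / grid_size)
    raise ValueError(f"Unknown mode: {mode!r}")
```

For example, 10 kept frames would need 10 requests in individual mode but only 3 in grid mode with grid_size=4, at the cost of less detail per frame.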
- class ractogateway.pipelines.video_processor.VideoProcessingMode(*values)[source]
How much of the video should be processed.
- ACTIVE = 'active'
- PASSIVE = 'passive'
- class ractogateway.pipelines.video_processor.TranscriberBackend(*values)[source]
Audio transcription backend.
- FASTER_WHISPER = 'faster-whisper'
- OPENAI_WHISPER = 'openai-whisper'
- HUGGINGFACE_LOCAL = 'huggingface-local'
- OPENAI_API = 'openai-api'
- GOOGLE_API = 'google-api'
- HUGGINGFACE_API = 'huggingface-api'
- GROQ_API = 'groq-api'
- DEEPGRAM_API = 'deepgram-api'
- OLLAMA = 'ollama'
- class ractogateway.pipelines.video_processor.VideoConfig(**data)[source]
Bases: BaseModel
All tunable hyperparameters for VideoProcessorPipeline.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- fps: float
Frames to sample per second of video.
- similarity_threshold: float
Discard a frame whose similarity to the previous kept frame is >= this %. Lower = keep more frames. Range 0-100.
- dedup_method: DeduplicationMethod
Algorithm used to compare frame similarity.
- frame_format: str
Image format for kept frames: 'JPEG' (smaller) or 'PNG' (lossless).
- analyze_frames: bool
Pass kept frames to the vision LLM for content extraction.
- frame_analysis_mode: FrameAnalysisMode
Individual = one LLM call per frame; Grid = stitch frames into a collage.
- grid_size: int
Number of frames per grid collage (used when frame_analysis_mode='grid').
- batch_size: int
How many frames to submit to the LLM concurrently per batch.
- max_workers: int
Thread-pool size for concurrent LLM frame analysis calls.
- max_process_workers: int
Process-pool size for CPU-bound frame extraction / hashing.
- transcribe_audio: bool
Extract and transcribe the video’s audio track.
- transcriber_backend: TranscriberBackend
Which transcription engine to use.
- transcriber_model: str
Model name / size — interpretation is backend-specific.
- Examples:
  - faster-whisper / openai-whisper: "tiny", "base", "small", "medium", "large-v3"
  - huggingface-local / huggingface-api: HF model ID, e.g. "openai/whisper-large-v3"
  - openai-api: "whisper-1"
  - google-api: "long", "short", "latest_long"
  - groq-api: "whisper-large-v3", "whisper-large-v3-turbo"
  - deepgram-api: "nova-3", "nova-2", "enhanced", "base"
  - ollama: model name on the server, e.g. "whisper"
- generate_summary: bool
Generate a comprehensive textual summary at the end.
- store_in_rag: bool
Push all extracted content into the supplied rag_pipeline for Q&A.
- processing_mode: VideoProcessingMode
active processes full video; passive processes only a time window.
- focus_time_seconds: float | None
Center timestamp in seconds for passive mode (e.g. 130 for 02:10).
- window_seconds: float
Passive-mode half-window size in seconds (focus ± window_seconds).
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
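In passive mode, focus_time_seconds and window_seconds together define the processed range as focus ± window_seconds. A minimal sketch of that arithmetic follows; clamping the range to the start of the video and to an optional known duration is an assumption, and passive_window is a hypothetical helper name.

```python
from typing import Optional, Tuple


def passive_window(
    focus_time_seconds: float,
    window_seconds: float = 5.0,
    duration: Optional[float] = None,
) -> Tuple[float, float]:
    """Return the (start, end) range in seconds around the focus timestamp."""
    start = max(0.0, float(focus_time_seconds) - window_seconds)
    end = float(focus_time_seconds) + window_seconds
    if duration is not None:
        end = min(end, float(duration))  # do not run past the end of the video
    return start, end
```

With the default window_seconds=5.0 and a focus at 130 seconds (02:10), the range is 125 to 135 seconds, so the window is 10 seconds wide in total.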
- class ractogateway.pipelines.video_processor.FrameEntry(**data)[source]
Bases: BaseModel
One video frame, after extraction and optional analysis.
- frame_id: int
Zero-based sequential frame identifier.
- timestamp: float
Position in the video in seconds.
- similarity_to_prev: float | None
Similarity percentage to the previous kept frame (None for first frame).
- kept: bool
False if discarded by the deduplication step.
- image_format: str
- class ractogateway.pipelines.video_processor.StageError(**data)[source]
Bases: BaseModel
Structured record of a failure in one pipeline stage.
- stage: str
Name of the pipeline stage that failed (e.g. ‘extract’, ‘transcribe’).
- error_type: str
Exception class name (e.g. ‘ImportError’, ‘RuntimeError’).
- message: str
str(exc) — the error message.
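The three fields map directly onto a caught exception: the stage name, the exception class name, and str(exc). The sketch below shows how a stage runner could record failures in that shape and carry on, similar in spirit to the non-fatal handling described for run(). StageErrorSketch and run_stage are hypothetical names, and a plain dataclass stands in for the pydantic model.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class StageErrorSketch:
    """Stand-in with the same three fields as StageError."""

    stage: str
    error_type: str
    message: str


def run_stage(stage: str, func: Callable[[], Any], errors: List[StageErrorSketch]) -> Any:
    """Run one stage; on failure, record the error and return None."""
    try:
        return func()
    except Exception as exc:
        errors.append(
            StageErrorSketch(
                stage=stage,
                error_type=type(exc).__name__,  # e.g. 'ImportError'
                message=str(exc),
            )
        )
        return None
```

A caller can then check whether the error list is non-empty after the run, which is analogous to what the has_errors property reports on a result.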
- class ractogateway.pipelines.video_processor.TranscriptSegment(**data)[source]
Bases: BaseModel
A time-bounded transcription segment aligned to frame IDs.
- start: float
Segment start time in seconds.
- end: float
Segment end time in seconds.
- text: str
Transcribed text for this segment.
- class ractogateway.pipelines.video_processor.VideoSection(**data)[source]
Bases: BaseModel
A merged time section combining visual analysis + audio transcript.
- timestamp_start: float
- timestamp_end: float
- visual_content: str
Combined LLM analyses for all frames in this section.
- audio_content: str
Concatenated transcript text for this section’s time range.
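As an illustration of how audio_content could be assembled for a VideoSection, the sketch below concatenates the text of every transcript segment that overlaps the section's time range. The overlap rule (any partial overlap counts) and the names Segment and audio_for_section are assumptions; the library may align segments differently.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    """Stand-in with the start, end, and text fields of TranscriptSegment."""

    start: float
    end: float
    text: str


def audio_for_section(
    segments: List[Segment], timestamp_start: float, timestamp_end: float
) -> str:
    """Join the text of all segments overlapping [timestamp_start, timestamp_end)."""
    overlapping = [
        seg
        for seg in sorted(segments, key=lambda s: s.start)
        if seg.start < timestamp_end and seg.end > timestamp_start
    ]
    return " ".join(seg.text.strip() for seg in overlapping)
```

Under this rule a segment that straddles a section boundary would appear in both neighbouring sections, which favours completeness over avoiding duplication.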
- class ractogateway.pipelines.video_processor.VideoProcessorResult(**data)[source]
Bases: BaseModel
Full output of a VideoProcessorPipeline run.
- video_path: str
Original source identifier (path, URL, or ‘<bytes>’ for buffer input).
- frames: list[FrameEntry]
All extracted frames (kept and discarded).
- transcript: list[TranscriptSegment]
Audio transcript segmented by timestamp.
- sections: list[VideoSection]
Merged visual + audio sections ordered by time.
- rag_stored: bool
- rag_chunk_count: int
- usage: VideoProcessorUsage
- stage_errors: list[StageError]
All per-stage errors collected during the run (fatal + non-fatal).
- processing_mode: VideoProcessingMode
Whether this run processed full video (active) or a window (passive).
- property has_errors: bool
True if any stage encountered an error.
- property is_failed: bool
True if the pipeline aborted early due to a fatal stage error.
- get_all_visual_content()[source]
All frame analyses concatenated in timestamp order.
- Return type:
- to_json(path=None, *, indent=2)[source]
Serialise result to JSON. Returns JSON string if path is None.
- class ractogateway.pipelines.video_processor.VideoProcessorUsage(**data)[source]
Bases: BaseModel
Accounting of tokens and frame counts across the full pipeline.
- frames_extracted: int
- frames_kept: int
- frames_discarded: int
- analysis_input_tokens: int
- analysis_output_tokens: int
- summary_input_tokens: int
- summary_output_tokens: int
- audio_duration_seconds: float
- property total_analysis_tokens: int
- property total_summary_tokens: int
- property total_tokens: int
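The three total_* properties are not described in detail here. Judging by their names, they plausibly sum the input and output counters; the sketch below shows that reading with a plain dataclass. The formulas are an assumption, and UsageSketch is a hypothetical stand-in for VideoProcessorUsage.

```python
from dataclasses import dataclass


@dataclass
class UsageSketch:
    """Toy stand-in for VideoProcessorUsage; the sums below are assumed."""

    analysis_input_tokens: int = 0
    analysis_output_tokens: int = 0
    summary_input_tokens: int = 0
    summary_output_tokens: int = 0

    @property
    def total_analysis_tokens(self) -> int:
        return self.analysis_input_tokens + self.analysis_output_tokens

    @property
    def total_summary_tokens(self) -> int:
        return self.summary_input_tokens + self.summary_output_tokens

    @property
    def total_tokens(self) -> int:
        return self.total_analysis_tokens + self.total_summary_tokens
```

Keeping analysis and summary totals separate is useful when analysis_kit and summary_kit point at differently priced models.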
- exception ractogateway.pipelines.video_processor.VideoRateLimitExceededError[source]
Bases: RuntimeError
Raised when a rate_limiter denies a VideoProcessorPipeline request.
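The rate_limiter argument is duck-typed: any object with check_and_consume(user_id, tokens) and get_remaining(user_id) methods is accepted. A minimal in-memory sketch of such an object is shown below. It assumes check_and_consume returns True when the request is allowed and False when it is denied; that return convention, the fixed per-user budget, and the class name InMemoryRateLimiter are assumptions for illustration.

```python
from collections import defaultdict
from typing import DefaultDict


class InMemoryRateLimiter:
    """Fixed token budget per user, held in process memory."""

    def __init__(self, budget_per_user: int):
        self._budget = budget_per_user
        self._used: DefaultDict[str, int] = defaultdict(int)

    def get_remaining(self, user_id: str) -> int:
        """Tokens the user may still spend."""
        return self._budget - self._used[user_id]

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        """Deduct tokens if the budget allows it; report whether it did."""
        if tokens > self.get_remaining(user_id):
            return False
        self._used[user_id] += tokens
        return True
```

An instance would be passed as `rate_limiter=InMemoryRateLimiter(10_000)` together with a `user_id`; a denial is the situation in which VideoRateLimitExceededError is documented to be raised.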