Video Processor Pipeline
VideoProcessorPipeline turns any video file — local, remote, or YouTube — into
structured knowledge: unique frames extracted at a configurable rate, audio transcribed
by your chosen model, every whiteboard equation and screen text captured by a vision LLM,
and a 7-section comprehensive summary produced automatically. Results can optionally
be indexed in a vector store for downstream Q&A via RactoRAG.
Use AsyncVideoProcessorPipeline in async app stacks (FastAPI, etc.).
Best Use Cases
Indexing lecture recordings and university course videos for RAG
Extracting all equations and proofs from math / physics tutorial videos
Building searchable archives of technical demo recordings
Generating structured notes from conference talks or webinars
Processing training videos for corporate knowledge bases
Installation
# Core — frame extraction, dedup, audio extraction, HTTP download
pip install "ractogateway[pipelines-video]"
# + local transcription with faster-whisper (recommended)
pip install "ractogateway[pipelines-video-whisper]"
# + YouTube video download via yt-dlp
pip install "ractogateway[pipelines-video-yt]"
# Everything at once
pip install "ractogateway[pipelines-video-full]"
Minimal Example
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    TranscriberBackend,
)

kit = Chat(api_key="sk-...", model="gpt-4o")
pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,                    # sample 1 frame per second
    similarity_threshold=85.0,  # keep frames that differ by more than 15 %
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    analyze_frames=True,
    generate_summary=True,
)
result = pipeline.run("lecture.mp4")
print(f"Kept {result.usage.frames_kept} / {result.usage.frames_extracted} frames")
print(result.summary)
Example output
Kept 18 / 120 frames
# Video Summary
## Overview
This lecture introduces Newton's Laws of Motion, covering all three laws with
mathematical formulations and worked examples on the whiteboard.
## Whiteboard / Board Content
- F = ma (Newton's Second Law)
- ΣF = 0 (First Law equilibrium condition)
- F₁₂ = −F₂₁ (Third Law)
- Worked example: m = 5 kg, a = 2 m/s² → F = 10 N
## Screen / Slide Content
- Slide 3: "Newton's Laws — Historical Context (1687)"
- Code snippet (Python simulation): `force = mass * acceleration`
## Key Concepts & Definitions
...
Accepted Video Sources
The pipeline accepts five different input types — no need to pre-download:
from pathlib import Path

# 1. Local file path (str or Path)
result = pipeline.run("lecture.mp4")
result = pipeline.run(Path("/recordings/session1.mov"))
# 2. HTTP / HTTPS URL (requires httpx, included in pipelines-video)
result = pipeline.run("https://cdn.university.edu/physics101.mp4")
# 3. YouTube URL (requires yt-dlp; install pipelines-video-yt)
result = pipeline.run("https://www.youtube.com/watch?v=abc123xyz")
result = pipeline.run("https://youtu.be/abc123xyz")
# 4. Raw bytes buffer (in-memory)
with open("lecture.mp4", "rb") as f:
    video_bytes = f.read()
result = pipeline.run(video_bytes)
# 5. Pre-extracted frame images (skip OpenCV extraction entirely)
frame_paths = ["frame_0.jpg", "frame_1.jpg", "frame_2.jpg"]
result = pipeline.run(frame_paths)
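To make the five input kinds concrete, here is a minimal sketch of how a caller might tell them apart before handing a source to the pipeline. `classify_source` and the `YOUTUBE_HOSTS` set are hypothetical names for illustration only; the library's own detection logic is not shown in this document and may differ.

```python
from pathlib import Path
from urllib.parse import urlparse

# Hypothetical host list, used only for this illustration
YOUTUBE_HOSTS = {"youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be"}


def classify_source(source):
    """Label an input as 'bytes', 'frames', 'youtube', 'url', or 'local'."""
    if isinstance(source, (bytes, bytearray)):
        return "bytes"
    if isinstance(source, (list, tuple)):
        return "frames"
    parsed = urlparse(str(source))
    if parsed.scheme in ("http", "https"):
        host = (parsed.hostname or "").lower()
        return "youtube" if host in YOUTUBE_HOSTS else "url"
    return "local"


print(classify_source("lecture.mp4"))                   # local
print(classify_source(Path("/recordings/s1.mov")))      # local
print(classify_source("https://youtu.be/abc123xyz"))    # youtube
print(classify_source(b"\x00\x00\x00\x18ftyp"))         # bytes
print(classify_source(["frame_0.jpg", "frame_1.jpg"]))  # frames
```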
Frame Deduplication
Identical or near-identical frames (e.g., static slides) are automatically filtered to reduce LLM cost and noise.
Configuring the threshold
pipeline = VideoProcessorPipeline(
    kit=kit,
    similarity_threshold=85.0,   # keep frames that differ by > 15 %
    # similarity_threshold=95.0  # conservative: drops only near-identical frames, keeps more
    # similarity_threshold=60.0  # aggressive: keeps only dramatically different frames
)
>= threshold → frame is discarded (too similar to previous)
< threshold → frame is kept
Choosing the algorithm
from ractogateway.pipelines.video_processor import DeduplicationMethod
# pHash — fast, good for most content (default)
pipeline = VideoProcessorPipeline(kit=kit, dedup_method=DeduplicationMethod.PHASH)
# SSIM — structural similarity, more accurate for subtle changes
# Requires: scikit-image (included in pipelines-video)
pipeline = VideoProcessorPipeline(kit=kit, dedup_method=DeduplicationMethod.SSIM)
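For intuition about how a hash-based method can yield a percentage, here is a minimal sketch that converts the Hamming distance between two 64-bit perceptual hashes into a similarity score. This is an assumption for illustration: the document does not state how the library maps pHash output to a percentage, and `hash_similarity` is a hypothetical helper, not part of its API.

```python
def hash_similarity(hash_a: int, hash_b: int, bits: int = 64) -> float:
    """Percentage of matching bits between two fixed-width hashes."""
    distance = bin(hash_a ^ hash_b).count("1")  # Hamming distance
    return 100.0 * (1 - distance / bits)


print(hash_similarity(0xFFFF, 0xFFFF))  # 100.0 (identical hashes)
print(hash_similarity(0x00, 0xFF))      # 87.5  (8 of 64 bits differ)
```

Under this mapping, a threshold of 85.0 would tolerate up to 9 differing bits out of 64 before a frame counts as new content.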
Inspecting deduplication results
for frame in result.frames:
    status = "KEPT" if frame.kept else "SKIP"
    # Compare against None so a genuine 0.0 % similarity is not reported as "first"
    sim = f"{frame.similarity_to_prev:.1f}%" if frame.similarity_to_prev is not None else "first"
    print(f"Frame {frame.frame_id:3d} [{frame.timestamp:6.1f}s] {status} sim={sim}")
Example output
Frame 0 [ 0.0s] KEPT sim=first
Frame 1 [ 1.0s] SKIP sim=97.2%
Frame 2 [ 2.0s] SKIP sim=98.1%
Frame 3 [ 3.0s] KEPT sim=41.3% ← new content on board
Frame 4 [ 4.0s] KEPT sim=22.7% ← more writing
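The KEPT / SKIP column above follows directly from the threshold rule. The sketch below applies that rule to the similarity values from the example output. `mark_kept` is a hypothetical helper that takes the percentages as given; it does not compute similarity and is not the library's implementation.

```python
def mark_kept(similarities, threshold=85.0):
    """Return one keep flag per frame; None marks the first frame, which is always kept."""
    return [sim is None or sim < threshold for sim in similarities]


sims = [None, 97.2, 98.1, 41.3, 22.7]
print(mark_kept(sims, threshold=85.0))  # [True, False, False, True, True]
print(mark_kept(sims, threshold=99.0))  # [True, True, True, True, True]
```

Raising the threshold keeps more frames, and lowering it keeps fewer.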
Audio Transcription
Choosing a backend
from ractogateway.pipelines.video_processor import TranscriberBackend
# Local: no API key needed
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",  # tiny / base / small / medium / large-v3
)
# Cloud: OpenAI Whisper API
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.OPENAI_API,
    transcriber_model="whisper-1",
    transcriber_api_key="sk-...",  # or set OPENAI_API_KEY env var
)
# Cloud: Groq (ultra-fast, cheap)
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.GROQ_API,
    transcriber_model="whisper-large-v3-turbo",
    transcriber_api_key="gsk_...",  # or GROQ_API_KEY
)
# Cloud: Deepgram Nova 3
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.DEEPGRAM_API,
    transcriber_model="nova-3",
    transcriber_api_key="...",  # or DEEPGRAM_API_KEY
)
# Local: HuggingFace (any ASR model)
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.HUGGINGFACE_LOCAL,
    transcriber_model="openai/whisper-large-v3",
)
# Self-hosted: Ollama
pipeline = VideoProcessorPipeline(
    kit=kit,
    transcriber=TranscriberBackend.OLLAMA,
    transcriber_model="whisper",
    transcriber_base_url="http://localhost:11434",
)
Language detection
# Auto-detect (default)
pipeline = VideoProcessorPipeline(kit=kit, language=None)
# Force a specific language
pipeline = VideoProcessorPipeline(kit=kit, language="en")
pipeline = VideoProcessorPipeline(kit=kit, language="fr")
pipeline = VideoProcessorPipeline(kit=kit, language="de")
Disabling transcription
# Vision analysis only — no audio
pipeline = VideoProcessorPipeline(kit=kit, transcribe_audio=False)
Vision LLM Analysis
Every kept frame is passed to a vision-capable model to extract:
All text/equations written on the whiteboard or blackboard (copied verbatim)
All text, code, or diagrams visible on screen
A brief description of the scene
Choosing the analysis provider
from ractogateway.openai_developer_kit import Chat as GPTChat
from ractogateway.anthropic_developer_kit import Chat as ClaudeChat
from ractogateway.google_developer_kit import Chat as GeminiChat
# GPT-4o (OpenAI)
pipeline = VideoProcessorPipeline(
    kit=GPTChat(model="gpt-4o"),
    analyze_frames=True,
)
# Claude Sonnet 4.6 (Anthropic)
pipeline = VideoProcessorPipeline(
    kit=ClaudeChat(model="claude-sonnet-4-6"),
    analyze_frames=True,
)
# Gemini 1.5 Flash (Google): a low-cost choice for high frame counts
pipeline = VideoProcessorPipeline(
    kit=GeminiChat(model="gemini-1.5-flash"),
    analyze_frames=True,
)
Individual vs Grid mode
from ractogateway.pipelines.video_processor import FrameAnalysisMode
# INDIVIDUAL: one LLM call per frame (highest accuracy)
pipeline = VideoProcessorPipeline(
    kit=kit,
    frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL,
    batch_size=10,  # 10 calls per batch
    max_workers=4,  # thread-pool size
)
# GRID: stitch 4 frames into a 2×2 collage, one LLM call per collage (about 4× fewer calls)
pipeline = VideoProcessorPipeline(
    kit=kit,
    frame_analysis_mode=FrameAnalysisMode.GRID,
    grid_size=4,  # frames per collage
)
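As a rough guide to the trade-off, the number of vision calls can be estimated from the kept-frame count. The sketch below is plain arithmetic under the assumption that a partially filled final collage still costs one call. `llm_calls` is a hypothetical helper, and actual cost also depends on image size and token pricing, which this ignores.

```python
import math


def llm_calls(frames_kept: int, grid_size: int = 1) -> int:
    """Estimated vision calls: one per frame, or one per collage of grid_size frames."""
    return math.ceil(frames_kept / grid_size)


print(llm_calls(18))               # 18 calls in INDIVIDUAL mode
print(llm_calls(18, grid_size=4))  # 5 calls in GRID mode
```

With 18 kept frames the reduction is 18 / 5 = 3.6×; it reaches the full 4× only when the frame count is a multiple of the grid size.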
Separate kits per step
Use a powerful model for analysis and a faster/cheaper one for summary:
pipeline = VideoProcessorPipeline(
    kit=GPTChat(model="gpt-4o-mini"),          # fallback
    analysis_kit=GPTChat(model="gpt-4o"),      # vision analysis
    summary_kit=GPTChat(model="gpt-4o-mini"),  # summary
)
Summary Generation
The summary LLM receives a chronological log of every visual analysis + transcript segment and produces a structured 7-section Markdown document:
Overview — what the video is about
Key Topics Covered — bulleted list
Whiteboard / Board Content — ALL equations and formulas verbatim
Screen / Slide Content — all visible text and code
Detailed Explanation — timeline walkthrough
Key Concepts & Definitions — important terms
Conclusions / Takeaways — key points to remember
result = pipeline.run("lecture.mp4")
# Full Markdown summary
print(result.summary)
# Save to file
result.to_markdown("lecture_notes.md")
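The chronological log that feeds the summary step can be pictured as a time-ordered merge of visual analyses and transcript segments. The sketch below is an illustration only: the tuple layout, the log line format, and `build_log` are assumptions, since the document does not show the library's internal log format.

```python
import heapq

# Hypothetical (timestamp_seconds, kind, text) events, each list already sorted by time
visual = [(0.0, "visual", "Title slide"), (3.0, "visual", "Board: F = ma")]
speech = [(0.5, "audio", "Welcome to the lecture"), (3.2, "audio", "Force equals mass times acceleration")]


def build_log(visual_events, audio_events):
    """Merge two time-sorted event lists into one chronological list of log lines."""
    merged = heapq.merge(visual_events, audio_events, key=lambda event: event[0])
    return [f"[{t:6.1f}s] {kind}: {text}" for t, kind, text in merged]


for line in build_log(visual, speech):
    print(line)
```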
RAG Storage
Index the full video content for Q&A retrieval using any RactoRAG-compatible store:
from ractogateway.rag.pipeline import RactoRAG
from ractogateway.rag.embedders import OpenAIEmbedder
from ractogateway.rag.stores import PineconeStore # or ChromaStore, FAISSStore, etc.
rag = RactoRAG(
    embedder=OpenAIEmbedder(api_key="sk-..."),
    store=PineconeStore(api_key="...", index_name="lectures"),
)
pipeline = VideoProcessorPipeline(
    kit=kit,
    rag_pipeline=rag,
)
result = pipeline.run("lecture.mp4", store_in_rag=True)
print(f"Stored {result.rag_chunk_count} chunks")
# Now query the video content
answer = rag.query("What is Newton's Second Law?")
print(answer)
Stored chunks include:
One document per VideoSection (visual + audio merged)
One document per transcript segment (audio-only retrieval)
One document for the full summary
Async Usage
import asyncio
from ractogateway.pipelines.video_processor import (
    AsyncVideoProcessorPipeline,
    TranscriberBackend,
)

pipeline = AsyncVideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    generate_summary=True,
)

async def process():
    result = await pipeline.run("lecture.mp4")
    print(result.summary)

asyncio.run(process())
FastAPI integration
from fastapi import FastAPI, UploadFile
from ractogateway.pipelines.video_processor import AsyncVideoProcessorPipeline

app = FastAPI()
pipeline = AsyncVideoProcessorPipeline(kit=kit, safe_mode=True)

@app.post("/process-video")
async def process_video(file: UploadFile):
    video_bytes = await file.read()
    result = await pipeline.run(video_bytes)
    return {
        "summary": result.summary,
        "frames_kept": result.usage.frames_kept,
        "transcript": result.get_transcript_text(),
        "error": result.error,
    }
Passive Mode — Focused Time Windows
Instead of processing the entire video, passive mode extracts and analyses only a narrow time window around a specific timestamp. This is ideal for targeted Q&A on long recordings.
from ractogateway.pipelines.video_processor import VideoProcessingMode
# Process only the 10-second window around the 2:10 mark (125s–135s)
result = pipeline.run(
    "lecture.mp4",
    processing_mode=VideoProcessingMode.PASSIVE,
    focus_time_seconds=130.0,  # center of window
    window_seconds=5.0,        # ±5 s → [125 s, 135 s]
)
print(result.window_start_seconds) # 125.0
print(result.window_end_seconds) # 135.0
Timestamps in the result are absolute (relative to the original video source),
not relative to the window start — so frame.timestamp=127.3 means 2:07 in the
full video.
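The window arithmetic is simple enough to sketch: `window_seconds` acts as a half-width on either side of the focus time. `passive_window` is a hypothetical helper, and the clamping to the start and end of the video is an assumption for illustration, since the document does not say how the library treats windows that run past the video boundaries.

```python
from typing import Optional, Tuple


def passive_window(focus: float, half_width: float,
                   duration: Optional[float] = None) -> Tuple[float, float]:
    """Return (start, end) in absolute seconds for a window centred on focus."""
    start = max(0.0, focus - half_width)  # assumed: never before the start of the video
    end = focus + half_width
    if duration is not None:
        end = min(end, duration)          # assumed: never past the end of the video
    return start, end


print(passive_window(130.0, 5.0))                  # (125.0, 135.0)
print(passive_window(2.0, 5.0))                    # (0.0, 7.0)
print(passive_window(130.0, 5.0, duration=132.0))  # (125.0, 132.0)
```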
Timestamp formats
focus_time_seconds (and answer_question’s focus_time) accept any of:
pipeline.run("v.mp4", processing_mode="passive", focus_time_seconds=130)
pipeline.run("v.mp4", processing_mode="passive", focus_time_seconds="02:10")
pipeline.run("v.mp4", processing_mode="passive", focus_time_seconds="2 mins 10 sec")
pipeline.run("v.mp4", processing_mode="passive", focus_time_seconds="1h 2m 10s")
You can also call the parser directly:
VideoProcessorPipeline.parse_timestamp("02:10") # → 130.0
VideoProcessorPipeline.parse_timestamp("2 mins 10s") # → 130.0
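For readers curious how such flexible parsing can work, the following is a minimal sketch that handles the formats listed above. It is not the library's `parse_timestamp` implementation; `parse_timestamp_sketch` and its regular expression are assumptions written for illustration, and the real parser may accept more or fewer forms.

```python
import re

_UNIT_SECONDS = {"h": 3600, "m": 60, "s": 1}
# A number followed by a unit word starting with h, m, or s (e.g. "2 mins", "10s")
_UNIT_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(h|m|s)[a-z]*")


def parse_timestamp_sketch(value) -> float:
    """Convert seconds, 'MM:SS', 'HH:MM:SS', or '1h 2m 10s' style text to seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    text = str(value).strip().lower()
    if ":" in text:
        seconds = 0.0
        for part in text.split(":"):  # each colon shifts the total by a factor of 60
            seconds = seconds * 60 + float(part)
        return seconds
    matches = _UNIT_PATTERN.findall(text)
    if matches:
        return float(sum(float(num) * _UNIT_SECONDS[unit] for num, unit in matches))
    return float(text)  # plain numeric string such as "130"


print(parse_timestamp_sketch(130))              # 130.0
print(parse_timestamp_sketch("02:10"))          # 130.0
print(parse_timestamp_sketch("2 mins 10 sec"))  # 130.0
print(parse_timestamp_sketch("1h 2m 10s"))      # 3730.0
```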
Q&A — Answering Questions from Video Content
answer_question combines passive-mode windowed processing with a focused LLM
question-answering call. It returns the normal VideoProcessorResult with
result.question and result.answer populated.
result = pipeline.answer_question(
    "lecture.mp4",
    question="Which equation appears near the 2-minute mark?",
    processing_mode="passive",
    focus_time="02:00",
    window_seconds=10.0,
)
print(result.answer)
# ## Answer
# At [115.0s - 125.0s] the whiteboard shows **F = ma** (Newton's Second Law).
#
# ## Evidence
# Frame at 117.3 s: "Board: F = ma"
#
# ## Confidence
# High — equation clearly visible in 3 consecutive frames.
Active mode Q&A
For shorter videos or when you want the LLM to draw from the entire timeline:
result = pipeline.answer_question(
    "short_demo.mp4",
    question="What Python library does the presenter use for plotting?",
    processing_mode="active",  # scan the whole video
)
print(result.answer)
Async variant
# Call from inside an async function
result = await pipeline.aanswer_question(
    "lecture.mp4",
    question="What is the definition of entropy given at 5:30?",
    processing_mode="passive",
    focus_time="5:30",
    window_seconds=8.0,
)
Per-Call Overrides
Any constructor parameter can be overridden per call:
# Constructor defaults
pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=90.0,
    generate_summary=True,
)
# Override for a specific run
result = pipeline.run(
    "dense_lecture.mp4",
    fps=2.0,                    # sample more frames
    similarity_threshold=70.0,  # discard frames >= 70 % similar (stricter dedup)
    language="fr",              # French audio
    generate_summary=False,     # skip summary for speed
)
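One common way to implement this pattern is to layer per-call keyword arguments over the stored constructor defaults without mutating them, so the next call starts from the defaults again. The sketch below shows that idea in isolation. `resolve_options` is a hypothetical helper and is not taken from the library's source.

```python
def resolve_options(defaults: dict, **overrides) -> dict:
    """Return a new dict in which per-call overrides take precedence over defaults."""
    return {**defaults, **overrides}


defaults = {"fps": 1.0, "similarity_threshold": 90.0, "generate_summary": True}

per_call = resolve_options(defaults, fps=2.0, generate_summary=False)
print(per_call)  # {'fps': 2.0, 'similarity_threshold': 90.0, 'generate_summary': False}
print(defaults)  # unchanged: {'fps': 1.0, 'similarity_threshold': 90.0, 'generate_summary': True}
```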
Production Controls
Safe mode
# Never raises: errors go to result.error
pipeline = VideoProcessorPipeline(kit=kit, safe_mode=True)
result = pipeline.run("video.mp4")
if result.error:
    print(f"Processing failed: {result.error}")
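The general shape of a safe-mode wrapper is to catch exceptions at the outermost call and report them on the result instead of raising. The sketch below illustrates that pattern with stand-in types. `SketchResult` and `run_safely` are hypothetical and say nothing about how the library structures its own error handling or error strings.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SketchResult:
    summary: str = ""
    error: Optional[str] = None


def run_safely(step: Callable[[], str]) -> SketchResult:
    """Run step(); on failure, return a result carrying the error instead of raising."""
    try:
        return SketchResult(summary=step())
    except Exception as exc:
        return SketchResult(error=f"{type(exc).__name__}: {exc}")


def missing_file() -> str:
    raise FileNotFoundError("video.mp4")


print(run_safely(lambda: "ok").error)  # None
print(run_safely(missing_file).error)  # FileNotFoundError: video.mp4
```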
Rate limiting
from ractogateway.redis import RedisRateLimiter, RateLimitConfig
limiter = RedisRateLimiter(
    url="redis://localhost:6379",
    config=RateLimitConfig(max_tokens_per_minute=10),
)
pipeline = VideoProcessorPipeline(
    kit=kit,
    rate_limiter=limiter,
    user_id="user_42",
)
Telemetry
from ractogateway.telemetry import RactoTracer
tracer = RactoTracer(otlp_endpoint="http://localhost:4317")
pipeline = VideoProcessorPipeline(
    kit=kit,
    tracer=tracer,
)
Controlling concurrency
pipeline = VideoProcessorPipeline(
    kit=kit,
    max_workers=8,          # more concurrent LLM analysis calls
    max_process_workers=4,  # more CPU processes for frame extraction
    batch_size=20,          # larger analysis batches
)
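To show how `batch_size` and `max_workers` might interact, here is a minimal sketch that processes frames batch by batch on a shared thread pool. It is an assumption about the general pattern, not the library's scheduler: `analyze_in_batches` and `fake_analyze` are hypothetical, and the real pipeline may overlap batches or order work differently.

```python
from concurrent.futures import ThreadPoolExecutor


def analyze_in_batches(frames, analyze, batch_size=10, max_workers=4):
    """Apply analyze() to every frame, one batch at a time, preserving input order."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(frames), batch_size):
            batch = frames[start:start + batch_size]
            # pool.map yields results in input order; extend() waits for the whole batch
            results.extend(pool.map(analyze, batch))
    return results


def fake_analyze(frame_id: int) -> str:
    return f"analysis of frame {frame_id}"


out = analyze_in_batches(list(range(25)), fake_analyze, batch_size=10, max_workers=4)
print(len(out))  # 25
print(out[0])    # analysis of frame 0
```

In this sketch, at most `max_workers` calls run at once regardless of `batch_size`, so raising `batch_size` alone does not increase concurrency.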
Result Object
result = pipeline.run("lecture.mp4")
# Source metadata
result.video_path # "lecture.mp4"
result.error # None or error string (safe_mode only)
# Frames
result.frames # list[FrameEntry]
result.usage.frames_extracted # 120
result.usage.frames_kept # 18
result.usage.frames_discarded # 102
# Transcript
result.transcript # list[TranscriptSegment]
result.get_transcript_text() # full text as one string
# Visual content
result.get_all_visual_content() # all analyses in timestamp order
# Sections (merged)
result.sections # list[VideoSection] — visual + audio by time
# Summary
result.summary # Markdown string
# RAG
result.rag_stored # True if stored
result.rag_chunk_count # 42
# Token usage
result.usage.total_tokens # 12500
result.usage.total_analysis_tokens # 10000
result.usage.total_summary_tokens # 2500
result.usage.audio_duration_seconds # 3600.0
Export helpers
# JSON (image bytes excluded automatically)
result.to_json("lecture_result.json")
json_str = result.to_json() # no path → returns string
# Markdown report (summary + transcript + sections)
result.to_markdown("lecture_notes.md")
md_str = result.to_markdown() # no path → returns string
Complete End-to-End Example
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    DeduplicationMethod,
    FrameAnalysisMode,
    TranscriberBackend,
)

kit = Chat(api_key="sk-...", model="gpt-4o")
pipeline = VideoProcessorPipeline(
    kit=kit,
    # Frame extraction
    fps=1.0,
    similarity_threshold=85.0,
    dedup_method=DeduplicationMethod.PHASH,
    frame_format="JPEG",
    # Vision analysis
    analyze_frames=True,
    frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL,
    batch_size=10,
    max_workers=4,
    # Transcription
    transcribe_audio=True,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="small",
    language="en",
    # Output
    generate_summary=True,
    # Safety
    safe_mode=True,
)
# Works with any source
for source in [
    "lecture.mp4",
    "https://youtu.be/dQw4w9WgXcQ",
    "https://cdn.example.com/video.mp4",
]:
    result = pipeline.run(source)
    if result.error:
        print(f"Error: {result.error}")
        continue
    print(f"\n{'='*60}")
    print(f"Source: {result.video_path}")
    print(f"Frames: {result.usage.frames_kept} kept / {result.usage.frames_extracted} extracted")
    print(f"Tokens: {result.usage.total_tokens:,}")
    print(f"\n{result.summary}")
    # URLs contain ":" and "/", so map every non-alphanumeric character to "_"
    safe_name = "".join(c if c.isalnum() else "_" for c in str(result.video_path))
    result.to_markdown(f"notes_{safe_name}.md")