ractogateway.pipelines.video_processor.pipeline
VideoProcessorPipeline — main entry point.
- Processes a video through five stages, plus an optional RAG-indexing stage:
Load — resolve source (path / URL / YouTube / bytes / frame list)
Extract — sample frames with OpenCV, deduplicate by similarity
Transcribe — extract audio, transcribe with chosen backend
Analyse — pass frames to vision LLM (individual or grid mode)
Summarise — produce comprehensive Markdown summary
RAG store — optionally index everything via RactoRAG
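The "deduplicate by similarity" step in the Extract stage can be pictured with a small, library-free sketch. It assumes frames are tiny grayscale grids and uses a simple average hash as a stand-in for pHash; the function names are hypothetical and this is not the pipeline's actual implementation. It follows the rule documented for similarity_threshold: a frame is dropped when its similarity to the previous kept frame is at or above the threshold.

```python
from typing import List, Sequence

Frame = Sequence[Sequence[int]]  # tiny grayscale image, values 0-255


def average_hash(frame: Frame) -> List[bool]:
    """One bit per pixel: True where the pixel is brighter than the frame mean."""
    pixels = [p for row in frame for p in row]
    mean = sum(pixels) / len(pixels)
    return [p > mean for p in pixels]


def similarity_percent(a: List[bool], b: List[bool]) -> float:
    """Share of matching hash bits, expressed as a percentage."""
    matches = sum(x == y for x, y in zip(a, b))
    return 100.0 * matches / len(a)


def deduplicate(frames: Sequence[Frame], threshold: float = 90.0) -> List[int]:
    """Return the indices of kept frames.

    A frame is dropped when its similarity to the previous *kept* frame
    is >= threshold (assumed semantics, taken from the parameter docs).
    """
    kept: List[int] = []
    last_hash = None
    for i, frame in enumerate(frames):
        h = average_hash(frame)
        if last_hash is None or similarity_percent(h, last_hash) < threshold:
            kept.append(i)
            last_hash = h
    return kept


dark_left = [[0, 0, 255, 255]] * 4
dark_left_noisy = [[5, 0, 250, 255]] * 4   # near-duplicate of dark_left
dark_right = [[255, 255, 0, 0]] * 4        # clearly different scene

print(deduplicate([dark_left, dark_left_noisy, dark_right]))  # [0, 2]
```

Comparing against the last kept frame, rather than the immediately preceding one, stops slow drifts from slipping through one small step at a time.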
Usage:
from ractogateway.openai_developer_kit import Chat
from ractogateway.pipelines.video_processor import (
    VideoProcessorPipeline,
    DeduplicationMethod,
    TranscriberBackend,
)
kit = Chat(api_key="...", model="gpt-4o")
pipeline = VideoProcessorPipeline(
    kit=kit,
    fps=1.0,
    similarity_threshold=85.0,
    transcriber=TranscriberBackend.FASTER_WHISPER,
    transcriber_model="base",
    analyze_frames=True,
    generate_summary=True,
    safe_mode=True,
)
result = pipeline.run("lecture.mp4")
print(result.summary)
print(result.get_transcript_text())
# YouTube / URL / bytes also accepted:
result = pipeline.run("https://www.youtube.com/watch?v=...")
result = pipeline.run("https://example.com/video.mp4")
result = pipeline.run(video_bytes) # bytes
result = pipeline.run(["frame1.jpg", ...]) # pre-extracted frames
- class ractogateway.pipelines.video_processor.pipeline.VideoProcessorPipeline(kit, *, analysis_kit=None, summary_kit=None, transcriber=TranscriberBackend.FASTER_WHISPER, transcriber_model='base', transcriber_api_key=None, transcriber_base_url=None, fps=1.0, similarity_threshold=90.0, dedup_method=DeduplicationMethod.PHASH, max_frames=None, frame_format='JPEG', frame_analysis_mode=FrameAnalysisMode.INDIVIDUAL, grid_size=4, batch_size=10, max_workers=4, max_process_workers=4, language=None, transcribe_audio=True, analyze_frames=True, generate_summary=True, processing_mode=VideoProcessingMode.ACTIVE, focus_time_seconds=None, window_seconds=5.0, rag_pipeline=None, safe_mode=False, tracer=None, metrics=None, rate_limiter=None, user_id='default')
Bases: object
Synchronous + asynchronous video processing pipeline.
- Parameters:
kit (Any) – A RactoGateway developer kit (Chat) used for both frame analysis and summary generation unless analysis_kit or summary_kit are provided.
analysis_kit (Any) – Optional separate kit for vision/frame analysis (e.g. a vision-specific model). Falls back to kit when not supplied.
summary_kit (Any) – Optional separate kit for summary generation (e.g. a larger model). Falls back to kit when not supplied.
transcriber (TranscriberBackend) – Which audio transcription backend to use.
transcriber_model (str) – Model name / size for the chosen backend.
transcriber_api_key (str | None) – API key for cloud transcription backends (or read from env vars).
transcriber_base_url (str | None) – Base URL for self-hosted endpoints (Ollama etc.).
fps (float) – Video frames to sample per second.
similarity_threshold (float) – Frames with similarity >= this % to the previous kept frame are discarded. E.g. 90.0 keeps frames that differ by more than 10 %.
dedup_method (DeduplicationMethod) – DeduplicationMethod.PHASH (fast, default) or DeduplicationMethod.SSIM (more accurate).
max_frames (int | None) – Hard cap on the number of kept frames (None = no cap).
frame_format (str) – "JPEG" (smaller, lossy) or "PNG" (lossless).
frame_analysis_mode (FrameAnalysisMode) – FrameAnalysisMode.INDIVIDUAL (one LLM call per frame, default) or FrameAnalysisMode.GRID (stitch into a collage).
grid_size (int) – Frames per grid collage (only used in GRID mode).
batch_size (int) – Concurrent LLM calls per batch during frame analysis.
max_workers (int) – Thread-pool size for concurrent LLM calls.
max_process_workers (int) – Process-pool size for CPU-bound frame extraction / hashing.
language (str | None) – BCP-47 language code for transcription (None = auto-detect).
transcribe_audio (bool) – Whether to extract and transcribe the audio track.
analyze_frames (bool) – Whether to pass frames to the vision LLM.
generate_summary (bool) – Whether to generate a comprehensive summary at the end.
rag_pipeline (Any) – An optional ractogateway.rag.pipeline.RactoRAG instance. When supplied and store_in_rag is True (or per-call), all extracted content is indexed for retrieval.
safe_mode (bool) – Catch all exceptions and return them in result.error instead of raising.
tracer (Any) – Optional ractogateway.telemetry.RactoTracer for OTEL tracing.
metrics (Any) – Optional ractogateway.telemetry.GatewayMetricsMiddleware.
rate_limiter (Any) – Duck-typed rate limiter with check_and_consume(user_id, tokens) and get_remaining(user_id) methods.
user_id (str) – Default user identifier passed to the rate limiter.
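Because rate_limiter is duck-typed, any object with the two documented methods should be accepted. The sketch below is one possible shape: a fixed per-user token budget held in memory. The class name, the budget model, and the return types (bool and int) are assumptions, since the reference above specifies only the method names and arguments.

```python
from typing import Dict


class InMemoryRateLimiter:
    """Fixed per-user token budget; a hypothetical illustration only."""

    def __init__(self, budget_per_user: int) -> None:
        self._budget = budget_per_user
        self._used: Dict[str, int] = {}

    def get_remaining(self, user_id: str) -> int:
        """Tokens the user may still spend (assumed to return an int)."""
        return self._budget - self._used.get(user_id, 0)

    def check_and_consume(self, user_id: str, tokens: int) -> bool:
        """Consume tokens and return True if enough budget is left, else False."""
        if tokens > self.get_remaining(user_id):
            return False
        self._used[user_id] = self._used.get(user_id, 0) + tokens
        return True


limiter = InMemoryRateLimiter(budget_per_user=1000)
print(limiter.check_and_consume("default", 400))  # True
print(limiter.get_remaining("default"))           # 600
print(limiter.check_and_consume("default", 700))  # False
```

An instance like this would presumably be passed as rate_limiter=limiter, with user_id selecting whose budget is charged.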
- run(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)
Process source and return a VideoProcessorResult.
All keyword arguments override the constructor defaults for this call only. With safe_mode=True, fatal stage errors are captured into result.failed_stage / result.stage_errors and the pipeline returns a partial result instead of raising. Non-fatal stage errors (transcription, analysis, summary) are always captured into result.stage_errors so the pipeline continues with whatever data is available.
- Return type:
VideoProcessorResult
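The <object object> defaults in the signature suggest a sentinel object marking "argument not supplied", which would let a caller pass None as a real override (for example max_frames=None to lift a cap). The sketch below shows that general pattern with a hypothetical Sampler class; it is not the pipeline's code, and the sentinel name _UNSET is an assumption.

```python
from typing import Any, Optional

_UNSET: Any = object()  # unique marker meaning "argument not supplied"


class Sampler:
    """Hypothetical stand-in with two constructor defaults."""

    def __init__(self, fps: float = 1.0, max_frames: Optional[int] = 50) -> None:
        self.fps = fps
        self.max_frames = max_frames

    def run(self, source: str, *, fps: Any = _UNSET, max_frames: Any = _UNSET) -> dict:
        # Fall back to the constructor value only when the caller passed nothing.
        eff_fps = self.fps if fps is _UNSET else fps
        eff_max = self.max_frames if max_frames is _UNSET else max_frames
        return {"source": source, "fps": eff_fps, "max_frames": eff_max}


s = Sampler()
print(s.run("a.mp4"))                            # constructor defaults
print(s.run("a.mp4", fps=2.0, max_frames=None))  # per-call override, None included
print(s.run("a.mp4"))                            # defaults are untouched afterwards
```

A plain None default could not distinguish "not supplied" from "explicitly no cap", which is the usual reason for this pattern.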
- async arun(source, *, fps=<object object>, similarity_threshold=<object object>, dedup_method=<object object>, max_frames=<object object>, analyze_frames=<object object>, frame_analysis_mode=<object object>, grid_size=<object object>, batch_size=<object object>, transcribe_audio=<object object>, language=<object object>, generate_summary=<object object>, processing_mode=<object object>, focus_time_seconds=<object object>, window_seconds=<object object>, store_in_rag=False, user_id=<object object>)
Async variant of run().
- Return type:
VideoProcessorResult
- static parse_timestamp(value)
Parse timestamp values like 130, "02:10", "2 mins 10 sec".
- Return type:
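The three example formats could be handled by a parser along the following lines. This is a minimal sketch under stated assumptions: the function name parse_timestamp_sketch is hypothetical, returning float seconds is a guess (the return type is not shown above), and the real method may accept more or fewer formats.

```python
import re
from typing import Union


def parse_timestamp_sketch(value: Union[int, float, str]) -> float:
    """Convert a timestamp in one of three illustrative formats to seconds."""
    if isinstance(value, (int, float)):
        return float(value)

    text = value.strip().lower()

    # Plain number in a string, e.g. "130"
    if re.fullmatch(r"\d+(\.\d+)?", text):
        return float(text)

    # Clock style: "MM:SS" or "HH:MM:SS"
    if re.fullmatch(r"\d+(:\d+){1,2}", text):
        seconds = 0.0
        for part in text.split(":"):
            seconds = seconds * 60 + int(part)
        return seconds

    # Unit style: "2 mins 10 sec", "1 hour 5 min", "45s"
    unit_seconds = {"h": 3600, "m": 60, "s": 1}
    matches = re.findall(r"(\d+(?:\.\d+)?)\s*(h|m|s)[a-z]*", text)
    if matches:
        return sum(float(n) * unit_seconds[u] for n, u in matches)

    raise ValueError(f"Unrecognised timestamp: {value!r}")


for example in (130, "02:10", "2 mins 10 sec"):
    print(parse_timestamp_sketch(example))  # 130.0 each time
```

Only the first letter of each unit word is inspected, so "min", "mins" and "minutes" are treated alike; a stricter parser would validate the full unit name.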
- answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)
Process video then answer a user question from extracted timeline context.
- Return type:
VideoProcessorResult
- async aanswer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)
Async variant of answer_question().
- Return type:
VideoProcessorResult
- class ractogateway.pipelines.video_processor.pipeline.AsyncVideoProcessorPipeline(*args, **kwargs)
Bases: object
Async-only variant of VideoProcessorPipeline.
Exposes a single async run() method, suitable for FastAPI endpoints where you do not want a sync run() in the public API.
All constructor parameters are identical to VideoProcessorPipeline.
- async run(source, **kwargs)
Async-only process entrypoint.
- Return type:
VideoProcessorResult
- async answer_question(source, *, question, processing_mode=VideoProcessingMode.ACTIVE, focus_time=None, window_seconds=5.0, max_context_chars=40000, **run_kwargs)
Async-only variant of VideoProcessorPipeline.aanswer_question().
- Return type:
VideoProcessorResult
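The async-only design can be illustrated with a small facade that wraps an object offering both entry points and re-exports only the coroutine. Everything in this sketch is hypothetical (the stand-in _FullPipeline, the AsyncOnlyFacade name, and the use of asyncio.to_thread); how AsyncVideoProcessorPipeline delegates internally is not stated in this reference.

```python
import asyncio


class _FullPipeline:
    """Hypothetical stand-in exposing both sync and async entry points."""

    def run(self, source: str) -> str:
        return f"processed {source}"

    async def arun(self, source: str) -> str:
        # Offload the blocking call so the event loop stays responsive.
        return await asyncio.to_thread(self.run, source)


class AsyncOnlyFacade:
    """Exposes only `async run()`, delegating to the wrapped pipeline."""

    def __init__(self, *args, **kwargs) -> None:
        self._inner = _FullPipeline(*args, **kwargs)

    async def run(self, source: str) -> str:
        return await self._inner.arun(source)


async def main() -> str:
    facade = AsyncOnlyFacade()
    return await facade.run("lecture.mp4")


print(asyncio.run(main()))  # processed lecture.mp4
```

With this shape, an async web handler can only ever await run(), so a blocking call cannot be made on the event loop by accident.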