ractogateway.pipelines.video_processor._extractor

Frame extraction and deduplication for VideoProcessorPipeline.

Extraction uses OpenCV. Because decoding is CPU-bound, extract_frames() runs it in a ProcessPoolExecutor. Deduplication supports two algorithms:

  • pHash : perceptual hash via imagehash (fast, default)

  • SSIM : structural similarity via scikit-image (more accurate)

ractogateway.pipelines.video_processor._extractor.extract_frames(video_path, *, fps=1.0, max_frames=None, frame_format='JPEG', max_process_workers=4)[source]

Extract frames from video_path at fps frames-per-second.

Returns a list of (frame_id, timestamp_seconds, image_bytes) tuples sorted by frame_id. Uses a ProcessPoolExecutor for speed.

Return type:

list[tuple[int, float, bytes]]
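To make the sampling at fps frames-per-second concrete, here is a minimal sketch of how target frames could be mapped onto source frame indices. The helper name sample_frame_indices and its source_fps and total_frames arguments are hypothetical and are not part of this module. The real function also decodes and encodes images with OpenCV, which this sketch leaves out.

```python
def sample_frame_indices(source_fps, total_frames, fps=1.0, max_frames=None):
    """Return (frame_id, timestamp_seconds, source_index) for each sampled frame.

    Hypothetical helper: illustrates the sampling arithmetic only.
    """
    if source_fps <= 0 or fps <= 0:
        raise ValueError("source_fps and fps must be positive")

    step = source_fps / fps  # number of source frames between two samples
    samples = []
    frame_id = 0
    while max_frames is None or frame_id < max_frames:
        source_index = int(round(frame_id * step))
        if source_index >= total_frames:
            break  # ran past the end of the video
        samples.append((frame_id, source_index / source_fps, source_index))
        frame_id += 1
    return samples


# A 3-second clip recorded at 30 fps, sampled at 1 frame per second.
for frame_id, timestamp, source_index in sample_frame_indices(30.0, 90, fps=1.0):
    print(frame_id, timestamp, source_index)
```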

ractogateway.pipelines.video_processor._extractor.extract_frames_window(video_path, *, fps=1.0, max_frames=None, frame_format='JPEG', start_time_seconds=0.0, end_time_seconds=None)[source]

Extract frames from a bounded time window of video_path.

Timestamps in returned tuples are absolute to the original source video. This path is intentionally single-process because passive windows are small and this avoids process-pool overhead for short clips.

Return type:

list[tuple[int, float, bytes]]
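The note about absolute timestamps can be illustrated with a short sketch. It assumes that frame_id restarts at 0 inside the window and that end_time_seconds is exclusive. Neither detail is stated above, so treat both as assumptions. The helper name window_timestamps and the duration_seconds argument are hypothetical.

```python
def window_timestamps(duration_seconds, *, fps=1.0, max_frames=None,
                      start_time_seconds=0.0, end_time_seconds=None):
    """Return (frame_id, absolute_timestamp) pairs for a bounded window.

    Hypothetical helper: timestamps are measured from the start of the
    source video, not from the start of the window.
    """
    if fps <= 0:
        raise ValueError("fps must be positive")

    # Clamp the window end to the video duration (assumed exclusive).
    if end_time_seconds is None:
        end = duration_seconds
    else:
        end = min(end_time_seconds, duration_seconds)

    pairs = []
    frame_id = 0
    while max_frames is None or frame_id < max_frames:
        timestamp = start_time_seconds + frame_id / fps
        if timestamp >= end:
            break
        pairs.append((frame_id, timestamp))
        frame_id += 1
    return pairs


# Two-second window starting 10 s into a 60 s video, sampled at 2 fps.
print(window_timestamps(60.0, fps=2.0, start_time_seconds=10.0, end_time_seconds=12.0))
```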

ractogateway.pipelines.video_processor._extractor.load_frames_from_paths(frame_paths, *, frame_format='JPEG')[source]

Load pre-extracted frame images from disk.

Because the extraction step was skipped, no real timestamps are available. Each frame is instead assigned its sequential position (0, 1, 2, …) as its timestamp.

Return type:

list[tuple[int, float, bytes]]
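The tuple shape produced for pre-extracted frames can be sketched as follows. This is a simplified stand-in, not the library's implementation. The name load_frames_sketch is hypothetical, and the sketch returns the file bytes unchanged, whereas the real function accepts a frame_format argument and may re-encode the image.

```python
from pathlib import Path


def load_frames_sketch(frame_paths):
    """Read each file and pair it with a sequential id and timestamp.

    Hypothetical helper: the timestamp is simply the frame's position
    in the input list, stored as a float to match the return type.
    """
    frames = []
    for index, path in enumerate(frame_paths):
        image_bytes = Path(path).read_bytes()
        frames.append((index, float(index), image_bytes))
    return frames
```

The order of frame_paths determines both frame_id and the inferred timestamp, so in this sketch callers would need to pass the paths already sorted in playback order.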

ractogateway.pipelines.video_processor._extractor.compute_similarity(img_bytes_a, img_bytes_b, method)[source]

Compute the similarity between two images, expressed as a percentage, using method.

Return type:

float
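For the pHash method, one plausible way to turn two hashes into a percentage is to scale their Hamming distance by the hash length. The sketch below assumes 64-bit hashes (the size imagehash produces by default) and a linear mapping from distance to similarity. The mapping used by compute_similarity is not documented here, so this is an illustration only, and the helper name is hypothetical.

```python
def hamming_similarity_percent(hash_a: int, hash_b: int, hash_bits: int = 64) -> float:
    """Map the Hamming distance between two integer hashes to 0-100.

    Hypothetical helper: identical hashes give 100.0, hashes that
    differ in every bit give 0.0.
    """
    distance = bin(hash_a ^ hash_b).count("1")  # XOR, then count set bits
    return (1.0 - distance / hash_bits) * 100.0


# 16 of 64 bits differ.
print(hamming_similarity_percent(0x0000, 0xFFFF))
```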

ractogateway.pipelines.video_processor._extractor.deduplicate_frames(raw_frames, *, similarity_threshold, method)[source]

Deduplicate raw_frames using the chosen similarity method.

A frame is discarded when its similarity to the last kept frame is >= similarity_threshold (e.g. 90 %).

Parameters:
  • raw_frames (list[tuple[int, float, bytes]]) – List of (frame_id, timestamp, image_bytes) as returned by extract_frames().

  • similarity_threshold (float) – Percentage threshold (0-100). A frame whose similarity to the last kept frame is >= this value is dropped.

  • method (DeduplicationMethod) – DeduplicationMethod.PHASH or DeduplicationMethod.SSIM.

Return type:

list[FrameEntry]

Returns:

list[FrameEntry] – All frames with kept flag set appropriately.
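The comparison against the last kept frame can be sketched as below. FrameEntrySketch is a hypothetical stand-in for FrameEntry, whose real fields are not listed here, and byte_match_percent is a toy similarity function used in place of pHash or SSIM so that the example runs without image libraries.

```python
from dataclasses import dataclass


@dataclass
class FrameEntrySketch:
    """Hypothetical stand-in for FrameEntry."""
    frame_id: int
    timestamp: float
    image_bytes: bytes
    kept: bool


def byte_match_percent(a: bytes, b: bytes) -> float:
    """Toy similarity: share of positions holding the same byte (0-100)."""
    if not a or len(a) != len(b):
        return 0.0
    same = sum(x == y for x, y in zip(a, b))
    return 100.0 * same / len(a)


def dedup_sequential(raw_frames, *, similarity_threshold, similarity=byte_match_percent):
    """Mark each frame as kept or dropped relative to the last kept frame."""
    entries = []
    last_kept = None
    for frame_id, timestamp, image_bytes in raw_frames:
        if last_kept is None:
            keep = True  # the first frame is always kept
        else:
            keep = similarity(last_kept, image_bytes) < similarity_threshold
        entries.append(FrameEntrySketch(frame_id, timestamp, image_bytes, keep))
        if keep:
            last_kept = image_bytes  # later frames are compared to this one
    return entries
```

Note that a dropped frame never becomes the reference. A slow drift is therefore still detected once the accumulated change relative to the last kept frame pushes similarity below the threshold.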

ractogateway.pipelines.video_processor._extractor.deduplicate_frames_fast(raw_frames, *, similarity_threshold, method, max_hash_workers=4)[source]

Algorithmically optimised deduplication for the async pipeline.

pHash fast path — two-stage algorithm:

  1. Pre-compute all perceptual hashes in parallel via ThreadPoolExecutor (O(n/k) wall-time with k workers instead of O(n) sequential).

  2. Sequential dedup using O(1) integer XOR + popcount (Hamming distance), bypassing PIL/imagehash entirely after the pre-computation stage.

The expensive hashing step, which was O(n) sequential CPU work, becomes O(n/k) wall-time across k workers, and the remaining O(n) pass is trivial bit arithmetic. The benefit grows with the size of the frame set (for example, 20 to 200 or more frames).

SSIM path — falls back to deduplicate_frames() because SSIM comparison is inherently sequential (each frame compared to the last kept frame, which is only known after the previous step).

Return type:

list[FrameEntry]
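The two-stage pHash fast path can be sketched as follows. To keep the example free of image dependencies, toy_hash is a hypothetical stand-in for the perceptual hash, and the linear mapping from Hamming distance to a percentage is an assumption. The function returns plain tuples rather than FrameEntry objects.

```python
from concurrent.futures import ThreadPoolExecutor


def toy_hash(image_bytes: bytes) -> int:
    """Hypothetical 64-bit hash: bit i is set when byte i is >= 128."""
    bits = 0
    for i, byte in enumerate(image_bytes[:64]):
        if byte >= 128:
            bits |= 1 << i
    return bits


def dedup_fast_sketch(raw_frames, *, similarity_threshold,
                      max_hash_workers=4, hash_bits=64):
    """Return (frame_id, timestamp, kept) for every input frame."""
    # Stage 1: hash all frames in parallel; map() preserves input order.
    with ThreadPoolExecutor(max_workers=max_hash_workers) as pool:
        hashes = list(pool.map(toy_hash, [data for _, _, data in raw_frames]))

    # Stage 2: sequential pass using only integer XOR and popcount.
    results = []
    last_kept_hash = None
    for (frame_id, timestamp, _), frame_hash in zip(raw_frames, hashes):
        if last_kept_hash is None:
            keep = True
        else:
            distance = bin(frame_hash ^ last_kept_hash).count("1")
            similarity = (1.0 - distance / hash_bits) * 100.0
            keep = similarity < similarity_threshold
        results.append((frame_id, timestamp, keep))
        if keep:
            last_kept_hash = frame_hash
    return results
```

Only the first stage is parallel. The second stage still depends on which frame was kept last, but each comparison is now a handful of integer operations rather than an image decode.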