OpenCV and PyTorch: Complete Computer Vision Pipeline
OpenCV and PyTorch are the two pillars of the modern computer vision ecosystem. OpenCV excels at image acquisition, preprocessing, and post-processing: traditional operations, webcam and video management, morphological transforms, classical filters. PyTorch brings deep learning: neural networks, GPU computing, end-to-end training. Together they form a complete CV pipeline from raw pixel acquisition to intelligent prediction.
In this article we will build a complete computer vision pipeline: video acquisition with OpenCV, preprocessing, PyTorch/YOLO inference, frame annotation, and production-grade logging and alerting. The result is a system you can deploy directly.
What You Will Learn
- OpenCV fundamentals: image/video reading, color spaces, morphological operations
- OpenCV-PyTorch integration: tensor conversion, optimized pipeline
- Real-time video acquisition with buffer management and threading
- Pre-processing pipeline: letterbox resize, normalize, batch preparation
- Optimized inference: batch processing, async inference
- Post-processing: NMS, coordinate transformation, frame annotation
- Optical flow and background subtraction for motion analysis
- Multi-camera pipeline and RTSP stream processing
- Detection event logging and alerting system
- Performance optimization: threading, GPU streams, profiling
1. OpenCV Fundamentals for CV Pipelines
1.1 Color Spaces and Conversions
A critical and often overlooked point: OpenCV uses BGR format by default, while PyTorch (and PIL) use RGB. Confusing the two produces catastrophic results: a model pre-trained on RGB images will receive inverted channels, degrading accuracy significantly without any obvious error. Always convert explicitly.
```python
import cv2
import numpy as np
import torch

# ---- Image loading and conversions ----
def load_image_rgb(path: str) -> np.ndarray:
    """Load image in RGB format (correct for PyTorch)."""
    img_bgr = cv2.imread(path)
    if img_bgr is None:
        raise FileNotFoundError(f"Image not found: {path}")
    return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
    """
    Convert OpenCV BGR image to normalized PyTorch tensor.
    BGR [H, W, C] uint8 -> RGB [C, H, W] float32 in [0, 1]
    """
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
    return tensor

def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
    """
    Convert PyTorch tensor to OpenCV BGR image.
    RGB [C, H, W] float32 -> BGR [H, W, C] uint8
    """
    img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
    return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)

# ---- Useful color space conversions ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
    """Analyze image in multiple color spaces."""
    return {
        'bgr': img_bgr,
        'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
        'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
        'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
        'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
        'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
    }

# ---- Morphological operations ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
                     kernel_size: int = 5) -> np.ndarray:
    """
    Morphological operations for pre/post-processing.
    opening = erosion + dilation (removes small noise)
    closing = dilation + erosion (closes small holes)
    """
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    ops = {
        'erode': cv2.erode(gray, kernel),
        'dilate': cv2.dilate(gray, kernel),
        'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
        'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
        'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
        'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
    }
    return ops.get(operation, gray)

# ---- Edge and contour detection ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
    """Classical pipeline: blur -> Canny -> contours."""
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
    contours, hierarchy = cv2.findContours(
        edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    # Filter contours by minimum area
    significant = [c for c in contours if cv2.contourArea(c) > 500]
    return edges, significant

# ---- Preprocessing for DL models (letterbox resize) ----
def preprocess_for_model(img_bgr: np.ndarray,
                         target_size: tuple = (640, 640),
                         mean: list = [0.485, 0.456, 0.406],
                         std: list = [0.229, 0.224, 0.225]) -> tuple:
    """
    Complete pipeline: BGR image -> normalized tensor ready for inference.
    Uses letterbox resize to maintain aspect ratio.
    """
    h, w = img_bgr.shape[:2]
    target_h, target_w = target_size
    scale = min(target_h / h, target_w / w)
    new_h, new_w = int(h * scale), int(w * scale)
    resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
    # Padding to reach target_size
    pad_h = target_h - new_h
    pad_w = target_w - new_w
    padded = cv2.copyMakeBorder(
        resized,
        pad_h // 2, pad_h - pad_h // 2,
        pad_w // 2, pad_w - pad_w // 2,
        cv2.BORDER_CONSTANT, value=(114, 114, 114)
    )
    # BGR -> RGB -> float -> normalize -> tensor
    rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
    tensor = torch.from_numpy(rgb).float() / 255.0
    tensor = tensor.permute(2, 0, 1)  # HWC -> CHW
    mean_t = torch.tensor(mean).view(3, 1, 1)
    std_t = torch.tensor(std).view(3, 1, 1)
    tensor = (tensor - mean_t) / std_t
    return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2)  # batch dim + metadata
```
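The scale and padding returned by the letterbox preprocessing are exactly what you need later to map model-space detections back to original-image coordinates. A minimal sketch of that inverse mapping (the helper name `unletterbox_box` is our own, pure Python):

```python
def unletterbox_box(box, scale, pad, orig_size):
    """Map a box from letterboxed model space back to the original image.

    box: (x1, y1, x2, y2) in model coordinates
    scale: resize factor used by the letterbox
    pad: (pad_x, pad_y) offsets added on the left/top
    orig_size: (width, height) of the original image
    """
    x1, y1, x2, y2 = box
    pad_x, pad_y = pad
    w, h = orig_size
    # Remove padding, then undo the resize
    x1 = (x1 - pad_x) / scale
    y1 = (y1 - pad_y) / scale
    x2 = (x2 - pad_x) / scale
    y2 = (y2 - pad_y) / scale
    # Clip to image bounds
    def clip(v, hi):
        return max(0.0, min(v, hi))
    return (clip(x1, w), clip(y1, h), clip(x2, w), clip(y2, h))
```

For a 1280x720 frame letterboxed to 640x640 (scale 0.5, pad (0, 140)), a model-space box (100, 240, 200, 340) maps back to (200, 200, 400, 400) in the original image.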
2. Non-Blocking Video Capture with Threading
2.1 VideoCapture with Frame Buffer Management
```python
import cv2
import threading
import queue
import time
import numpy as np
from dataclasses import dataclass

@dataclass
class Frame:
    """Frame wrapper with metadata."""
    data: np.ndarray
    timestamp: float
    frame_id: int

class ThreadedVideoCapture:
    """
    VideoCapture with reading in a separate thread.
    Prevents frame drops caused by slow processing:
    GPU inference does not block camera reading.
    """
    def __init__(self, source, max_buffer_size: int = 5):
        self.cap = cv2.VideoCapture(source)
        if not self.cap.isOpened():
            raise RuntimeError(f"Cannot open: {source}")
        # Optimize for low latency
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.buffer = queue.Queue(maxsize=max_buffer_size)
        self.stopped = False
        self.frame_id = 0
        # Start reader thread
        self.thread = threading.Thread(target=self._read_frames, daemon=True)
        self.thread.start()

    def _read_frames(self) -> None:
        """Thread worker: reads frames continuously."""
        while not self.stopped:
            ret, frame = self.cap.read()
            if not ret:
                self.stopped = True
                break
            frame_obj = Frame(
                data=frame,
                timestamp=time.time(),
                frame_id=self.frame_id
            )
            self.frame_id += 1
            # Drop old frames if buffer is full (always keep latest)
            if self.buffer.full():
                try:
                    self.buffer.get_nowait()
                except queue.Empty:
                    pass
            self.buffer.put(frame_obj)

    def read(self) -> Frame | None:
        """Read next available frame."""
        try:
            return self.buffer.get(timeout=1.0)
        except queue.Empty:
            return None

    def get_fps(self) -> float:
        return self.cap.get(cv2.CAP_PROP_FPS)

    def get_resolution(self) -> tuple:
        w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        return w, h

    def release(self) -> None:
        self.stopped = True
        self.cap.release()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.release()
```
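The drop-oldest policy in `_read_frames` is the piece that keeps latency bounded: the consumer always sees recent frames instead of an ever-growing backlog. It can be exercised without a camera using a plain `queue.Queue` (a sketch; the helper name `put_drop_oldest` is our own):

```python
import queue

def put_drop_oldest(buf: queue.Queue, item) -> None:
    """Insert item; if the buffer is full, evict the oldest entry first.

    Same policy as ThreadedVideoCapture: bounded buffer, newest frames win.
    """
    if buf.full():
        try:
            buf.get_nowait()
        except queue.Empty:
            pass
    buf.put(item)

buf = queue.Queue(maxsize=3)
for frame_id in range(10):       # Simulate 10 incoming frames
    put_drop_oldest(buf, frame_id)

# Only the 3 most recent "frames" remain in the buffer
remaining = [buf.get_nowait() for _ in range(3)]
# remaining == [7, 8, 9]
```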
3. Complete CV Pipeline: OpenCV + YOLO
```python
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class Detection:
    """Single detection with all metadata."""
    class_name: str
    class_id: int
    confidence: float
    bbox: tuple  # x1, y1, x2, y2
    timestamp: float = field(default_factory=time.time)
    frame_id: int = 0

    def to_dict(self) -> dict:
        return {
            'class': self.class_name,
            'confidence': round(self.confidence, 3),
            'bbox': list(self.bbox),
            'timestamp': self.timestamp,
            'frame_id': self.frame_id
        }

class CVPipeline:
    """
    Complete Computer Vision Pipeline:
    Acquisition -> Preprocessing -> Inference -> Post-processing -> Output
    """
    def __init__(
        self,
        model_path: str,
        source,  # int (webcam), str (file/RTSP)
        conf_threshold: float = 0.4,
        iou_threshold: float = 0.45,
        target_classes: list | None = None,  # None = all classes
        output_dir: str = 'output',
        save_video: bool = False,
        alert_classes: list | None = None  # classes that trigger alerts
    ):
        self.model = YOLO(model_path)
        self.conf = conf_threshold
        self.iou = iou_threshold
        self.target_classes = target_classes
        self.alert_classes = alert_classes or []
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.source = source
        self.save_video = save_video
        # Stats
        self.frame_count = 0
        self.total_detections = 0
        self.start_time = None
        self.fps_history = []

    def run(self) -> None:
        """Start the processing pipeline."""
        logger.info(f"Starting pipeline: source={self.source}")
        cap = cv2.VideoCapture(self.source)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open source: {self.source}")
        # Setup video writer if requested
        writer = None
        if self.save_video:
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            out_path = str(self.output_dir / f"output_{timestamp}.mp4")
            writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            logger.info(f"Saving video to: {out_path}")
        self.start_time = time.time()
        detection_log = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                t0 = time.perf_counter()
                # Inference
                detections = self._run_inference(frame, self.frame_count)
                # Visualization
                annotated = self._annotate_frame(frame, detections)
                # Stats overlay
                elapsed = time.perf_counter() - t0
                fps = 1.0 / elapsed if elapsed > 0 else 0.0
                self.fps_history.append(fps)
                annotated = self._add_stats_overlay(annotated, fps, len(detections))
                if writer:
                    writer.write(annotated)
                cv2.imshow('CV Pipeline', annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                for det in detections:
                    detection_log.append(det.to_dict())
                    if det.class_name in self.alert_classes:
                        self._trigger_alert(det)
                self.frame_count += 1
                self.total_detections += len(detections)
        finally:
            cap.release()
            if writer:
                writer.release()
            cv2.destroyAllWindows()
            log_path = self.output_dir / 'detection_log.json'
            with open(log_path, 'w') as f:
                json.dump(detection_log, f, indent=2)
            self._print_stats()

    def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
        """Run YOLO inference on a single frame."""
        results = self.model.predict(
            frame, conf=self.conf, iou=self.iou, verbose=False
        )
        detections = []
        for box in results[0].boxes:
            class_name = self.model.names[int(box.cls[0])]
            if self.target_classes and class_name not in self.target_classes:
                continue
            x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
            detections.append(Detection(
                class_name=class_name,
                class_id=int(box.cls[0]),
                confidence=float(box.conf[0]),
                bbox=(x1, y1, x2, y2),
                frame_id=frame_id
            ))
        return detections

    def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
        """Annotate the frame with bounding boxes and labels."""
        annotated = frame.copy()
        # Fixed seed -> stable per-class colors across frames
        np.random.seed(42)
        colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
                  for name in self.model.names.values()}
        for det in detections:
            x1, y1, x2, y2 = det.bbox
            color = colors.get(det.class_name, (0, 255, 0))
            cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
            label = f"{det.class_name} {det.confidence:.2f}"
            label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(annotated,
                          (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0] + 5, y1), color, -1)
            cv2.putText(annotated, label, (x1 + 2, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        return annotated

    def _add_stats_overlay(self, frame: np.ndarray, fps: float,
                           n_detections: int) -> np.ndarray:
        """Add real-time statistics overlay."""
        h = frame.shape[0]
        stats = [
            f"FPS: {fps:.1f}",
            f"Detections: {n_detections}",
            f"Frame: {self.frame_count}",
            f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
        ]
        for i, text in enumerate(stats):
            cv2.putText(frame, text, (10, h - 100 + i * 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        return frame

    def _trigger_alert(self, det: Detection) -> None:
        """Handle an alert event for critical classes."""
        logger.warning(f"ALERT: {det.class_name} detected with confidence {det.confidence:.2f} "
                       f"(frame {det.frame_id})")
        # In production: send notification (email, Slack, webhook)

    def _print_stats(self) -> None:
        """Print final session statistics."""
        elapsed = time.time() - self.start_time
        avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
        logger.info("\n=== Pipeline Statistics ===")
        logger.info(f"Frames processed: {self.frame_count}")
        logger.info(f"Total detections: {self.total_detections}")
        logger.info(f"Total time: {elapsed:.1f}s")
        logger.info(f"Average FPS: {avg_fps:.1f}")

# Usage
if __name__ == '__main__':
    pipeline = CVPipeline(
        model_path='yolo26m.pt',
        source=0,  # webcam (or 'video.mp4', 'rtsp://...')
        conf_threshold=0.4,
        target_classes=['person', 'car', 'truck'],
        alert_classes=['person'],
        output_dir='output',
        save_video=True
    )
    pipeline.run()
```
4. Advanced OpenCV for Post-Processing
```python
import cv2
import numpy as np

# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
    """
    MOG2: Mixture of Gaussians for motion detection.
    Use as a pre-filter before expensive DL inference.
    """
    subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500,
        varThreshold=50,
        detectShadows=True
    )
    return subtractor

def detect_motion(frame_bgr: np.ndarray, subtractor,
                  min_contour_area: int = 1000) -> list[tuple]:
    """Detect motion regions in a frame."""
    fg_mask = subtractor.apply(frame_bgr)
    # Clean up mask with morphological operations
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
    fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    motion_regions = []
    for c in contours:
        if cv2.contourArea(c) >= min_contour_area:
            x, y, w, h = cv2.boundingRect(c)
            motion_regions.append((x, y, x + w, y + h))
    return motion_regions

# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
                                curr_gray: np.ndarray,
                                prev_points: np.ndarray) -> tuple:
    """
    Lucas-Kanade sparse optical flow for tracking specific points.
    Useful for tracking keypoints detected in the previous frame.
    """
    lk_params = dict(
        winSize=(21, 21),
        maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
    )
    curr_points, status, error = cv2.calcOpticalFlowPyrLK(
        prev_gray, curr_gray, prev_points, None, **lk_params
    )
    good_prev = prev_points[status.flatten() == 1]
    good_curr = curr_points[status.flatten() == 1]
    # Calculate average motion speed
    if len(good_prev) > 0:
        flow_vectors = good_curr - good_prev
        avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
    else:
        avg_speed = 0.0
    return good_curr, good_prev, avg_speed
```
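The point correspondences returned by Lucas-Kanade can be reduced to a coarse motion summary, useful for "crowd moving left" style events. A pure-Python sketch (the helper name `summarize_flow` is our own, and the four-way direction binning is a deliberate simplification):

```python
import math

def summarize_flow(prev_pts, curr_pts):
    """Reduce point correspondences to (avg_speed, dominant_direction).

    prev_pts / curr_pts: lists of (x, y) tuples matched by index
    (e.g. the 'good' points from sparse optical flow).
    Direction is 'right', 'left', 'down', or 'up' in image coordinates.
    """
    if not prev_pts:
        return 0.0, None
    dxs = [c[0] - p[0] for p, c in zip(prev_pts, curr_pts)]
    dys = [c[1] - p[1] for p, c in zip(prev_pts, curr_pts)]
    speeds = [math.hypot(dx, dy) for dx, dy in zip(dxs, dys)]
    avg_speed = sum(speeds) / len(speeds)
    mean_dx = sum(dxs) / len(dxs)
    mean_dy = sum(dys) / len(dys)
    # Pick the dominant axis; note y grows downward in image coordinates
    if abs(mean_dx) >= abs(mean_dy):
        direction = 'right' if mean_dx >= 0 else 'left'
    else:
        direction = 'down' if mean_dy >= 0 else 'up'
    return avg_speed, direction
```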
```python
# ---- CSRT Multi-Object Tracker ----
class ObjectTracker:
    """
    Multi-object tracker to maintain object identity across frames
    without running inference on every single frame.
    """
    def __init__(self, tracker_type: str = 'CSRT'):
        self.tracker_type = tracker_type
        self.trackers = []  # list of (tracker, class_name, id)
        self.next_id = 0

    def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
        """Add a tracker for a new object."""
        x1, y1, x2, y2 = bbox
        cv2_bbox = (x1, y1, x2 - x1, y2 - y1)  # OpenCV format: x, y, w, h
        tracker = cv2.TrackerCSRT_create()
        tracker.init(frame, cv2_bbox)
        obj_id = self.next_id
        self.trackers.append((tracker, class_name, obj_id))
        self.next_id += 1
        return obj_id

    def update(self, frame: np.ndarray) -> list[dict]:
        """Update all trackers with the new frame."""
        active_objects = []
        failed_trackers = []
        for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
            success, cv2_bbox = tracker.update(frame)
            if success:
                x, y, w, h = [int(v) for v in cv2_bbox]
                active_objects.append({
                    'id': obj_id,
                    'class_name': class_name,
                    'bbox': (x, y, x + w, y + h)
                })
            else:
                failed_trackers.append(i)
        # Remove failed trackers (backward to avoid index shifting)
        for i in reversed(failed_trackers):
            self.trackers.pop(i)
        return active_objects

    def clear(self) -> None:
        self.trackers = []
```
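When a fresh detection pass runs (e.g. every N frames), the new boxes need to be matched against the tracker's current boxes so identities persist and genuinely new objects get new trackers. A simple greedy IoU matcher is often enough; this is a pure-Python sketch and the function names are our own:

```python
def iou(a, b):
    """Intersection-over-Union of two (x1, y1, x2, y2) boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

def match_detections(detections, tracked, iou_threshold=0.3):
    """Greedy matching: each detection takes the best unmatched tracked box.

    Returns (matches, unmatched_detections), where matches is a list of
    (detection_index, tracked_index) pairs. Unmatched detections are
    candidates for a new tracker.
    """
    matches, used = [], set()
    for di, det in enumerate(detections):
        best_iou, best_ti = iou_threshold, None
        for ti, trk in enumerate(tracked):
            if ti in used:
                continue
            score = iou(det, trk)
            if score > best_iou:
                best_iou, best_ti = score, ti
        if best_ti is not None:
            matches.append((di, best_ti))
            used.add(best_ti)
    unmatched = [di for di in range(len(detections))
                 if di not in {m[0] for m in matches}]
    return matches, unmatched
```

For production-grade association (occlusions, crossing objects), a Hungarian assignment over the IoU matrix is the usual upgrade; the greedy version is the readable baseline.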
5. Multi-Camera Pipeline and RTSP Streams
```python
import cv2
import threading
import queue
import time
import logging
import numpy as np
from ultralytics import YOLO
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class CameraFrame:
    camera_id: int
    frame: np.ndarray
    timestamp: float

class MultiCameraPipeline:
    """
    Multi-camera pipeline with:
    - One reader thread per camera
    - Shared frame queue for batch inference
    - Single shared GPU model
    """
    def __init__(self, sources: list, model_path: str, batch_size: int = 4):
        self.sources = sources
        self.model = YOLO(model_path)
        self.batch_size = batch_size
        self.frame_queue = queue.Queue(maxsize=batch_size * 2)
        self.stopped = False

    def _camera_reader(self, camera_id: int, source) -> None:
        """Thread worker for a single camera with auto-reconnect."""
        cap = cv2.VideoCapture(source)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        if isinstance(source, str) and 'rtsp' in source:
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        while not self.stopped:
            ret, frame = cap.read()
            if not ret:
                logger.warning(f"Camera {camera_id} disconnected, reconnecting...")
                cap.release()  # Release the dead handle before retrying
                time.sleep(1.0)
                cap = cv2.VideoCapture(source)  # Reconnect
                continue
            frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
                                    timestamp=time.time())
            try:
                self.frame_queue.put(frame_obj, timeout=0.1)
            except queue.Full:
                pass  # Drop frame if queue is full
        cap.release()

    def run(self) -> None:
        """Start all reader threads and process batches from the queue."""
        threads = []
        for i, source in enumerate(self.sources):
            t = threading.Thread(
                target=self._camera_reader, args=(i, source), daemon=True
            )
            t.start()
            threads.append(t)
        logger.info(f"Pipeline started: {len(self.sources)} cameras")
        batch = []
        while not self.stopped:
            try:
                frame_obj = self.frame_queue.get(timeout=0.5)
                batch.append(frame_obj)
                if len(batch) >= self.batch_size:
                    self._process_batch(batch)
                    batch = []
            except queue.Empty:
                if batch:
                    self._process_batch(batch)
                    batch = []

    def _process_batch(self, batch: list[CameraFrame]) -> None:
        """Batch inference on GPU - more efficient than single-frame inference."""
        frames = [f.frame for f in batch]
        # Batch YOLO inference
        results = self.model.predict(
            frames, conf=0.4, iou=0.45, verbose=False
        )
        for frame_obj, result in zip(batch, results):
            n_det = len(result.boxes)
            if n_det > 0:
                logger.info(f"Camera {frame_obj.camera_id}: {n_det} objects detected")

    def stop(self) -> None:
        self.stopped = True

# Multi-camera configuration
pipeline = MultiCameraPipeline(
    sources=[
        0,                               # Local webcam
        'rtsp://192.168.1.100/stream1',  # IP Camera 1 (RTSP)
        'rtsp://192.168.1.101/stream1',  # IP Camera 2 (RTSP)
        'videos/recording.mp4'           # Video file
    ],
    model_path='yolo26m.pt',
    batch_size=4
)
# pipeline.run()
```
6. Profiling and GPU-Accelerated OpenCV
6.1 PyTorch Profiler for CV Pipelines
Before optimizing, measure. The PyTorch Profiler is the most powerful tool for identifying bottlenecks in a CV pipeline: it shows exactly where time is spent (preprocessing, inference, postprocessing, CPU/GPU data transfer) down to the kernel level.
```python
import collections
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO

def profile_cv_pipeline(model_path: str,
                        video_source,
                        n_frames: int = 100,
                        output_dir: str = './profiler_logs') -> None:
    """
    Profile the CV pipeline with PyTorch Profiler.
    Generates logs compatible with TensorBoard for visual analysis.
    Output: profiler_logs/ (view with: tensorboard --logdir profiler_logs)
    """
    model = YOLO(model_path)
    cap = cv2.VideoCapture(video_source)
    frame_count = 0
    with profiler.profile(
        activities=[
            profiler.ProfilerActivity.CPU,
            profiler.ProfilerActivity.CUDA,
        ],
        schedule=profiler.schedule(
            wait=5,     # Skip first 5 iterations (warmup variability)
            warmup=5,   # Warmup 5 iterations (profiler overhead)
            active=20,  # Profile 20 iterations
            repeat=2    # Repeat the cycle 2 times
        ),
        on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
        record_shapes=True,
        profile_memory=True,
        with_stack=True
    ) as prof:
        while frame_count < n_frames:
            ret, frame = cap.read()
            if not ret:
                break
            with torch.profiler.record_function("preprocessing"):
                # Illustrative manual preprocessing; note that
                # Ultralytics also preprocesses internally in predict()
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                tensor = torch.from_numpy(rgb).float() / 255.0
                tensor = tensor.permute(2, 0, 1).unsqueeze(0)
                if torch.cuda.is_available():
                    tensor = tensor.cuda()
            with torch.profiler.record_function("inference"):
                with torch.no_grad():
                    results = model.predict(frame, verbose=False)
            with torch.profiler.record_function("postprocessing"):
                boxes = results[0].boxes
                n_det = len(boxes)
            prof.step()
            frame_count += 1
    cap.release()
    print(f"Profiling complete. Results in: {output_dir}")
    # Print top operations table (useful without TensorBoard)
    print(prof.key_averages().table(
        sort_by="cuda_time_total", row_limit=15
    ))

# Lightweight production profiler without TensorBoard
class PipelineProfiler:
    """Lightweight profiler for continuous monitoring in production."""
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.times: dict = {
            'preprocess': collections.deque(maxlen=window_size),
            'inference': collections.deque(maxlen=window_size),
            'postprocess': collections.deque(maxlen=window_size),
        }

    def record(self, stage: str, duration_ms: float) -> None:
        if stage in self.times:
            self.times[stage].append(duration_ms)

    def report(self) -> dict:
        """Return rolling statistics for the last N frames."""
        stats = {}
        for stage, times in self.times.items():
            if times:
                arr = np.array(times)
                stats[stage] = {
                    'mean_ms': float(np.mean(arr)),
                    'p50_ms': float(np.percentile(arr, 50)),
                    'p95_ms': float(np.percentile(arr, 95)),
                    'p99_ms': float(np.percentile(arr, 99)),
                }
        return stats

    def print_report(self) -> None:
        stats = self.report()
        print("\n=== Pipeline Performance ===")
        for stage, s in stats.items():
            print(f"  {stage:12s}: mean={s['mean_ms']:.1f}ms "
                  f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
```
6.2 OpenCV with CUDA Acceleration
OpenCV provides a cuda module that accelerates preprocessing operations up to 10x on NVIDIA GPUs. Note that it requires an OpenCV build compiled with CUDA support: the standard opencv-python and opencv-contrib-python wheels on PyPI do not include it. It is particularly valuable for resize, Gaussian blur, and color conversions in high-frequency pipelines where preprocessing can become the bottleneck.
```python
import cv2
import numpy as np
import torch

class CUDAPreprocessor:
    """
    GPU-accelerated OpenCV preprocessor.
    Requires an OpenCV build with CUDA support
    (not included in the standard PyPI wheels).
    Typical speedup vs CPU: 5-10x for resize+cvtColor on 1080p frames.
    """
    def __init__(self, target_size: tuple = (640, 640)):
        self.target_size = target_size
        self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
        if self.has_cuda:
            # Pre-allocate CUDA stream and buffers for async pipeline
            self.stream = cv2.cuda_Stream()
            self.gpu_frame = cv2.cuda_GpuMat()
            self.gpu_rgb = cv2.cuda_GpuMat()
            self.gpu_resized = cv2.cuda_GpuMat()

    def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
        """
        GPU preprocessing: BGR->RGB + Resize.
        Returns: float32 RGB array normalized [0, 1]
        """
        if not self.has_cuda:
            return self._preprocess_cpu(img_bgr)
        # Upload to GPU
        self.gpu_frame.upload(img_bgr, self.stream)
        # BGR -> RGB on GPU
        cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
                          self.gpu_rgb, stream=self.stream)
        # Resize on GPU
        cv2.cuda.resize(self.gpu_rgb, self.target_size,
                        self.gpu_resized,
                        interpolation=cv2.INTER_LINEAR,
                        stream=self.stream)
        # Synchronize before handing the result to PyTorch
        self.stream.waitForCompletion()
        result = self.gpu_resized.download()
        return result.astype(np.float32) / 255.0

    def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
        """CPU fallback if CUDA is unavailable."""
        rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, self.target_size,
                             interpolation=cv2.INTER_LINEAR)
        return resized.astype(np.float32) / 255.0

    def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
        """
        Batch GPU preprocessing.
        Converts a list of frames into a batch tensor ready for inference.
        Maximizes throughput for multi-camera pipelines.
        """
        preprocessed = [self.preprocess_cuda(f) for f in frames]
        batch = np.stack(preprocessed)  # [B, H, W, C]
        batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
        if torch.cuda.is_available():
            batch_tensor = batch_tensor.cuda(non_blocking=True)
        return batch_tensor
```
7. Frame Quality Assessment and Adaptive Filtering
Before sending a frame to inference, it's worth evaluating its quality. Blurry, overexposed, or low-contrast frames produce false negatives and reduce system reliability. Frame quality filtering typically improves overall pipeline precision by 5-15% at negligible computational cost: you're trading quantity of inferences for quality.
```python
import cv2
import numpy as np
from dataclasses import dataclass

@dataclass
class FrameQuality:
    """Quality metrics for a single frame."""
    blur_score: float    # Laplacian variance (high = sharp)
    brightness: float    # Mean brightness [0, 255]
    contrast: float      # Brightness standard deviation
    is_valid: bool       # Frame acceptable for inference?
    reject_reason: str   # Rejection reason ('', 'blur', 'dark', etc.)

class FrameQualityFilter:
    """
    Filters low-quality frames before inference.
    Reduces false negatives in surveillance systems.
    """
    def __init__(self,
                 blur_threshold: float = 100.0,
                 brightness_min: float = 30.0,
                 brightness_max: float = 220.0,
                 contrast_min: float = 15.0):
        self.blur_threshold = blur_threshold
        self.brightness_min = brightness_min
        self.brightness_max = brightness_max
        self.contrast_min = contrast_min

    def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
        """Assess frame quality."""
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        # Blur detection: Laplacian variance
        # Sharp frames have strong edges -> high variance
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        brightness = float(np.mean(gray))
        contrast = float(np.std(gray))
        reject_reason = ''
        if laplacian_var < self.blur_threshold:
            reject_reason = f'blur (var={laplacian_var:.1f})'
        elif brightness < self.brightness_min:
            reject_reason = f'too_dark (mean={brightness:.1f})'
        elif brightness > self.brightness_max:
            reject_reason = f'overexposed (mean={brightness:.1f})'
        elif contrast < self.contrast_min:
            reject_reason = f'low_contrast (std={contrast:.1f})'
        return FrameQuality(
            blur_score=laplacian_var,
            brightness=brightness,
            contrast=contrast,
            is_valid=(reject_reason == ''),
            reject_reason=reject_reason
        )

def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
    """
    Simplified global SSIM between two images (a single window over the
    whole frame, rather than the usual sliding-window SSIM).
    Range: [-1, 1], where 1 = identical, 0 = uncorrelated.
    More perceptually accurate than MSE/PSNR for detecting scene changes.
    """
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
    C1 = (0.01 * 255) ** 2
    C2 = (0.03 * 255) ** 2
    mu1, mu2 = gray1.mean(), gray2.mean()
    sigma1, sigma2 = gray1.std(), gray2.std()
    sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
    numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
    denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
    return float(numerator / denominator)

class SmartMotionGate:
    """
    Combines background subtraction + SSIM for intelligent gating.
    Avoids inferring on identical frames (static scenes) or corrupted frames.
    """
    def __init__(self, ssim_change_threshold: float = 0.95):
        self.subtractor = cv2.createBackgroundSubtractorMOG2(
            history=200, varThreshold=40, detectShadows=False
        )
        self.quality_filter = FrameQualityFilter()
        self.ssim_threshold = ssim_change_threshold
        self.prev_frame = None

    def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
        """
        Decide whether to run inference on this frame.
        Returns: (should_infer, reason)
        """
        # 1. Quality check
        quality = self.quality_filter.assess(frame)
        if not quality.is_valid:
            return False, f"low_quality: {quality.reject_reason}"
        # 2. Motion detection (fast)
        fg_mask = self.subtractor.apply(frame)
        motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
        if motion_ratio < 0.005:  # less than 0.5% of pixels in motion
            return False, "no_motion"
        # 3. SSIM check (structural changes, not just noise)
        if self.prev_frame is not None:
            ssim = compute_ssim(frame, self.prev_frame)
            if ssim > self.ssim_threshold:
                return False, f"scene_static (ssim={ssim:.3f})"
        self.prev_frame = frame.copy()
        return True, "ok"
```
8. Best Practices and Performance
Typical Performance: YOLOv8m in OpenCV Pipeline
| Hardware | FPS (640x640) | Latency | Optimization |
|---|---|---|---|
| NVIDIA A100 | ~220 FPS | ~4.5ms | TensorRT FP16 |
| NVIDIA RTX 4090 | ~180 FPS | ~5.6ms | TensorRT FP16 |
| NVIDIA RTX 3080 | ~120 FPS | ~8.3ms | ONNX Runtime |
| Intel i9 CPU (ONNX) | ~18 FPS | ~55ms | OpenVINO |
| Raspberry Pi 5 | ~3 FPS | ~330ms | YOLOv8n INT8 |
Performance Optimization Tips
- Read in a separate thread: Never block the inference loop with camera I/O. Use ThreadedVideoCapture to maximize GPU utilization. Without threading, the GPU waits idle while the CPU reads frames.
- BGR->RGB conversion once only: Do not convert at every call. Convert at read time and maintain RGB throughout the entire pipeline.
- Always use torch.no_grad() in inference: Disables the autograd computation graph: -30% memory usage, +10% speed. In production, also use model.eval().
- Batch inference for multiple streams: Accumulate frames from multiple cameras and infer in batches. GPU throughput scales nearly linearly with batch size up to hardware limits.
- TensorRT for NVIDIA production: Always export to TensorRT for maximum speed on NVIDIA hardware. YOLOv8m: from 50 to 180 FPS with TensorRT FP16.
- Background subtraction as motion gating: In scenes with little movement, only run DL inference on frames where MOG2 detects motion. Saves 60-70% of compute in typical surveillance scenarios.
- Reduce resolution for real-time applications: If 640x640 is too slow, try 320x320 or 416x416. Small objects will suffer but overall throughput doubles or triples.
- Frame quality filter: Discard blurry frames (blur score < 100) and underexposed frames before inference. Reduces false negatives and improves system precision.
- SSIM for static scenes: If two consecutive frames have SSIM > 0.95, the scene hasn't changed. Skip inference and reuse previous results.
- OpenCV CUDA for preprocessing: If you have an NVIDIA GPU and opencv-contrib compiled with CUDA, preprocessing (resize, cvtColor) is 5-10x faster on GPU.
Common Mistakes to Avoid
- No model warmup: The first inference is always slower (JIT compilation, CUDA initialization). Run 3-5 dummy inferences before measuring performance.
- Forgetting model.eval(): In train mode, Dropout and BatchNorm behave differently. Always call model.eval() before inference.
- Missing frame.copy() before annotation: Don't annotate the original frame directly if you're also using it for inference. Always work on frame.copy().
- Not releasing VideoCapture: Every opened cv2.VideoCapture must be released with cap.release(). Use context managers or try/finally.
- RTSP without reconnect logic: RTSP connections drop. Every camera reader must implement retry logic with exponential backoff.
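For the last point, a capped exponential backoff schedule is usually enough: sleep for the next delay before each cv2.VideoCapture retry, and reset the schedule on success. A minimal sketch (the helper name `backoff_delays` and its defaults are our own):

```python
def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   max_delay: float = 30.0, attempts: int = 6):
    """Delay in seconds before each reconnect attempt, capped at max_delay."""
    return [min(base * factor ** i, max_delay) for i in range(attempts)]

# Usage in a camera reader loop (pseudocode):
# for delay in backoff_delays():
#     time.sleep(delay)
#     cap = cv2.VideoCapture(source)
#     if cap.isOpened():
#         break
```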
Conclusions
We built a complete CV pipeline integrating OpenCV and PyTorch/YOLO, covering every layer of a production system:
- OpenCV for acquisition, preprocessing, post-processing, visualization, and classical algorithms
- BGR/RGB conversion - the most critical detail to get right from the start
- Threading for non-blocking video capture that maximizes GPU utilization
- Production-ready pipeline with detection logging, alerting, and video saving
- Background subtraction (MOG2) and optical flow for motion analysis and compute gating
- CSRT multi-object tracking to maintain identity between inference calls
- Multi-camera batch inference with RTSP auto-reconnect for surveillance scenarios
- PyTorch Profiler for pinpointing exact bottlenecks at the kernel level
- OpenCV CUDA for GPU-accelerated preprocessing (5-10x vs CPU)
- Frame Quality Assessment to filter corrupted frames before inference
- Smart Motion Gate combining quality check + MOG2 + SSIM for intelligent gating
- Performance benchmarks on real hardware: A100 (220 FPS) to Raspberry Pi (3 FPS)