OpenCV e PyTorch: Pipeline Completa di Computer Vision
OpenCV e PyTorch sono i due pilastri dell'ecosistema computer vision moderno. OpenCV eccelle nell'acquisizione, preprocessing e post-processing delle immagini: operazioni tradizionali, gestione di webcam e video, trasformazioni morfologiche, filtri classici. PyTorch porta il deep learning: reti neurali, GPU computing, training end-to-end. Insieme, formano una pipeline CV completa che va dall'acquisizione raw dei pixel alla predizione intelligente.
In questo articolo costruiremo una pipeline completa di computer vision: dall'acquisizione video con OpenCV, al preprocessing, all'inference con un modello PyTorch/YOLO, fino alla visualizzazione e logging dei risultati. Un sistema production-ready che potete deployare direttamente.
Cosa Imparerai
- OpenCV fondamentali: lettura immagini/video, spazi colore, operazioni morfologiche
- Integrazione OpenCV-PyTorch: conversione tensori, pipeline ottimizzata
- Acquisizione video in tempo reale con buffer management
- Pre-processing pipeline: resize, normalize, batch preparation
- Inference ottimizzata: batch processing, async inference
- Post-processing: NMS, coordinate transformation, annotazione frame
- Multi-camera pipeline e RTSP stream processing
- Sistema di logging e alerting per eventi rilevati
- Ottimizzazione performance: threading, GPU streams, profiling
1. OpenCV Fondamentali per CV Pipeline
1.1 Spazi Colore e Conversioni
Un punto critico spesso trascurato: OpenCV usa il formato BGR per default, mentre PyTorch (e PIL) usano RGB. Confondere i due produce risultati disastrosi: il modello pre-addestrato su immagini RGB ricevera canali invertiti. Sempre convertire!
import cv2
import numpy as np
import torch
# ---- Lettura e conversioni ----
def load_image_rgb(path: str) -> np.ndarray:
"""Carica immagine in formato RGB (corretto per PyTorch)."""
img_bgr = cv2.imread(path)
if img_bgr is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
"""
Converte immagine OpenCV BGR in tensor PyTorch normalizzato.
BGR [H, W, C] uint8 -> RGB [C, H, W] float32 normalizzato [0,1]
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
return tensor
def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
"""
Converte tensor PyTorch in immagine OpenCV BGR.
RGB [C, H, W] float32 -> BGR [H, W, C] uint8
"""
img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
# ---- Spazi colore utili ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
"""Analizza l'immagine in diversi spazi colore."""
return {
'bgr': img_bgr,
'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
}
# ---- Operazioni morfologiche ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
kernel_size: int = 5) -> np.ndarray:
"""
Operazioni morfologiche per pre/post-processing.
opening = erosione + dilatazione (rimuove rumore piccolo)
closing = dilatazione + erosione (chiude piccoli buchi)
"""
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
ops = {
'erode': cv2.erode(gray, kernel),
'dilate': cv2.dilate(gray, kernel),
'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
}
return ops.get(operation, gray)
# ---- Rilevamento contorni e feature classiche ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
"""Pipeline classica: blur -> Canny -> contorni."""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
contours, hierarchy = cv2.findContours(
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Filtra contorni per area minima
significant_contours = [c for c in contours if cv2.contourArea(c) > 500]
return edges, significant_contours
# ---- Preprocessing per modelli DL ----
def preprocess_for_model(img_bgr: np.ndarray,
target_size: tuple[int,int] = (640, 640),
mean: list = [0.485, 0.456, 0.406],
std: list = [0.229, 0.224, 0.225]) -> torch.Tensor:
"""
Pipeline completa: BGR immagine -> tensor normalizzato pronto per inference.
"""
# Letterbox resize (mantiene aspect ratio)
h, w = img_bgr.shape[:2]
target_h, target_w = target_size
scale = min(target_h / h, target_w / w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
# Padding per raggiungere target_size
pad_h = target_h - new_h
pad_w = target_w - new_w
padded = cv2.copyMakeBorder(
resized,
pad_h // 2, pad_h - pad_h // 2,
pad_w // 2, pad_w - pad_w // 2,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# BGR -> RGB -> float -> normalize -> tensor
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1) # HWC -> CHW
mean_t = torch.tensor(mean).view(3, 1, 1)
std_t = torch.tensor(std).view(3, 1, 1)
tensor = (tensor - mean_t) / std_t
return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2) # batch dim + metadata
2. Acquisizione Video e Webcam
2.1 VideoCapture e Frame Buffer
import cv2
import threading
import queue
import time
from dataclasses import dataclass
@dataclass
class Frame:
"""Wrapper per un frame con metadata."""
data: np.ndarray
timestamp: float
frame_id: int
class ThreadedVideoCapture:
"""
VideoCapture con lettura in thread separato.
Previene il drop di frame dovuto al processing lento:
la GPU che fa inference non blocca la lettura della camera.
"""
def __init__(self, source, max_buffer_size: int = 5):
self.cap = cv2.VideoCapture(source)
if not self.cap.isOpened():
raise RuntimeError(f"Impossibile aprire: {source}")
# Ottimizza per latenza bassa
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.buffer = queue.Queue(maxsize=max_buffer_size)
self.stopped = False
self.frame_id = 0
# Avvia thread di lettura
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
def _read_frames(self) -> None:
"""Thread worker: legge frame continuamente."""
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
self.stopped = True
break
frame_obj = Frame(
data=frame,
timestamp=time.time(),
frame_id=self.frame_id
)
self.frame_id += 1
# Scarta frame vecchi se il buffer e pieno
if self.buffer.full():
try:
self.buffer.get_nowait()
except queue.Empty:
pass
self.buffer.put(frame_obj)
def read(self) -> Frame | None:
"""Leggi prossimo frame disponibile."""
try:
return self.buffer.get(timeout=1.0)
except queue.Empty:
return None
def get_fps(self) -> float:
return self.cap.get(cv2.CAP_PROP_FPS)
def get_resolution(self) -> tuple[int, int]:
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def release(self) -> None:
self.stopped = True
self.cap.release()
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
3. Pipeline CV Completa: OpenCV + YOLO
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Singola detection con tutti i metadata."""
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
timestamp: float = field(default_factory=time.time)
frame_id: int = 0
def to_dict(self) -> dict:
return {
'class': self.class_name,
'confidence': round(self.confidence, 3),
'bbox': list(self.bbox),
'timestamp': self.timestamp,
'frame_id': self.frame_id
}
class CVPipeline:
"""
Pipeline completa Computer Vision:
Acquisizione -> Preprocessing -> Inference -> Post-processing -> Output
"""
def __init__(
self,
model_path: str,
source, # int (webcam), str (file/RTSP)
conf_threshold: float = 0.4,
iou_threshold: float = 0.45,
target_classes: list[str] | None = None, # None = tutte le classi
output_dir: str = 'output',
save_video: bool = False,
alert_classes: list[str] | None = None # classi che triggerano alert
):
self.model = YOLO(model_path)
self.conf = conf_threshold
self.iou = iou_threshold
self.target_classes = target_classes
self.alert_classes = alert_classes or []
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.source = source
self.save_video = save_video
# Stats
self.frame_count = 0
self.total_detections = 0
self.start_time = None
self.fps_history = []
def run(self) -> None:
"""Avvia la pipeline di processing."""
logger.info(f"Avvio pipeline: source={self.source} modello={self.model.model_name}")
cap = cv2.VideoCapture(self.source)
if not cap.isOpened():
raise RuntimeError(f"Impossibile aprire source: {self.source}")
# Setup video writer se richiesto
writer = None
if self.save_video:
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = str(self.output_dir / f"output_{timestamp}.mp4")
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
logger.info(f"Salvataggio video: {out_path}")
self.start_time = time.time()
detection_log = []
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
# Inference
detections = self._run_inference(frame, self.frame_count)
# Visualizzazione
annotated = self._annotate_frame(frame, detections)
# Stats overlay
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0.0
self.fps_history.append(fps)
annotated = self._add_stats_overlay(annotated, fps, len(detections))
# Salvataggio
if writer:
writer.write(annotated)
cv2.imshow('CV Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Logging detections
for det in detections:
detection_log.append(det.to_dict())
if det.class_name in self.alert_classes:
self._trigger_alert(det)
self.frame_count += 1
self.total_detections += len(detections)
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
# Salva log
log_path = self.output_dir / 'detection_log.json'
with open(log_path, 'w') as f:
json.dump(detection_log, f, indent=2)
self._print_stats()
def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
"""Esegue YOLO inference su un singolo frame."""
results = self.model.predict(
frame, conf=self.conf, iou=self.iou, verbose=False
)
detections = []
for box in results[0].boxes:
class_name = self.model.names[int(box.cls[0])]
# Filtra per classi target se specificato
if self.target_classes and class_name not in self.target_classes:
continue
x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
detections.append(Detection(
class_name=class_name,
class_id=int(box.cls[0]),
confidence=float(box.conf[0]),
bbox=(x1, y1, x2, y2),
frame_id=frame_id
))
return detections
def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
"""Annota il frame con bounding boxes e label."""
annotated = frame.copy()
np.random.seed(42)
colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
for name in self.model.names.values()}
for det in detections:
x1, y1, x2, y2 = det.bbox
color = colors.get(det.class_name, (0, 255, 0))
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(annotated,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0] + 5, y1), color, -1)
cv2.putText(annotated, label, (x1 + 2, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return annotated
def _add_stats_overlay(self, frame: np.ndarray, fps: float,
n_detections: int) -> np.ndarray:
"""Aggiunge overlay con statistiche real-time."""
h = frame.shape[0]
stats = [
f"FPS: {fps:.1f}",
f"Detections: {n_detections}",
f"Frame: {self.frame_count}",
f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
]
for i, text in enumerate(stats):
cv2.putText(frame, text, (10, h - 100 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return frame
def _trigger_alert(self, det: Detection) -> None:
"""Gestisce un evento di alert per classi critiche."""
logger.warning(f"ALERT: {det.class_name} rilevato con confidenza {det.confidence:.2f} "
f"(frame {det.frame_id})")
# In produzione: invia notifica (email, Slack, webhook)
def _print_stats(self) -> None:
"""Stampa statistiche finali della sessione."""
elapsed = time.time() - self.start_time
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
logger.info(f"\n=== Statistiche Pipeline ===")
logger.info(f"Frame processati: {self.frame_count}")
logger.info(f"Detections totali: {self.total_detections}")
logger.info(f"Tempo totale: {elapsed:.1f}s")
logger.info(f"FPS medio: {avg_fps:.1f}")
# Utilizzo
if __name__ == '__main__':
pipeline = CVPipeline(
model_path='yolo26m.pt',
source=0, # webcam (o 'video.mp4', 'rtsp://...')
conf_threshold=0.4,
target_classes=['person', 'car', 'truck'],
alert_classes=['person'],
output_dir='output',
save_video=True
)
pipeline.run()
4. Operazioni OpenCV per Post-Processing
import cv2
import numpy as np
# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
"""
MOG2: Mixture of Gaussians per rilevamento movimento.
Utile come primo filtro prima dell'inference DL.
"""
subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=50,
detectShadows=True
)
return subtractor
def detect_motion(frame_bgr: np.ndarray, subtractor,
min_contour_area: int = 1000) -> list[tuple]:
"""Rileva regioni di movimento nel frame."""
fg_mask = subtractor.apply(frame_bgr)
# Cleanup maschera
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
motion_regions = []
for c in contours:
if cv2.contourArea(c) >= min_contour_area:
x, y, w, h = cv2.boundingRect(c)
motion_regions.append((x, y, x+w, y+h))
return motion_regions
# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
curr_gray: np.ndarray,
prev_points: np.ndarray) -> tuple:
"""
Lucas-Kanade optical flow per tracking di punti specifici.
Utile per tracking di keypoints rilevati nel frame precedente.
"""
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
good_prev = prev_points[status.flatten() == 1]
good_curr = curr_points[status.flatten() == 1]
# Calcola velocità media del movimento
if len(good_prev) > 0:
flow_vectors = good_curr - good_prev
avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
else:
avg_speed = 0.0
return good_curr, good_prev, avg_speed
# ---- CSRT Tracker (più preciso di MOSSE) ----
class ObjectTracker:
"""
Tracker multi-oggetto per mantenere l'identità degli oggetti
tra frame consecutivi senza rieseguire l'inference a ogni frame.
"""
def __init__(self, tracker_type: str = 'CSRT'):
self.tracker_type = tracker_type
self.trackers = [] # lista di (tracker, class_name, id)
self.next_id = 0
def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
"""Aggiunge un tracker per un nuovo oggetto."""
x1, y1, x2, y2 = bbox
cv2_bbox = (x1, y1, x2 - x1, y2 - y1) # formato OpenCV: x, y, w, h
tracker = cv2.TrackerCSRT_create()
tracker.init(frame, cv2_bbox)
obj_id = self.next_id
self.trackers.append((tracker, class_name, obj_id))
self.next_id += 1
return obj_id
def update(self, frame: np.ndarray) -> list[dict]:
"""Aggiorna tutti i tracker con il nuovo frame."""
active_objects = []
failed_trackers = []
for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
success, cv2_bbox = tracker.update(frame)
if success:
x, y, w, h = [int(v) for v in cv2_bbox]
active_objects.append({
'id': obj_id,
'class_name': class_name,
'bbox': (x, y, x + w, y + h)
})
else:
failed_trackers.append(i)
# Rimuovi tracker falliti (indietro per evitare shift indici)
for i in reversed(failed_trackers):
self.trackers.pop(i)
return active_objects
def clear(self) -> None:
self.trackers = []
5. Multi-Camera Pipeline e RTSP Stream
import cv2
import threading
import queue
import torch
from ultralytics import YOLO
from dataclasses import dataclass
@dataclass
class CameraFrame:
camera_id: int
frame: np.ndarray
timestamp: float
class MultiCameraPipeline:
"""
Pipeline multi-camera con:
- Thread separato per ogni camera
- Coda condivisa per batch inference
- GPU condivisa tra tutte le camere
"""
def __init__(self, sources: list, model_path: str, batch_size: int = 4):
self.sources = sources
self.model = YOLO(model_path)
self.batch_size = batch_size
self.frame_queue = queue.Queue(maxsize=batch_size * 2)
self.stopped = False
def _camera_reader(self, camera_id: int, source) -> None:
"""Thread worker per singola camera."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if isinstance(source, str) and 'rtsp' in source:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
while not self.stopped:
ret, frame = cap.read()
if not ret:
logger.warning(f"Camera {camera_id} disconnessa")
time.sleep(1.0)
cap = cv2.VideoCapture(source) # Reconnect
continue
frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
timestamp=time.time())
try:
self.frame_queue.put(frame_obj, timeout=0.1)
except queue.Full:
pass # Drop frame se la coda e piena
cap.release()
def run(self) -> None:
"""Avvia tutti i reader thread e processa batch dalla coda."""
threads = []
for i, source in enumerate(self.sources):
t = threading.Thread(
target=self._camera_reader, args=(i, source), daemon=True
)
t.start()
threads.append(t)
logger.info(f"Pipeline avviata: {len(self.sources)} camere")
batch = []
while not self.stopped:
try:
frame_obj = self.frame_queue.get(timeout=0.5)
batch.append(frame_obj)
# Processa batch quando pieno o timeout
if len(batch) >= self.batch_size:
self._process_batch(batch)
batch = []
except queue.Empty:
if batch:
self._process_batch(batch)
batch = []
def _process_batch(self, batch: list[CameraFrame]) -> None:
"""Inference batch su GPU - più efficiente di inference singola."""
frames = [f.frame for f in batch]
# Batch inference con YOLO
results = self.model.predict(
frames, conf=0.4, iou=0.45, verbose=False
)
for frame_obj, result in zip(batch, results):
n_det = len(result.boxes)
if n_det > 0:
logger.info(f"Camera {frame_obj.camera_id}: {n_det} oggetti rilevati")
def stop(self) -> None:
self.stopped = True
# Configurazione multi-camera
pipeline = MultiCameraPipeline(
sources=[
0, # Webcam locale
'rtsp://192.168.1.100/stream1', # Camera IP RTSP
'rtsp://192.168.1.101/stream1', # Camera IP RTSP 2
'videos/recording.mp4' # File video
],
model_path='yolo26m.pt',
batch_size=4
)
# pipeline.run()
6. Profiling e OpenCV CUDA Accelerated
6.1 PyTorch Profiler per Pipeline CV
Prima di ottimizzare, misura. Il PyTorch Profiler e lo strumento più potente per identificare i colli di bottiglia nella pipeline: capisce esattamente dove va il tempo (preprocessing, inference, postprocessing, data transfer CPU/GPU).
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO
def profile_cv_pipeline(model_path: str,
video_source,
n_frames: int = 100,
output_dir: str = './profiler_logs') -> None:
"""
Profila la pipeline CV con PyTorch Profiler.
Genera log compatibili con TensorBoard per analisi visuale.
Output: profiler_logs/ (aprire con: tensorboard --logdir profiler_logs)
"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_source)
frame_count = 0
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA,
],
schedule=profiler.schedule(
wait=5, # Skip first 5 iterations (warmup variability)
warmup=5, # Warmup 5 iterations (profiler overhead)
active=20, # Profile 20 iterations
repeat=2 # Repeat the cycle 2 times
),
on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
record_shapes=True, # Record tensor shapes
profile_memory=True, # Track memory allocations
with_stack=True # Include call stack
) as prof:
while frame_count < n_frames:
ret, frame = cap.read()
if not ret:
break
with torch.profiler.record_function("preprocessing"):
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1).unsqueeze(0)
if torch.cuda.is_available():
tensor = tensor.cuda()
with torch.profiler.record_function("inference"):
with torch.no_grad():
results = model.predict(frame, verbose=False)
with torch.profiler.record_function("postprocessing"):
boxes = results[0].boxes
n_det = len(boxes)
prof.step()
frame_count += 1
cap.release()
print(f"Profiling completato. Risultati in: {output_dir}")
print(f"Avvia: tensorboard --logdir {output_dir}")
# Stampa tabella top operazioni (utile senza TensorBoard)
print(prof.key_averages().table(
sort_by="cuda_time_total", row_limit=15
))
# Profiling semplice con timer manuale per produzioni senza TensorBoard
class PipelineProfiler:
"""Profiler leggero per monitoraggio continuo in produzione."""
def __init__(self, window_size: int = 100):
import collections
self.window_size = window_size
self.times: dict = {
'preprocess': collections.deque(maxlen=window_size),
'inference': collections.deque(maxlen=window_size),
'postprocess': collections.deque(maxlen=window_size),
}
def record(self, stage: str, duration_ms: float) -> None:
if stage in self.times:
self.times[stage].append(duration_ms)
def report(self) -> dict:
"""Restituisce statistiche rolling degli ultimi N frame."""
stats = {}
for stage, times in self.times.items():
if times:
arr = np.array(times)
stats[stage] = {
'mean_ms': float(np.mean(arr)),
'p50_ms': float(np.percentile(arr, 50)),
'p95_ms': float(np.percentile(arr, 95)),
'p99_ms': float(np.percentile(arr, 99)),
}
return stats
def print_report(self) -> None:
stats = self.report()
print("\n=== Pipeline Performance (last {self.window_size} frames) ===")
for stage, s in stats.items():
print(f" {stage:12s}: mean={s['mean_ms']:.1f}ms "
f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
6.2 OpenCV con Accelerazione CUDA
OpenCV supporta un modulo cuda (opencv-contrib-python compilato con CUDA) che accelera le operazioni di preprocessing fino a 10x su GPU NVIDIA. Particolarmente utile per resize, gaussian blur e color conversions in pipeline ad alta frequenza.
import cv2
import numpy as np
import torch
def check_opencv_cuda() -> dict:
"""Verifica disponibilità e configurazione di OpenCV CUDA."""
info = {
'opencv_version': cv2.__version__,
'cuda_enabled': cv2.cuda.getCudaEnabledDeviceCount() > 0,
'gpu_count': cv2.cuda.getCudaEnabledDeviceCount(),
}
if info['cuda_enabled']:
info['gpu_name'] = cv2.cuda.printShortCudaDeviceInfo(0)
return info
class CUDAPreprocessor:
"""
Preprocessor OpenCV accelerato su GPU.
Richiede: pip install opencv-contrib-python
e compilazione OpenCV con CUDA support.
Speedup tipico vs CPU: 5-10x per resize+cvtColor
"""
def __init__(self, target_size: tuple = (640, 640)):
self.target_size = target_size
self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.has_cuda:
# Pre-alloca stream CUDA per pipeline asincrona
self.stream = cv2.cuda_Stream()
# GPU matrix per evitare ri-allocazioni
self.gpu_frame = cv2.cuda_GpuMat()
self.gpu_rgb = cv2.cuda_GpuMat()
self.gpu_resized = cv2.cuda_GpuMat()
def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
"""
Preprocessing su GPU: BGR->RGB + Resize + (opzionale) Gaussian blur.
Returns: numpy array RGB normalizzato [0,1] float32
"""
if not self.has_cuda:
return self._preprocess_cpu(img_bgr)
# Upload a GPU
self.gpu_frame.upload(img_bgr, self.stream)
# BGR -> RGB su GPU
cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
self.gpu_rgb, stream=self.stream)
# Resize su GPU (INTER_LINEAR e supportato in cuda)
cv2.cuda.resize(self.gpu_rgb, self.target_size,
self.gpu_resized,
interpolation=cv2.INTER_LINEAR,
stream=self.stream)
# Download da GPU (sincrono - necessario prima dell'inference PyTorch)
result = self.gpu_resized.download(stream=self.stream)
self.stream.waitForCompletion()
return result.astype(np.float32) / 255.0
def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
"""Fallback CPU se CUDA non e disponibile."""
rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, self.target_size, interpolation=cv2.INTER_LINEAR)
return resized.astype(np.float32) / 255.0
def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
"""
Preprocessing batch su GPU.
Converte lista di frame in un batch tensor pronto per inference.
Massimizza throughput per pipeline multi-camera.
"""
preprocessed = [self.preprocess_cuda(f) for f in frames]
# Stack in batch tensor [B, C, H, W]
batch = np.stack(preprocessed) # [B, H, W, C]
batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
if torch.cuda.is_available():
batch_tensor = batch_tensor.cuda(non_blocking=True)
return batch_tensor
# Benchmark: CPU vs CUDA preprocessing
def benchmark_preprocessing(n_frames: int = 500,
img_size: tuple = (1920, 1080)) -> None:
"""Confronta velocità preprocessing CPU vs CUDA."""
import time
# Genera frame sintetici
frames = [np.random.randint(0, 255, (*img_size, 3), dtype=np.uint8)
for _ in range(10)]
preprocessor = CUDAPreprocessor(target_size=(640, 640))
# Benchmark CPU
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor._preprocess_cpu(f)
cpu_time = time.perf_counter() - t0
# Benchmark CUDA (se disponibile)
if preprocessor.has_cuda:
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor.preprocess_cuda(f)
cuda_time = time.perf_counter() - t0
print(f"Preprocessing {n_frames} frame ({img_size[1]}x{img_size[0]} -> 640x640):")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
print(f" CUDA: {cuda_time:.2f}s ({n_frames/cuda_time:.0f} FPS)")
print(f" Speedup: {cpu_time/cuda_time:.1f}x")
else:
print("CUDA non disponibile, solo benchmark CPU")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
7. Image Quality Assessment e Metriche di Frame
Prima di inviare un frame all'inference, vale la pena valutarne la qualità. Frame mossi, sovraesposti o con scarsa nitidezza producono false negative e riducono l'affidabilità del sistema. Le metriche PSNR e SSIM, tipicamente usate per la qualità della compressione, si applicano bene anche al frame filtering.
import cv2
import numpy as np
from dataclasses import dataclass
@dataclass
class FrameQuality:
"""Metriche di qualità per un singolo frame."""
blur_score: float # Varianza Laplaciano (alto = nitido)
brightness: float # Luminosita media [0, 255]
contrast: float # Deviazione standard luminosita
is_valid: bool # Frame accettabile per inference?
reject_reason: str # Motivo del rifiuto ('', 'blur', 'dark', ecc.)
class FrameQualityFilter:
"""
Filtra frame di bassa qualità prima dell'inference.
Riduce falsi negativi in sistemi di sorveglianza.
"""
def __init__(self,
blur_threshold: float = 100.0, # Laplacian variance
brightness_min: float = 30.0,
brightness_max: float = 220.0,
contrast_min: float = 15.0):
self.blur_threshold = blur_threshold
self.brightness_min = brightness_min
self.brightness_max = brightness_max
self.contrast_min = contrast_min
def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
"""Valuta qualità del frame."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# Blur detection: varianza del Laplaciano
# Un frame nitido ha bordi marcati -> alta varianza
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
# Luminosita e contrasto
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
# Valutazione
reject_reason = ''
if laplacian_var < self.blur_threshold:
reject_reason = f'blur (var={laplacian_var:.1f})'
elif brightness < self.brightness_min:
reject_reason = f'too_dark (mean={brightness:.1f})'
elif brightness > self.brightness_max:
reject_reason = f'overexposed (mean={brightness:.1f})'
elif contrast < self.contrast_min:
reject_reason = f'low_contrast (std={contrast:.1f})'
return FrameQuality(
blur_score=laplacian_var,
brightness=brightness,
contrast=contrast,
is_valid=(reject_reason == ''),
reject_reason=reject_reason
)
def filter_batch(self, frames: list[np.ndarray]) -> tuple[list, list]:
"""
Filtra una batch di frame.
Returns: (valid_frames, rejected_frames_with_reason)
"""
valid = []
rejected = []
for frame in frames:
quality = self.assess(frame)
if quality.is_valid:
valid.append(frame)
else:
rejected.append((frame, quality.reject_reason))
return valid, rejected
def compute_psnr(original: np.ndarray, compressed: np.ndarray) -> float:
"""
Peak Signal-to-Noise Ratio tra immagine originale e compressa.
PSNR > 40 dB = ottima qualità; < 20 dB = bassa qualità percettibile.
"""
mse = np.mean((original.astype(float) - compressed.astype(float)) ** 2)
if mse == 0:
return float('inf')
max_pixel = 255.0
return 20 * np.log10(max_pixel / np.sqrt(mse))
def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
"""
Structural Similarity Index (SSIM) tra due immagini grayscale.
Range: [-1, 1], dove 1 = identiche, 0 = non correlate.
Più fedele alla percezione umana rispetto a MSE/PSNR.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
C1 = (0.01 * 255) ** 2 # costante stabilità
C2 = (0.03 * 255) ** 2
mu1, mu2 = gray1.mean(), gray2.mean()
sigma1 = gray1.std()
sigma2 = gray2.std()
sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
return float(numerator / denominator)
# Utilizzo in pipeline con motion gating intelligente
class SmartMotionGate:
"""
Combina background subtraction + SSIM per gating intelligente.
Evita sia di inferire su frame identici (statica) sia su frame corrotti.
"""
def __init__(self, ssim_change_threshold: float = 0.95):
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=200, varThreshold=40, detectShadows=False
)
self.quality_filter = FrameQualityFilter()
self.ssim_threshold = ssim_change_threshold
self.prev_frame = None
def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
"""
Decide se inferire su questo frame.
Returns: (should_infer, reason)
"""
# 1. Quality check
quality = self.quality_filter.assess(frame)
if not quality.is_valid:
return False, f"low_quality: {quality.reject_reason}"
# 2. Motion detection (veloce)
fg_mask = self.subtractor.apply(frame)
motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
if motion_ratio < 0.005: # meno dello 0.5% dei pixel in movimento
return False, "no_motion"
# 3. SSIM check (rileva cambiamenti strutturali, non solo rumore)
if self.prev_frame is not None:
ssim = compute_ssim(frame, self.prev_frame)
if ssim > self.ssim_threshold:
return False, f"scene_static (ssim={ssim:.3f})"
self.prev_frame = frame.copy()
return True, "ok"
8. Best Practices e Performance
Performance Tipiche: YOLO26m in Pipeline OpenCV con CUDA Preprocessing
| Hardware | FPS (640x640) | Latenza | Ottimizzazione |
|---|---|---|---|
| NVIDIA A100 | ~220 FPS | ~4.5ms | TensorRT FP16 |
| NVIDIA RTX 4090 | ~180 FPS | ~5.6ms | TensorRT FP16 |
| NVIDIA RTX 3080 | ~120 FPS | ~8.3ms | ONNX Runtime |
| CPU Intel i9 (ONNX) | ~18 FPS | ~55ms | OpenVINO |
| Raspberry Pi 5 | ~3 FPS | ~330ms | YOLOv8n INT8 |
Ottimizzazioni Performance
- Leggi in thread separato: Non bloccare il loop di inference con I/O della camera. Usa ThreadedVideoCapture per massimizzare l'utilizzo GPU.
- BGR->RGB una volta sola: Non convertire ad ogni chiamata. Converti alla lettura e mantieni RGB per tutto il pipeline.
- torch.no_grad() sempre in inference: Disabilita il grafo computazionale di autograd durante l'inference: -30% memoria, +10% velocità.
- Batch inference: Se hai più stream video, accumula frame e inferisci in batch. Il throughput GPU scala quasi linearmente con il batch size.
- TensorRT per produzione: Su hardware NVIDIA, esporta sempre in TensorRT per la massima velocità. YOLOv8m: da 50 a 180 FPS con TensorRT FP16.
- Background subtraction come gating: In scene con poco movimento, esegui l'inference DL solo sui frame dove MOG2 rileva movimento. Risparmia il 60-70% di compute.
- Frame quality filter: Scarta frame mossi (blur score < 100) e sottoesposti prima dell'inference. Riduce i falsi negativi e migliora la precision del sistema.
- SSIM per scene statiche: Se due frame consecutivi hanno SSIM > 0.95, la scena non e cambiata. Non rieseguire l'inference, riusa i risultati precedenti.
- OpenCV CUDA per preprocessing: Se hai una GPU NVIDIA e opencv-contrib compilato con CUDA, il preprocessing (resize, cvtColor) e 5-10x più veloce su GPU.
Errori Comuni da Evitare
- Non effettuare warmup del modello: La prima inference e sempre più lenta (JIT compilation, CUDA initialization). Esegui 3-5 inference dummy prima di misurare le performance.
- Dimenticare model.eval(): In modalità train, i layer Dropout e BatchNorm si comportano diversamente. Sempre chiamare
model.eval()prima dell'inference. - frame.copy() assente nell'annotazione: Non annotare direttamente il frame originale se lo stai anche usando per l'inference. Usa sempre
frame.copy(). - Non rilasciare VideoCapture: Ogni
cv2.VideoCaptureaperto deve essere rilasciato concap.release(). Usa context manager o try/finally. - RTSP senza reconnect logic: Le connessioni RTSP si interrompono. Ogni camera reader deve avere retry logic con backoff esponenziale.
Conclusioni
Abbiamo costruito una pipeline CV completa che integra OpenCV e PyTorch/YOLO, coprendo ogni layer di un sistema production-ready:
- OpenCV per acquisizione, preprocessing, postprocessing e visualizzazione
- Threading per acquisizione video non bloccante che massimizza l'utilizzo GPU
- Pipeline production-ready con logging strutturato, alerting e salvataggio video
- Multi-camera con batch inference e auto-reconnect RTSP per scenari surveillance
- Background subtraction (MOG2) e Smart Motion Gate per ridurre il compute fino al 70%
- PyTorch Profiler per identificare esattamente i colli di bottiglia
- OpenCV CUDA per preprocessing accelerato GPU (5-10x vs CPU)
- Frame Quality Assessment per filtrare frame corrotti prima dell'inference
- Benchmark performance su hardware reale: da A100 (220 FPS) a Raspberry Pi (3 FPS)
Navigazione Serie
- Precedente: Data Augmentation per Computer Vision
- Successivo: Computer Vision su Edge: Ottimizzazione per Dispositivi Mobili







