OpenCV a PyTorch: Kompletní potrubí počítačového vidění
OpenCV e PyTorch jsou to dva pilíře počítačového ekosystému moderní vize. OpenCV vyniká v získávání obrazu, předzpracování a následném zpracování: tradiční operace, správa webové kamery a videa, morfologické transformace, klasické filtry. PyTorch přináší hluboké učení: neuronové sítě, GPU computing, end-to-end školení. Společně tvoří kompletní CV potrubí od získávání nezpracovaných pixelů po inteligentní predikci.
V tomto článku sestavíme kompletní potrubí počítačového vidění: ze záznamu videa s OpenCV, k předběžnému zpracování, k odvození s modelem PyTorch/YOLO, k vizualizaci a protokolování výsledků. Systém připravený k produkci, který můžete nasadit přímo.
Co se naučíte
- Základní OpenCV: čtení obrázků/videí, barevné prostory, morfologické operace
- Integrace OpenCV-PyTorch: konverze tenzoru, optimalizované potrubí
- Akvizice videa v reálném čase se správou vyrovnávací paměti
- Potrubí předběžného zpracování: změna velikosti, normalizace, příprava dávky
- Optimalizovaná inference: dávkové zpracování, asynchronní inference
- Post-processing: NMS, transformace souřadnic, anotace snímků
- Vícekamerové potrubí a zpracování toku RTSP
- Systém protokolování a upozorňování na zjištěné události
- Optimalizace výkonu: vlákna, GPU streamy, profilování
1. OpenCV Fundamentals for CV Pipeline
1.1 Barevné prostory a převody
Často přehlížený kritický bod: OpenCV používá formát BGR standardně, při používání PyTorch (a PIL). RGB. Záměna těchto dvou vede ke katastrofálním výsledkům: model předem natrénovaný na RGB obrázcích bude přijímat inverzní kanály. Vždy převádějte!
import cv2
import numpy as np
import torch
# ---- Lettura e conversioni ----
def load_image_rgb(path: str) -> np.ndarray:
"""Carica immagine in formato RGB (corretto per PyTorch)."""
img_bgr = cv2.imread(path)
if img_bgr is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
"""
Converte immagine OpenCV BGR in tensor PyTorch normalizzato.
BGR [H, W, C] uint8 -> RGB [C, H, W] float32 normalizzato [0,1]
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
return tensor
def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
"""
Converte tensor PyTorch in immagine OpenCV BGR.
RGB [C, H, W] float32 -> BGR [H, W, C] uint8
"""
img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
# ---- Spazi colore utili ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
"""Analizza l'immagine in diversi spazi colore."""
return {
'bgr': img_bgr,
'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
}
# ---- Operazioni morfologiche ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
kernel_size: int = 5) -> np.ndarray:
"""
Operazioni morfologiche per pre/post-processing.
opening = erosione + dilatazione (rimuove rumore piccolo)
closing = dilatazione + erosione (chiude piccoli buchi)
"""
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
ops = {
'erode': cv2.erode(gray, kernel),
'dilate': cv2.dilate(gray, kernel),
'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
}
return ops.get(operation, gray)
# ---- Rilevamento contorni e feature classiche ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
"""Pipeline classica: blur -> Canny -> contorni."""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
contours, hierarchy = cv2.findContours(
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Filtra contorni per area minima
significant_contours = [c for c in contours if cv2.contourArea(c) > 500]
return edges, significant_contours
# ---- Preprocessing per modelli DL ----
def preprocess_for_model(img_bgr: np.ndarray,
target_size: tuple[int,int] = (640, 640),
mean: list = [0.485, 0.456, 0.406],
std: list = [0.229, 0.224, 0.225]) -> torch.Tensor:
"""
Pipeline completa: BGR immagine -> tensor normalizzato pronto per inference.
"""
# Letterbox resize (mantiene aspect ratio)
h, w = img_bgr.shape[:2]
target_h, target_w = target_size
scale = min(target_h / h, target_w / w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
# Padding per raggiungere target_size
pad_h = target_h - new_h
pad_w = target_w - new_w
padded = cv2.copyMakeBorder(
resized,
pad_h // 2, pad_h - pad_h // 2,
pad_w // 2, pad_w - pad_w // 2,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# BGR -> RGB -> float -> normalize -> tensor
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1) # HWC -> CHW
mean_t = torch.tensor(mean).view(3, 1, 1)
std_t = torch.tensor(std).view(3, 1, 1)
tensor = (tensor - mean_t) / std_t
return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2) # batch dim + metadata
2. Pořízení videa a webové kamery
2.1 VideoCapture a Frame Buffer
import cv2
import threading
import queue
import time
from dataclasses import dataclass
@dataclass
class Frame:
"""Wrapper per un frame con metadata."""
data: np.ndarray
timestamp: float
frame_id: int
class ThreadedVideoCapture:
"""
VideoCapture con lettura in thread separato.
Previene il drop di frame dovuto al processing lento:
la GPU che fa inference non blocca la lettura della camera.
"""
def __init__(self, source, max_buffer_size: int = 5):
self.cap = cv2.VideoCapture(source)
if not self.cap.isOpened():
raise RuntimeError(f"Impossibile aprire: {source}")
# Ottimizza per latenza bassa
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.buffer = queue.Queue(maxsize=max_buffer_size)
self.stopped = False
self.frame_id = 0
# Avvia thread di lettura
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
def _read_frames(self) -> None:
"""Thread worker: legge frame continuamente."""
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
self.stopped = True
break
frame_obj = Frame(
data=frame,
timestamp=time.time(),
frame_id=self.frame_id
)
self.frame_id += 1
# Scarta frame vecchi se il buffer e pieno
if self.buffer.full():
try:
self.buffer.get_nowait()
except queue.Empty:
pass
self.buffer.put(frame_obj)
def read(self) -> Frame | None:
"""Leggi prossimo frame disponibile."""
try:
return self.buffer.get(timeout=1.0)
except queue.Empty:
return None
def get_fps(self) -> float:
return self.cap.get(cv2.CAP_PROP_FPS)
def get_resolution(self) -> tuple[int, int]:
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def release(self) -> None:
self.stopped = True
self.cap.release()
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
3. Kompletní CV Pipeline: OpenCV + YOLO
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Singola detection con tutti i metadata."""
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
timestamp: float = field(default_factory=time.time)
frame_id: int = 0
def to_dict(self) -> dict:
return {
'class': self.class_name,
'confidence': round(self.confidence, 3),
'bbox': list(self.bbox),
'timestamp': self.timestamp,
'frame_id': self.frame_id
}
class CVPipeline:
"""
Pipeline completa Computer Vision:
Acquisizione -> Preprocessing -> Inference -> Post-processing -> Output
"""
def __init__(
self,
model_path: str,
source, # int (webcam), str (file/RTSP)
conf_threshold: float = 0.4,
iou_threshold: float = 0.45,
target_classes: list[str] | None = None, # None = tutte le classi
output_dir: str = 'output',
save_video: bool = False,
alert_classes: list[str] | None = None # classi che triggerano alert
):
self.model = YOLO(model_path)
self.conf = conf_threshold
self.iou = iou_threshold
self.target_classes = target_classes
self.alert_classes = alert_classes or []
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.source = source
self.save_video = save_video
# Stats
self.frame_count = 0
self.total_detections = 0
self.start_time = None
self.fps_history = []
def run(self) -> None:
"""Avvia la pipeline di processing."""
logger.info(f"Avvio pipeline: source={self.source} modello={self.model.model_name}")
cap = cv2.VideoCapture(self.source)
if not cap.isOpened():
raise RuntimeError(f"Impossibile aprire source: {self.source}")
# Setup video writer se richiesto
writer = None
if self.save_video:
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = str(self.output_dir / f"output_{timestamp}.mp4")
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
logger.info(f"Salvataggio video: {out_path}")
self.start_time = time.time()
detection_log = []
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
# Inference
detections = self._run_inference(frame, self.frame_count)
# Visualizzazione
annotated = self._annotate_frame(frame, detections)
# Stats overlay
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0.0
self.fps_history.append(fps)
annotated = self._add_stats_overlay(annotated, fps, len(detections))
# Salvataggio
if writer:
writer.write(annotated)
cv2.imshow('CV Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Logging detections
for det in detections:
detection_log.append(det.to_dict())
if det.class_name in self.alert_classes:
self._trigger_alert(det)
self.frame_count += 1
self.total_detections += len(detections)
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
# Salva log
log_path = self.output_dir / 'detection_log.json'
with open(log_path, 'w') as f:
json.dump(detection_log, f, indent=2)
self._print_stats()
def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
"""Esegue YOLO inference su un singolo frame."""
results = self.model.predict(
frame, conf=self.conf, iou=self.iou, verbose=False
)
detections = []
for box in results[0].boxes:
class_name = self.model.names[int(box.cls[0])]
# Filtra per classi target se specificato
if self.target_classes and class_name not in self.target_classes:
continue
x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
detections.append(Detection(
class_name=class_name,
class_id=int(box.cls[0]),
confidence=float(box.conf[0]),
bbox=(x1, y1, x2, y2),
frame_id=frame_id
))
return detections
def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
"""Annota il frame con bounding boxes e label."""
annotated = frame.copy()
np.random.seed(42)
colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
for name in self.model.names.values()}
for det in detections:
x1, y1, x2, y2 = det.bbox
color = colors.get(det.class_name, (0, 255, 0))
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(annotated,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0] + 5, y1), color, -1)
cv2.putText(annotated, label, (x1 + 2, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return annotated
def _add_stats_overlay(self, frame: np.ndarray, fps: float,
n_detections: int) -> np.ndarray:
"""Aggiunge overlay con statistiche real-time."""
h = frame.shape[0]
stats = [
f"FPS: {fps:.1f}",
f"Detections: {n_detections}",
f"Frame: {self.frame_count}",
f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
]
for i, text in enumerate(stats):
cv2.putText(frame, text, (10, h - 100 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return frame
def _trigger_alert(self, det: Detection) -> None:
"""Gestisce un evento di alert per classi critiche."""
logger.warning(f"ALERT: {det.class_name} rilevato con confidenza {det.confidence:.2f} "
f"(frame {det.frame_id})")
# In produzione: invia notifica (email, Slack, webhook)
def _print_stats(self) -> None:
"""Stampa statistiche finali della sessione."""
elapsed = time.time() - self.start_time
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
logger.info(f"\n=== Statistiche Pipeline ===")
logger.info(f"Frame processati: {self.frame_count}")
logger.info(f"Detections totali: {self.total_detections}")
logger.info(f"Tempo totale: {elapsed:.1f}s")
logger.info(f"FPS medio: {avg_fps:.1f}")
# Utilizzo
if __name__ == '__main__':
pipeline = CVPipeline(
model_path='yolo26m.pt',
source=0, # webcam (o 'video.mp4', 'rtsp://...')
conf_threshold=0.4,
target_classes=['person', 'car', 'truck'],
alert_classes=['person'],
output_dir='output',
save_video=True
)
pipeline.run()
4. Operace OpenCV pro následné zpracování
import cv2
import numpy as np
# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
"""
MOG2: Mixture of Gaussians per rilevamento movimento.
Utile come primo filtro prima dell'inference DL.
"""
subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=50,
detectShadows=True
)
return subtractor
def detect_motion(frame_bgr: np.ndarray, subtractor,
min_contour_area: int = 1000) -> list[tuple]:
"""Rileva regioni di movimento nel frame."""
fg_mask = subtractor.apply(frame_bgr)
# Cleanup maschera
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
motion_regions = []
for c in contours:
if cv2.contourArea(c) >= min_contour_area:
x, y, w, h = cv2.boundingRect(c)
motion_regions.append((x, y, x+w, y+h))
return motion_regions
# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
curr_gray: np.ndarray,
prev_points: np.ndarray) -> tuple:
"""
Lucas-Kanade optical flow per tracking di punti specifici.
Utile per tracking di keypoints rilevati nel frame precedente.
"""
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
good_prev = prev_points[status.flatten() == 1]
good_curr = curr_points[status.flatten() == 1]
# Calcola velocità media del movimento
if len(good_prev) > 0:
flow_vectors = good_curr - good_prev
avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
else:
avg_speed = 0.0
return good_curr, good_prev, avg_speed
# ---- CSRT Tracker (più preciso di MOSSE) ----
class ObjectTracker:
"""
Tracker multi-oggetto per mantenere l'identità degli oggetti
tra frame consecutivi senza rieseguire l'inference a ogni frame.
"""
def __init__(self, tracker_type: str = 'CSRT'):
self.tracker_type = tracker_type
self.trackers = [] # lista di (tracker, class_name, id)
self.next_id = 0
def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
"""Aggiunge un tracker per un nuovo oggetto."""
x1, y1, x2, y2 = bbox
cv2_bbox = (x1, y1, x2 - x1, y2 - y1) # formato OpenCV: x, y, w, h
tracker = cv2.TrackerCSRT_create()
tracker.init(frame, cv2_bbox)
obj_id = self.next_id
self.trackers.append((tracker, class_name, obj_id))
self.next_id += 1
return obj_id
def update(self, frame: np.ndarray) -> list[dict]:
"""Aggiorna tutti i tracker con il nuovo frame."""
active_objects = []
failed_trackers = []
for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
success, cv2_bbox = tracker.update(frame)
if success:
x, y, w, h = [int(v) for v in cv2_bbox]
active_objects.append({
'id': obj_id,
'class_name': class_name,
'bbox': (x, y, x + w, y + h)
})
else:
failed_trackers.append(i)
# Rimuovi tracker falliti (indietro per evitare shift indici)
for i in reversed(failed_trackers):
self.trackers.pop(i)
return active_objects
def clear(self) -> None:
self.trackers = []
5. Multi-Camera Pipeline a RTSP Stream
import cv2
import threading
import queue
import torch
from ultralytics import YOLO
from dataclasses import dataclass
@dataclass
class CameraFrame:
camera_id: int
frame: np.ndarray
timestamp: float
class MultiCameraPipeline:
"""
Pipeline multi-camera con:
- Thread separato per ogni camera
- Coda condivisa per batch inference
- GPU condivisa tra tutte le camere
"""
def __init__(self, sources: list, model_path: str, batch_size: int = 4):
self.sources = sources
self.model = YOLO(model_path)
self.batch_size = batch_size
self.frame_queue = queue.Queue(maxsize=batch_size * 2)
self.stopped = False
def _camera_reader(self, camera_id: int, source) -> None:
"""Thread worker per singola camera."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if isinstance(source, str) and 'rtsp' in source:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
while not self.stopped:
ret, frame = cap.read()
if not ret:
logger.warning(f"Camera {camera_id} disconnessa")
time.sleep(1.0)
cap = cv2.VideoCapture(source) # Reconnect
continue
frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
timestamp=time.time())
try:
self.frame_queue.put(frame_obj, timeout=0.1)
except queue.Full:
pass # Drop frame se la coda e piena
cap.release()
def run(self) -> None:
"""Avvia tutti i reader thread e processa batch dalla coda."""
threads = []
for i, source in enumerate(self.sources):
t = threading.Thread(
target=self._camera_reader, args=(i, source), daemon=True
)
t.start()
threads.append(t)
logger.info(f"Pipeline avviata: {len(self.sources)} camere")
batch = []
while not self.stopped:
try:
frame_obj = self.frame_queue.get(timeout=0.5)
batch.append(frame_obj)
# Processa batch quando pieno o timeout
if len(batch) >= self.batch_size:
self._process_batch(batch)
batch = []
except queue.Empty:
if batch:
self._process_batch(batch)
batch = []
def _process_batch(self, batch: list[CameraFrame]) -> None:
"""Inference batch su GPU - più efficiente di inference singola."""
frames = [f.frame for f in batch]
# Batch inference con YOLO
results = self.model.predict(
frames, conf=0.4, iou=0.45, verbose=False
)
for frame_obj, result in zip(batch, results):
n_det = len(result.boxes)
if n_det > 0:
logger.info(f"Camera {frame_obj.camera_id}: {n_det} oggetti rilevati")
def stop(self) -> None:
self.stopped = True
# Configurazione multi-camera
pipeline = MultiCameraPipeline(
sources=[
0, # Webcam locale
'rtsp://192.168.1.100/stream1', # Camera IP RTSP
'rtsp://192.168.1.101/stream1', # Camera IP RTSP 2
'videos/recording.mp4' # File video
],
model_path='yolo26m.pt',
batch_size=4
)
# pipeline.run()
6. Profilování a OpenCV CUDA Accelerated
6.1 PyTorch Profiler pro Pipeline CV
Než začnete optimalizovat, změřte. The PyTorch Profiler a nástroj výkonnější k identifikaci úzkých míst v potrubí: přesně rozumí kam plyne čas (preprocessing, inference, postprocessing, přenos dat CPU/GPU).
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO
def profile_cv_pipeline(model_path: str,
video_source,
n_frames: int = 100,
output_dir: str = './profiler_logs') -> None:
"""
Profila la pipeline CV con PyTorch Profiler.
Genera log compatibili con TensorBoard per analisi visuale.
Output: profiler_logs/ (aprire con: tensorboard --logdir profiler_logs)
"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_source)
frame_count = 0
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA,
],
schedule=profiler.schedule(
wait=5, # Skip first 5 iterations (warmup variability)
warmup=5, # Warmup 5 iterations (profiler overhead)
active=20, # Profile 20 iterations
repeat=2 # Repeat the cycle 2 times
),
on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
record_shapes=True, # Record tensor shapes
profile_memory=True, # Track memory allocations
with_stack=True # Include call stack
) as prof:
while frame_count < n_frames:
ret, frame = cap.read()
if not ret:
break
with torch.profiler.record_function("preprocessing"):
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1).unsqueeze(0)
if torch.cuda.is_available():
tensor = tensor.cuda()
with torch.profiler.record_function("inference"):
with torch.no_grad():
results = model.predict(frame, verbose=False)
with torch.profiler.record_function("postprocessing"):
boxes = results[0].boxes
n_det = len(boxes)
prof.step()
frame_count += 1
cap.release()
print(f"Profiling completato. Risultati in: {output_dir}")
print(f"Avvia: tensorboard --logdir {output_dir}")
# Stampa tabella top operazioni (utile senza TensorBoard)
print(prof.key_averages().table(
sort_by="cuda_time_total", row_limit=15
))
# Profiling semplice con timer manuale per produzioni senza TensorBoard
class PipelineProfiler:
"""Profiler leggero per monitoraggio continuo in produzione."""
def __init__(self, window_size: int = 100):
import collections
self.window_size = window_size
self.times: dict = {
'preprocess': collections.deque(maxlen=window_size),
'inference': collections.deque(maxlen=window_size),
'postprocess': collections.deque(maxlen=window_size),
}
def record(self, stage: str, duration_ms: float) -> None:
if stage in self.times:
self.times[stage].append(duration_ms)
def report(self) -> dict:
"""Restituisce statistiche rolling degli ultimi N frame."""
stats = {}
for stage, times in self.times.items():
if times:
arr = np.array(times)
stats[stage] = {
'mean_ms': float(np.mean(arr)),
'p50_ms': float(np.percentile(arr, 50)),
'p95_ms': float(np.percentile(arr, 95)),
'p99_ms': float(np.percentile(arr, 99)),
}
return stats
def print_report(self) -> None:
stats = self.report()
print("\n=== Pipeline Performance (last {self.window_size} frames) ===")
for stage, s in stats.items():
print(f" {stage:12s}: mean={s['mean_ms']:.1f}ms "
f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
6.2 OpenCV s akcelerací CUDA
OpenCV podporuje jeden modul cuda (opencv-contrib-python zkompilovaný pomocí CUDA) který urychluje operace předběžného zpracování až 10x na GPU NVIDIA. Zvláště užitečné pro změnu velikosti, gaussovské rozostření a převod barev ve vysokofrekvenčních potrubích.
import cv2
import numpy as np
import torch
def check_opencv_cuda() -> dict:
"""Verifica disponibilità e configurazione di OpenCV CUDA."""
info = {
'opencv_version': cv2.__version__,
'cuda_enabled': cv2.cuda.getCudaEnabledDeviceCount() > 0,
'gpu_count': cv2.cuda.getCudaEnabledDeviceCount(),
}
if info['cuda_enabled']:
info['gpu_name'] = cv2.cuda.printShortCudaDeviceInfo(0)
return info
class CUDAPreprocessor:
"""
Preprocessor OpenCV accelerato su GPU.
Richiede: pip install opencv-contrib-python
e compilazione OpenCV con CUDA support.
Speedup tipico vs CPU: 5-10x per resize+cvtColor
"""
def __init__(self, target_size: tuple = (640, 640)):
self.target_size = target_size
self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.has_cuda:
# Pre-alloca stream CUDA per pipeline asincrona
self.stream = cv2.cuda_Stream()
# GPU matrix per evitare ri-allocazioni
self.gpu_frame = cv2.cuda_GpuMat()
self.gpu_rgb = cv2.cuda_GpuMat()
self.gpu_resized = cv2.cuda_GpuMat()
def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
"""
Preprocessing su GPU: BGR->RGB + Resize + (opzionale) Gaussian blur.
Returns: numpy array RGB normalizzato [0,1] float32
"""
if not self.has_cuda:
return self._preprocess_cpu(img_bgr)
# Upload a GPU
self.gpu_frame.upload(img_bgr, self.stream)
# BGR -> RGB su GPU
cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
self.gpu_rgb, stream=self.stream)
# Resize su GPU (INTER_LINEAR e supportato in cuda)
cv2.cuda.resize(self.gpu_rgb, self.target_size,
self.gpu_resized,
interpolation=cv2.INTER_LINEAR,
stream=self.stream)
# Download da GPU (sincrono - necessario prima dell'inference PyTorch)
result = self.gpu_resized.download(stream=self.stream)
self.stream.waitForCompletion()
return result.astype(np.float32) / 255.0
def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
"""Fallback CPU se CUDA non e disponibile."""
rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, self.target_size, interpolation=cv2.INTER_LINEAR)
return resized.astype(np.float32) / 255.0
def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
"""
Preprocessing batch su GPU.
Converte lista di frame in un batch tensor pronto per inference.
Massimizza throughput per pipeline multi-camera.
"""
preprocessed = [self.preprocess_cuda(f) for f in frames]
# Stack in batch tensor [B, C, H, W]
batch = np.stack(preprocessed) # [B, H, W, C]
batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
if torch.cuda.is_available():
batch_tensor = batch_tensor.cuda(non_blocking=True)
return batch_tensor
# Benchmark: CPU vs CUDA preprocessing
def benchmark_preprocessing(n_frames: int = 500,
img_size: tuple = (1920, 1080)) -> None:
"""Confronta velocità preprocessing CPU vs CUDA."""
import time
# Genera frame sintetici
frames = [np.random.randint(0, 255, (*img_size, 3), dtype=np.uint8)
for _ in range(10)]
preprocessor = CUDAPreprocessor(target_size=(640, 640))
# Benchmark CPU
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor._preprocess_cpu(f)
cpu_time = time.perf_counter() - t0
# Benchmark CUDA (se disponibile)
if preprocessor.has_cuda:
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor.preprocess_cuda(f)
cuda_time = time.perf_counter() - t0
print(f"Preprocessing {n_frames} frame ({img_size[1]}x{img_size[0]} -> 640x640):")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
print(f" CUDA: {cuda_time:.2f}s ({n_frames/cuda_time:.0f} FPS)")
print(f" Speedup: {cpu_time/cuda_time:.1f}x")
else:
print("CUDA non disponibile, solo benchmark CPU")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
7. Hodnocení kvality obrazu a metriky snímků
Před odesláním rámce k inferenci stojí za to vyhodnotit jeho kvalitu. Roztřesené, přeexponované nebo špatně zaostřené snímky vytvářejí falešné negativy a snižují se spolehlivost systému. Metriky PSNR a SSIM, obvykle používané pro kvalitu komprese, lze je také dobře aplikovat na filtrování snímků.
import cv2
import numpy as np
from dataclasses import dataclass
@dataclass
class FrameQuality:
"""Metriche di qualità per un singolo frame."""
blur_score: float # Varianza Laplaciano (alto = nitido)
brightness: float # Luminosita media [0, 255]
contrast: float # Deviazione standard luminosita
is_valid: bool # Frame accettabile per inference?
reject_reason: str # Motivo del rifiuto ('', 'blur', 'dark', ecc.)
class FrameQualityFilter:
"""
Filtra frame di bassa qualità prima dell'inference.
Riduce falsi negativi in sistemi di sorveglianza.
"""
def __init__(self,
blur_threshold: float = 100.0, # Laplacian variance
brightness_min: float = 30.0,
brightness_max: float = 220.0,
contrast_min: float = 15.0):
self.blur_threshold = blur_threshold
self.brightness_min = brightness_min
self.brightness_max = brightness_max
self.contrast_min = contrast_min
def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
"""Valuta qualità del frame."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# Blur detection: varianza del Laplaciano
# Un frame nitido ha bordi marcati -> alta varianza
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
# Luminosita e contrasto
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
# Valutazione
reject_reason = ''
if laplacian_var < self.blur_threshold:
reject_reason = f'blur (var={laplacian_var:.1f})'
elif brightness < self.brightness_min:
reject_reason = f'too_dark (mean={brightness:.1f})'
elif brightness > self.brightness_max:
reject_reason = f'overexposed (mean={brightness:.1f})'
elif contrast < self.contrast_min:
reject_reason = f'low_contrast (std={contrast:.1f})'
return FrameQuality(
blur_score=laplacian_var,
brightness=brightness,
contrast=contrast,
is_valid=(reject_reason == ''),
reject_reason=reject_reason
)
def filter_batch(self, frames: list[np.ndarray]) -> tuple[list, list]:
"""
Filtra una batch di frame.
Returns: (valid_frames, rejected_frames_with_reason)
"""
valid = []
rejected = []
for frame in frames:
quality = self.assess(frame)
if quality.is_valid:
valid.append(frame)
else:
rejected.append((frame, quality.reject_reason))
return valid, rejected
def compute_psnr(original: np.ndarray, compressed: np.ndarray) -> float:
"""
Peak Signal-to-Noise Ratio tra immagine originale e compressa.
PSNR > 40 dB = ottima qualità; < 20 dB = bassa qualità percettibile.
"""
mse = np.mean((original.astype(float) - compressed.astype(float)) ** 2)
if mse == 0:
return float('inf')
max_pixel = 255.0
return 20 * np.log10(max_pixel / np.sqrt(mse))
def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
"""
Structural Similarity Index (SSIM) tra due immagini grayscale.
Range: [-1, 1], dove 1 = identiche, 0 = non correlate.
Più fedele alla percezione umana rispetto a MSE/PSNR.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
C1 = (0.01 * 255) ** 2 # costante stabilità
C2 = (0.03 * 255) ** 2
mu1, mu2 = gray1.mean(), gray2.mean()
sigma1 = gray1.std()
sigma2 = gray2.std()
sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
return float(numerator / denominator)
# Utilizzo in pipeline con motion gating intelligente
class SmartMotionGate:
"""
Combina background subtraction + SSIM per gating intelligente.
Evita sia di inferire su frame identici (statica) sia su frame corrotti.
"""
def __init__(self, ssim_change_threshold: float = 0.95):
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=200, varThreshold=40, detectShadows=False
)
self.quality_filter = FrameQualityFilter()
self.ssim_threshold = ssim_change_threshold
self.prev_frame = None
def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
"""
Decide se inferire su questo frame.
Returns: (should_infer, reason)
"""
# 1. Quality check
quality = self.quality_filter.assess(frame)
if not quality.is_valid:
return False, f"low_quality: {quality.reject_reason}"
# 2. Motion detection (veloce)
fg_mask = self.subtractor.apply(frame)
motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
if motion_ratio < 0.005: # meno dello 0.5% dei pixel in movimento
return False, "no_motion"
# 3. SSIM check (rileva cambiamenti strutturali, non solo rumore)
if self.prev_frame is not None:
ssim = compute_ssim(frame, self.prev_frame)
if ssim > self.ssim_threshold:
return False, f"scene_static (ssim={ssim:.3f})"
self.prev_frame = frame.copy()
return True, "ok"
8. Nejlepší postupy a výkon
Typický výkon: YOLO26m v OpenCV Pipeline s předzpracováním CUDA
| Železářské zboží | FPS (640 x 640) | Latence | Optimalizace |
|---|---|---|---|
| NVIDIA A100 | ~220 FPS | ~4,5 ms | TensorRT FP16 |
| NVIDIA RTX 4090 | ~180 FPS | ~5,6 ms | TensorRT FP16 |
| NVIDIA RTX 3080 | ~120 FPS | ~8,3 ms | ONNX Runtime |
| Procesor Intel i9 (ONNX) | ~18 FPS | ~55 ms | OpenVINO |
| Raspberry Pi 5 | ~3 FPS | ~330 ms | YOLOv8n INT8 |
Optimalizace výkonu
- Přečtěte si v samostatném vláknu: Neblokujte inferenční smyčku pomocí I/O kamery. Použijte ThreadedVideoCapture k maximalizaci využití GPU.
- BGR->RGB pouze jednou: Nekonvertujte při každém hovoru. Převést na čtení a údržbu RGB v celém kanálu.
- torch.no_grad() vždy v závěru: Zakázat automatický výpočet grafu během inference: -30 % paměti, +10 % rychlost.
- Dávkový závěr: Pokud máte více video streamů, shromažďujte snímky a usuzujte v dávkách. Propustnost GPU se mění téměř lineárně s velikostí dávky.
- TensorRT pro výrobu: Na hardwaru NVIDIA vždy exportujte do TensorRT pro maximální rychlost. YOLOv8m: 50 až 180 FPS s TensorRT FP16.
- Odečítání pozadí jako hradlování: Ve scénách s malým pohybem spusťte DL inferenci pouze na snímcích, kde MOG2 detekuje pohyb. Ušetřete 60–70 % výpočetní techniky.
- Filtr kvality rámu: Roztřesené (skóre rozostření < 100) a podexponované snímky před dedukcí vyhoďte. Snižuje falešné negativy a zlepšuje přesnost systému.
- SSIM pro statické scény: Pokud dva po sobě jdoucí snímky mají SSIM > 0,95, scéna se nezmění. Neopakujte závěr, znovu použijte předchozí výsledky.
- OpenCV CUDA pro předzpracování: Pokud máte GPU NVIDIA a opencv-contrib zkompilovaný pomocí CUDA, je předzpracování (změna velikosti, cvtColor) na GPU 5-10x rychlejší.
Běžné chyby, kterým je třeba se vyhnout
- Neprovádějte zahřívání modelu: První inference je stále pomalejší (kompilace JIT, inicializace CUDA). Před měřením výkonu spusťte 3-5 inferenčních figurín.
- Zapomeňte na model.eval(): V režimu vlaku se vrstvy Dropout a BatchNorm chovají odlišně. Vždy volejte
model.eval()před závěrem. - frame.copy() chybí v anotaci: Neanotujte přímo původní rámec, pokud jej také používáte pro odvození. Vždy používejte
frame.copy(). - Neuvolňujte VideoCapture: Všechno je v pořádku
cv2.VideoCaptureotevřeno musí být uvolněno scap.release(). Použijte kontextový manažer nebo zkuste/konečně. - RTSP bez logiky opětovného připojení: RTSP spojení přestanou fungovat. Každá čtečka kamery musí mít logiku opakování s exponenciálním couváním.
Závěry
Vybudovali jsme kompletní CV potrubí, které integruje OpenCV a PyTorch/YOLO, pokrývající každá vrstva systému připraveného na výrobu:
- OpenCV pro akvizici, předzpracování, postprocessing a vizualizaci
- Neblokující vlákno pro zachycení videa, které maximalizuje využití GPU
- Potrubí připravené na výrobu se strukturovaným protokolováním, upozorňováním a ukládáním videa
- Vícekamerový systém s dávkovou inferencí a automatickým opětovným připojením RTSP pro scénáře sledování
- Odečítání pozadí (MOG2) a Smart Motion Gate pro snížení výpočtu až o 70 %
- PyTorch Profiler pro přesnou identifikaci úzkých míst
- OpenCV CUDA pro GPU akcelerované předzpracování (5-10x vs CPU)
- Posouzení kvality snímků pro filtrování poškozených snímků před odvozením
- Výkonnostní benchmark na skutečném hardwaru: od A100 (220 FPS) po Raspberry Pi (3 FPS)







