OpenCV ve PyTorch: Komple Bilgisayarlı Görme İşlem Hattı
OpenCV e PyTorch onlar bilgisayar ekosisteminin iki direğidir çağdaş vizyon. OpenCV, görüntü edinme, ön işleme ve son işleme konularında üstündür: geleneksel işlemler, web kamerası ve video yönetimi, morfolojik dönüşümler, klasik filtreler. PyTorch derin öğrenmeyi getiriyor: sinir ağları, GPU hesaplama, uçtan uca eğitim. Birlikte oluşurlar ham piksel ediniminden akıllı tahmine kadar uzanan eksiksiz bir CV hattı.
Bu makalede, video yakalamadan başlayarak eksiksiz bir bilgisayarlı görme hattı oluşturacağız. OpenCV ile ön işleme, PyTorch/YOLO modeliyle çıkarım yapma, görselleştirme ve sonuçların günlüğe kaydedilmesi. Doğrudan dağıtabileceğiniz, üretime hazır bir sistem.
Ne Öğreneceksiniz
- Temel OpenCV: görüntü/video okuma, renk uzayları, morfolojik işlemler
- OpenCV-PyTorch entegrasyonu: tensör dönüşümü, optimize edilmiş ardışık düzen
- Tampon yönetimi ile gerçek zamanlı video edinimi
- Ön işleme hattı: yeniden boyutlandırma, normalleştirme, toplu hazırlık
- Optimize edilmiş çıkarım: toplu işleme, eşzamansız çıkarım
- İşlem sonrası: NMS, koordinat dönüşümü, çerçeve açıklaması
- Çoklu kamera hattı ve RTSP akış işleme
- Tespit edilen olaylar için kayıt ve uyarı sistemi
- Performans optimizasyonu: iş parçacığı oluşturma, GPU akışları, profil oluşturma
1. CV İşlem Hattı için OpenCV Temelleri
1.1 Renk Uzayları ve Dönüşümleri
Sıklıkla gözden kaçırılan kritik bir nokta: OpenCV formatı kullanıyor BGR varsayılan olarak, PyTorch (ve PIL) kullanırken RGB. Bu ikisini karıştırmak feci sonuçlar doğurur: RGB görüntüleri üzerinde önceden eğitilmiş model, ters çevrilmiş kanalları alacaktır. Her zaman dönüştürün!
import cv2
import numpy as np
import torch
# ---- Lettura e conversioni ----
def load_image_rgb(path: str) -> np.ndarray:
"""Carica immagine in formato RGB (corretto per PyTorch)."""
img_bgr = cv2.imread(path)
if img_bgr is None:
raise FileNotFoundError(f"Immagine non trovata: {path}")
return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def bgr_to_torch(img_bgr: np.ndarray) -> torch.Tensor:
"""
Converte immagine OpenCV BGR in tensor PyTorch normalizzato.
BGR [H, W, C] uint8 -> RGB [C, H, W] float32 normalizzato [0,1]
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
return tensor
def torch_to_bgr(tensor: torch.Tensor) -> np.ndarray:
"""
Converte tensor PyTorch in immagine OpenCV BGR.
RGB [C, H, W] float32 -> BGR [H, W, C] uint8
"""
img_rgb = (tensor.permute(1, 2, 0).cpu().numpy() * 255).astype(np.uint8)
return cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
# ---- Spazi colore utili ----
def analyze_color_spaces(img_bgr: np.ndarray) -> dict:
"""Analizza l'immagine in diversi spazi colore."""
return {
'bgr': img_bgr,
'rgb': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB),
'gray': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY),
'hsv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV),
'lab': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB),
'yuv': cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YUV),
}
# ---- Operazioni morfologiche ----
def apply_morphology(gray: np.ndarray, operation: str = 'opening',
kernel_size: int = 5) -> np.ndarray:
"""
Operazioni morfologiche per pre/post-processing.
opening = erosione + dilatazione (rimuove rumore piccolo)
closing = dilatazione + erosione (chiude piccoli buchi)
"""
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
ops = {
'erode': cv2.erode(gray, kernel),
'dilate': cv2.dilate(gray, kernel),
'opening': cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel),
'closing': cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel),
'gradient': cv2.morphologyEx(gray, cv2.MORPH_GRADIENT, kernel),
'tophat': cv2.morphologyEx(gray, cv2.MORPH_TOPHAT, kernel)
}
return ops.get(operation, gray)
# ---- Rilevamento contorni e feature classiche ----
def detect_edges_and_contours(img_bgr: np.ndarray) -> tuple:
"""Pipeline classica: blur -> Canny -> contorni."""
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edges = cv2.Canny(blurred, threshold1=50, threshold2=150)
contours, hierarchy = cv2.findContours(
edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
# Filtra contorni per area minima
significant_contours = [c for c in contours if cv2.contourArea(c) > 500]
return edges, significant_contours
# ---- Preprocessing per modelli DL ----
def preprocess_for_model(img_bgr: np.ndarray,
target_size: tuple[int,int] = (640, 640),
mean: list = [0.485, 0.456, 0.406],
std: list = [0.229, 0.224, 0.225]) -> torch.Tensor:
"""
Pipeline completa: BGR immagine -> tensor normalizzato pronto per inference.
"""
# Letterbox resize (mantiene aspect ratio)
h, w = img_bgr.shape[:2]
target_h, target_w = target_size
scale = min(target_h / h, target_w / w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(img_bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
# Padding per raggiungere target_size
pad_h = target_h - new_h
pad_w = target_w - new_w
padded = cv2.copyMakeBorder(
resized,
pad_h // 2, pad_h - pad_h // 2,
pad_w // 2, pad_w - pad_w // 2,
cv2.BORDER_CONSTANT, value=(114, 114, 114)
)
# BGR -> RGB -> float -> normalize -> tensor
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1) # HWC -> CHW
mean_t = torch.tensor(mean).view(3, 1, 1)
std_t = torch.tensor(std).view(3, 1, 1)
tensor = (tensor - mean_t) / std_t
return tensor.unsqueeze(0), scale, (pad_w // 2, pad_h // 2) # batch dim + metadata
2. Video ve Web Kamerası Edinimi
2.1 Video Yakalama ve Çerçeve Arabelleği
import cv2
import threading
import queue
import time
from dataclasses import dataclass
@dataclass
class Frame:
"""Wrapper per un frame con metadata."""
data: np.ndarray
timestamp: float
frame_id: int
class ThreadedVideoCapture:
"""
VideoCapture con lettura in thread separato.
Previene il drop di frame dovuto al processing lento:
la GPU che fa inference non blocca la lettura della camera.
"""
def __init__(self, source, max_buffer_size: int = 5):
self.cap = cv2.VideoCapture(source)
if not self.cap.isOpened():
raise RuntimeError(f"Impossibile aprire: {source}")
# Ottimizza per latenza bassa
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.buffer = queue.Queue(maxsize=max_buffer_size)
self.stopped = False
self.frame_id = 0
# Avvia thread di lettura
self.thread = threading.Thread(target=self._read_frames, daemon=True)
self.thread.start()
def _read_frames(self) -> None:
"""Thread worker: legge frame continuamente."""
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
self.stopped = True
break
frame_obj = Frame(
data=frame,
timestamp=time.time(),
frame_id=self.frame_id
)
self.frame_id += 1
# Scarta frame vecchi se il buffer e pieno
if self.buffer.full():
try:
self.buffer.get_nowait()
except queue.Empty:
pass
self.buffer.put(frame_obj)
def read(self) -> Frame | None:
"""Leggi prossimo frame disponibile."""
try:
return self.buffer.get(timeout=1.0)
except queue.Empty:
return None
def get_fps(self) -> float:
return self.cap.get(cv2.CAP_PROP_FPS)
def get_resolution(self) -> tuple[int, int]:
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def release(self) -> None:
self.stopped = True
self.cap.release()
def __enter__(self):
return self
def __exit__(self, *args):
self.release()
3. CV Pipeline'ı tamamlayın: OpenCV + YOLO
import cv2
import torch
import numpy as np
import time
import logging
import json
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from ultralytics import YOLO
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Singola detection con tutti i metadata."""
class_name: str
class_id: int
confidence: float
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
timestamp: float = field(default_factory=time.time)
frame_id: int = 0
def to_dict(self) -> dict:
return {
'class': self.class_name,
'confidence': round(self.confidence, 3),
'bbox': list(self.bbox),
'timestamp': self.timestamp,
'frame_id': self.frame_id
}
class CVPipeline:
"""
Pipeline completa Computer Vision:
Acquisizione -> Preprocessing -> Inference -> Post-processing -> Output
"""
def __init__(
self,
model_path: str,
source, # int (webcam), str (file/RTSP)
conf_threshold: float = 0.4,
iou_threshold: float = 0.45,
target_classes: list[str] | None = None, # None = tutte le classi
output_dir: str = 'output',
save_video: bool = False,
alert_classes: list[str] | None = None # classi che triggerano alert
):
self.model = YOLO(model_path)
self.conf = conf_threshold
self.iou = iou_threshold
self.target_classes = target_classes
self.alert_classes = alert_classes or []
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.source = source
self.save_video = save_video
# Stats
self.frame_count = 0
self.total_detections = 0
self.start_time = None
self.fps_history = []
def run(self) -> None:
"""Avvia la pipeline di processing."""
logger.info(f"Avvio pipeline: source={self.source} modello={self.model.model_name}")
cap = cv2.VideoCapture(self.source)
if not cap.isOpened():
raise RuntimeError(f"Impossibile aprire source: {self.source}")
# Setup video writer se richiesto
writer = None
if self.save_video:
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
out_path = str(self.output_dir / f"output_{timestamp}.mp4")
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
logger.info(f"Salvataggio video: {out_path}")
self.start_time = time.time()
detection_log = []
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
# Inference
detections = self._run_inference(frame, self.frame_count)
# Visualizzazione
annotated = self._annotate_frame(frame, detections)
# Stats overlay
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0.0
self.fps_history.append(fps)
annotated = self._add_stats_overlay(annotated, fps, len(detections))
# Salvataggio
if writer:
writer.write(annotated)
cv2.imshow('CV Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# Logging detections
for det in detections:
detection_log.append(det.to_dict())
if det.class_name in self.alert_classes:
self._trigger_alert(det)
self.frame_count += 1
self.total_detections += len(detections)
finally:
cap.release()
if writer:
writer.release()
cv2.destroyAllWindows()
# Salva log
log_path = self.output_dir / 'detection_log.json'
with open(log_path, 'w') as f:
json.dump(detection_log, f, indent=2)
self._print_stats()
def _run_inference(self, frame: np.ndarray, frame_id: int) -> list[Detection]:
"""Esegue YOLO inference su un singolo frame."""
results = self.model.predict(
frame, conf=self.conf, iou=self.iou, verbose=False
)
detections = []
for box in results[0].boxes:
class_name = self.model.names[int(box.cls[0])]
# Filtra per classi target se specificato
if self.target_classes and class_name not in self.target_classes:
continue
x1, y1, x2, y2 = [int(c) for c in box.xyxy[0]]
detections.append(Detection(
class_name=class_name,
class_id=int(box.cls[0]),
confidence=float(box.conf[0]),
bbox=(x1, y1, x2, y2),
frame_id=frame_id
))
return detections
def _annotate_frame(self, frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
"""Annota il frame con bounding boxes e label."""
annotated = frame.copy()
np.random.seed(42)
colors = {name: tuple(np.random.randint(50, 255, 3).tolist())
for name in self.model.names.values()}
for det in detections:
x1, y1, x2, y2 = det.bbox
color = colors.get(det.class_name, (0, 255, 0))
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = f"{det.class_name} {det.confidence:.2f}"
label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(annotated,
(x1, y1 - label_size[1] - 10),
(x1 + label_size[0] + 5, y1), color, -1)
cv2.putText(annotated, label, (x1 + 2, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return annotated
def _add_stats_overlay(self, frame: np.ndarray, fps: float,
n_detections: int) -> np.ndarray:
"""Aggiunge overlay con statistiche real-time."""
h = frame.shape[0]
stats = [
f"FPS: {fps:.1f}",
f"Detections: {n_detections}",
f"Frame: {self.frame_count}",
f"Avg FPS: {np.mean(self.fps_history[-30:]):.1f}" if self.fps_history else "Avg FPS: -"
]
for i, text in enumerate(stats):
cv2.putText(frame, text, (10, h - 100 + i * 25),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
return frame
def _trigger_alert(self, det: Detection) -> None:
"""Gestisce un evento di alert per classi critiche."""
logger.warning(f"ALERT: {det.class_name} rilevato con confidenza {det.confidence:.2f} "
f"(frame {det.frame_id})")
# In produzione: invia notifica (email, Slack, webhook)
def _print_stats(self) -> None:
"""Stampa statistiche finali della sessione."""
elapsed = time.time() - self.start_time
avg_fps = self.frame_count / elapsed if elapsed > 0 else 0
logger.info(f"\n=== Statistiche Pipeline ===")
logger.info(f"Frame processati: {self.frame_count}")
logger.info(f"Detections totali: {self.total_detections}")
logger.info(f"Tempo totale: {elapsed:.1f}s")
logger.info(f"FPS medio: {avg_fps:.1f}")
# Utilizzo
if __name__ == '__main__':
pipeline = CVPipeline(
model_path='yolo26m.pt',
source=0, # webcam (o 'video.mp4', 'rtsp://...')
conf_threshold=0.4,
target_classes=['person', 'car', 'truck'],
alert_classes=['person'],
output_dir='output',
save_video=True
)
pipeline.run()
4. İşlem Sonrası için OpenCV İşlemleri
import cv2
import numpy as np
# ---- Background Subtraction (MOG2) ----
def setup_background_subtraction():
"""
MOG2: Mixture of Gaussians per rilevamento movimento.
Utile come primo filtro prima dell'inference DL.
"""
subtractor = cv2.createBackgroundSubtractorMOG2(
history=500,
varThreshold=50,
detectShadows=True
)
return subtractor
def detect_motion(frame_bgr: np.ndarray, subtractor,
min_contour_area: int = 1000) -> list[tuple]:
"""Rileva regioni di movimento nel frame."""
fg_mask = subtractor.apply(frame_bgr)
# Cleanup maschera
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)
fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
motion_regions = []
for c in contours:
if cv2.contourArea(c) >= min_contour_area:
x, y, w, h = cv2.boundingRect(c)
motion_regions.append((x, y, x+w, y+h))
return motion_regions
# ---- Optical Flow (Lucas-Kanade) ----
def compute_sparse_optical_flow(prev_gray: np.ndarray,
curr_gray: np.ndarray,
prev_points: np.ndarray) -> tuple:
"""
Lucas-Kanade optical flow per tracking di punti specifici.
Utile per tracking di keypoints rilevati nel frame precedente.
"""
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
good_prev = prev_points[status.flatten() == 1]
good_curr = curr_points[status.flatten() == 1]
# Calcola velocità media del movimento
if len(good_prev) > 0:
flow_vectors = good_curr - good_prev
avg_speed = np.mean(np.linalg.norm(flow_vectors, axis=1))
else:
avg_speed = 0.0
return good_curr, good_prev, avg_speed
# ---- CSRT Tracker (più preciso di MOSSE) ----
class ObjectTracker:
"""
Tracker multi-oggetto per mantenere l'identità degli oggetti
tra frame consecutivi senza rieseguire l'inference a ogni frame.
"""
def __init__(self, tracker_type: str = 'CSRT'):
self.tracker_type = tracker_type
self.trackers = [] # lista di (tracker, class_name, id)
self.next_id = 0
def add_tracker(self, frame: np.ndarray, bbox: tuple, class_name: str) -> int:
"""Aggiunge un tracker per un nuovo oggetto."""
x1, y1, x2, y2 = bbox
cv2_bbox = (x1, y1, x2 - x1, y2 - y1) # formato OpenCV: x, y, w, h
tracker = cv2.TrackerCSRT_create()
tracker.init(frame, cv2_bbox)
obj_id = self.next_id
self.trackers.append((tracker, class_name, obj_id))
self.next_id += 1
return obj_id
def update(self, frame: np.ndarray) -> list[dict]:
"""Aggiorna tutti i tracker con il nuovo frame."""
active_objects = []
failed_trackers = []
for i, (tracker, class_name, obj_id) in enumerate(self.trackers):
success, cv2_bbox = tracker.update(frame)
if success:
x, y, w, h = [int(v) for v in cv2_bbox]
active_objects.append({
'id': obj_id,
'class_name': class_name,
'bbox': (x, y, x + w, y + h)
})
else:
failed_trackers.append(i)
# Rimuovi tracker falliti (indietro per evitare shift indici)
for i in reversed(failed_trackers):
self.trackers.pop(i)
return active_objects
def clear(self) -> None:
self.trackers = []
5. Çoklu Kamera Boru Hattı ve RTSP Akışı
import cv2
import threading
import queue
import torch
from ultralytics import YOLO
from dataclasses import dataclass
@dataclass
class CameraFrame:
camera_id: int
frame: np.ndarray
timestamp: float
class MultiCameraPipeline:
"""
Pipeline multi-camera con:
- Thread separato per ogni camera
- Coda condivisa per batch inference
- GPU condivisa tra tutte le camere
"""
def __init__(self, sources: list, model_path: str, batch_size: int = 4):
self.sources = sources
self.model = YOLO(model_path)
self.batch_size = batch_size
self.frame_queue = queue.Queue(maxsize=batch_size * 2)
self.stopped = False
def _camera_reader(self, camera_id: int, source) -> None:
"""Thread worker per singola camera."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
if isinstance(source, str) and 'rtsp' in source:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
while not self.stopped:
ret, frame = cap.read()
if not ret:
logger.warning(f"Camera {camera_id} disconnessa")
time.sleep(1.0)
cap = cv2.VideoCapture(source) # Reconnect
continue
frame_obj = CameraFrame(camera_id=camera_id, frame=frame,
timestamp=time.time())
try:
self.frame_queue.put(frame_obj, timeout=0.1)
except queue.Full:
pass # Drop frame se la coda e piena
cap.release()
def run(self) -> None:
"""Avvia tutti i reader thread e processa batch dalla coda."""
threads = []
for i, source in enumerate(self.sources):
t = threading.Thread(
target=self._camera_reader, args=(i, source), daemon=True
)
t.start()
threads.append(t)
logger.info(f"Pipeline avviata: {len(self.sources)} camere")
batch = []
while not self.stopped:
try:
frame_obj = self.frame_queue.get(timeout=0.5)
batch.append(frame_obj)
# Processa batch quando pieno o timeout
if len(batch) >= self.batch_size:
self._process_batch(batch)
batch = []
except queue.Empty:
if batch:
self._process_batch(batch)
batch = []
def _process_batch(self, batch: list[CameraFrame]) -> None:
"""Inference batch su GPU - più efficiente di inference singola."""
frames = [f.frame for f in batch]
# Batch inference con YOLO
results = self.model.predict(
frames, conf=0.4, iou=0.45, verbose=False
)
for frame_obj, result in zip(batch, results):
n_det = len(result.boxes)
if n_det > 0:
logger.info(f"Camera {frame_obj.camera_id}: {n_det} oggetti rilevati")
def stop(self) -> None:
self.stopped = True
# Configurazione multi-camera
pipeline = MultiCameraPipeline(
sources=[
0, # Webcam locale
'rtsp://192.168.1.100/stream1', # Camera IP RTSP
'rtsp://192.168.1.101/stream1', # Camera IP RTSP 2
'videos/recording.mp4' # File video
],
model_path='yolo26m.pt',
batch_size=4
)
# pipeline.run()
6. Profil Oluşturma ve OpenCV CUDA Hızlandırılmış
6.1 Pipeline CV için PyTorch Profiler
Optimize etmeden önce ölçün. PyTorch Profilcisi ve araç boru hattındaki darboğazları tespit etmede daha güçlü: tam olarak anlıyor zamanın nereye gittiği (ön işleme, çıkarım, son işleme, CPU/GPU veri aktarımı).
import torch
import torch.profiler as profiler
import cv2
import numpy as np
from ultralytics import YOLO
def profile_cv_pipeline(model_path: str,
video_source,
n_frames: int = 100,
output_dir: str = './profiler_logs') -> None:
"""
Profila la pipeline CV con PyTorch Profiler.
Genera log compatibili con TensorBoard per analisi visuale.
Output: profiler_logs/ (aprire con: tensorboard --logdir profiler_logs)
"""
model = YOLO(model_path)
cap = cv2.VideoCapture(video_source)
frame_count = 0
with profiler.profile(
activities=[
profiler.ProfilerActivity.CPU,
profiler.ProfilerActivity.CUDA,
],
schedule=profiler.schedule(
wait=5, # Skip first 5 iterations (warmup variability)
warmup=5, # Warmup 5 iterations (profiler overhead)
active=20, # Profile 20 iterations
repeat=2 # Repeat the cycle 2 times
),
on_trace_ready=profiler.tensorboard_trace_handler(output_dir),
record_shapes=True, # Record tensor shapes
profile_memory=True, # Track memory allocations
with_stack=True # Include call stack
) as prof:
while frame_count < n_frames:
ret, frame = cap.read()
if not ret:
break
with torch.profiler.record_function("preprocessing"):
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
tensor = torch.from_numpy(rgb).float() / 255.0
tensor = tensor.permute(2, 0, 1).unsqueeze(0)
if torch.cuda.is_available():
tensor = tensor.cuda()
with torch.profiler.record_function("inference"):
with torch.no_grad():
results = model.predict(frame, verbose=False)
with torch.profiler.record_function("postprocessing"):
boxes = results[0].boxes
n_det = len(boxes)
prof.step()
frame_count += 1
cap.release()
print(f"Profiling completato. Risultati in: {output_dir}")
print(f"Avvia: tensorboard --logdir {output_dir}")
# Stampa tabella top operazioni (utile senza TensorBoard)
print(prof.key_averages().table(
sort_by="cuda_time_total", row_limit=15
))
# Profiling semplice con timer manuale per produzioni senza TensorBoard
class PipelineProfiler:
"""Profiler leggero per monitoraggio continuo in produzione."""
def __init__(self, window_size: int = 100):
import collections
self.window_size = window_size
self.times: dict = {
'preprocess': collections.deque(maxlen=window_size),
'inference': collections.deque(maxlen=window_size),
'postprocess': collections.deque(maxlen=window_size),
}
def record(self, stage: str, duration_ms: float) -> None:
if stage in self.times:
self.times[stage].append(duration_ms)
def report(self) -> dict:
"""Restituisce statistiche rolling degli ultimi N frame."""
stats = {}
for stage, times in self.times.items():
if times:
arr = np.array(times)
stats[stage] = {
'mean_ms': float(np.mean(arr)),
'p50_ms': float(np.percentile(arr, 50)),
'p95_ms': float(np.percentile(arr, 95)),
'p99_ms': float(np.percentile(arr, 99)),
}
return stats
def print_report(self) -> None:
stats = self.report()
print("\n=== Pipeline Performance (last {self.window_size} frames) ===")
for stage, s in stats.items():
print(f" {stage:12s}: mean={s['mean_ms']:.1f}ms "
f"p95={s['p95_ms']:.1f}ms p99={s['p99_ms']:.1f}ms")
6.2 CUDA Hızlandırmalı OpenCV
OpenCV bir modülü destekler Cuda (CUDA ile derlenmiş opencv-contrib-python) NVIDIA GPU'larda ön işleme işlemlerini 10 kata kadar hızlandırır. özellikle yüksek frekanslı işlem hatlarında yeniden boyutlandırma, Gauss bulanıklığı ve renk dönüşümleri için kullanışlıdır.
import cv2
import numpy as np
import torch
def check_opencv_cuda() -> dict:
"""Verifica disponibilità e configurazione di OpenCV CUDA."""
info = {
'opencv_version': cv2.__version__,
'cuda_enabled': cv2.cuda.getCudaEnabledDeviceCount() > 0,
'gpu_count': cv2.cuda.getCudaEnabledDeviceCount(),
}
if info['cuda_enabled']:
info['gpu_name'] = cv2.cuda.printShortCudaDeviceInfo(0)
return info
class CUDAPreprocessor:
"""
Preprocessor OpenCV accelerato su GPU.
Richiede: pip install opencv-contrib-python
e compilazione OpenCV con CUDA support.
Speedup tipico vs CPU: 5-10x per resize+cvtColor
"""
def __init__(self, target_size: tuple = (640, 640)):
self.target_size = target_size
self.has_cuda = cv2.cuda.getCudaEnabledDeviceCount() > 0
if self.has_cuda:
# Pre-alloca stream CUDA per pipeline asincrona
self.stream = cv2.cuda_Stream()
# GPU matrix per evitare ri-allocazioni
self.gpu_frame = cv2.cuda_GpuMat()
self.gpu_rgb = cv2.cuda_GpuMat()
self.gpu_resized = cv2.cuda_GpuMat()
def preprocess_cuda(self, img_bgr: np.ndarray) -> np.ndarray:
"""
Preprocessing su GPU: BGR->RGB + Resize + (opzionale) Gaussian blur.
Returns: numpy array RGB normalizzato [0,1] float32
"""
if not self.has_cuda:
return self._preprocess_cpu(img_bgr)
# Upload a GPU
self.gpu_frame.upload(img_bgr, self.stream)
# BGR -> RGB su GPU
cv2.cuda.cvtColor(self.gpu_frame, cv2.COLOR_BGR2RGB,
self.gpu_rgb, stream=self.stream)
# Resize su GPU (INTER_LINEAR e supportato in cuda)
cv2.cuda.resize(self.gpu_rgb, self.target_size,
self.gpu_resized,
interpolation=cv2.INTER_LINEAR,
stream=self.stream)
# Download da GPU (sincrono - necessario prima dell'inference PyTorch)
result = self.gpu_resized.download(stream=self.stream)
self.stream.waitForCompletion()
return result.astype(np.float32) / 255.0
def _preprocess_cpu(self, img_bgr: np.ndarray) -> np.ndarray:
"""Fallback CPU se CUDA non e disponibile."""
rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, self.target_size, interpolation=cv2.INTER_LINEAR)
return resized.astype(np.float32) / 255.0
def preprocess_batch_cuda(self, frames: list[np.ndarray]) -> torch.Tensor:
"""
Preprocessing batch su GPU.
Converte lista di frame in un batch tensor pronto per inference.
Massimizza throughput per pipeline multi-camera.
"""
preprocessed = [self.preprocess_cuda(f) for f in frames]
# Stack in batch tensor [B, C, H, W]
batch = np.stack(preprocessed) # [B, H, W, C]
batch_tensor = torch.from_numpy(batch).permute(0, 3, 1, 2)
if torch.cuda.is_available():
batch_tensor = batch_tensor.cuda(non_blocking=True)
return batch_tensor
# Benchmark: CPU vs CUDA preprocessing
def benchmark_preprocessing(n_frames: int = 500,
img_size: tuple = (1920, 1080)) -> None:
"""Confronta velocità preprocessing CPU vs CUDA."""
import time
# Genera frame sintetici
frames = [np.random.randint(0, 255, (*img_size, 3), dtype=np.uint8)
for _ in range(10)]
preprocessor = CUDAPreprocessor(target_size=(640, 640))
# Benchmark CPU
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor._preprocess_cpu(f)
cpu_time = time.perf_counter() - t0
# Benchmark CUDA (se disponibile)
if preprocessor.has_cuda:
t0 = time.perf_counter()
for i in range(n_frames):
f = frames[i % 10]
preprocessor.preprocess_cuda(f)
cuda_time = time.perf_counter() - t0
print(f"Preprocessing {n_frames} frame ({img_size[1]}x{img_size[0]} -> 640x640):")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
print(f" CUDA: {cuda_time:.2f}s ({n_frames/cuda_time:.0f} FPS)")
print(f" Speedup: {cpu_time/cuda_time:.1f}x")
else:
print("CUDA non disponibile, solo benchmark CPU")
print(f" CPU: {cpu_time:.2f}s ({n_frames/cpu_time:.0f} FPS)")
7. Görüntü Kalitesi Değerlendirmesi ve Çerçeve Metrikleri
Bir çerçeveyi çıkarıma göndermeden önce kalitesini değerlendirmekte fayda var. Titreyen, aşırı pozlanmış veya zayıf keskinleştirilmiş çerçeveler, yanlış negatifler üretir ve sistemin güvenilirliği. Genellikle kalite için kullanılan PSNR ve SSIM ölçümleri sıkıştırmanın yanı sıra çerçeve filtrelemeye de iyi uygulanırlar.
import cv2
import numpy as np
from dataclasses import dataclass
@dataclass
class FrameQuality:
"""Metriche di qualità per un singolo frame."""
blur_score: float # Varianza Laplaciano (alto = nitido)
brightness: float # Luminosita media [0, 255]
contrast: float # Deviazione standard luminosita
is_valid: bool # Frame accettabile per inference?
reject_reason: str # Motivo del rifiuto ('', 'blur', 'dark', ecc.)
class FrameQualityFilter:
"""
Filtra frame di bassa qualità prima dell'inference.
Riduce falsi negativi in sistemi di sorveglianza.
"""
def __init__(self,
blur_threshold: float = 100.0, # Laplacian variance
brightness_min: float = 30.0,
brightness_max: float = 220.0,
contrast_min: float = 15.0):
self.blur_threshold = blur_threshold
self.brightness_min = brightness_min
self.brightness_max = brightness_max
self.contrast_min = contrast_min
def assess(self, frame_bgr: np.ndarray) -> FrameQuality:
"""Valuta qualità del frame."""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# Blur detection: varianza del Laplaciano
# Un frame nitido ha bordi marcati -> alta varianza
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
# Luminosita e contrasto
brightness = float(np.mean(gray))
contrast = float(np.std(gray))
# Valutazione
reject_reason = ''
if laplacian_var < self.blur_threshold:
reject_reason = f'blur (var={laplacian_var:.1f})'
elif brightness < self.brightness_min:
reject_reason = f'too_dark (mean={brightness:.1f})'
elif brightness > self.brightness_max:
reject_reason = f'overexposed (mean={brightness:.1f})'
elif contrast < self.contrast_min:
reject_reason = f'low_contrast (std={contrast:.1f})'
return FrameQuality(
blur_score=laplacian_var,
brightness=brightness,
contrast=contrast,
is_valid=(reject_reason == ''),
reject_reason=reject_reason
)
def filter_batch(self, frames: list[np.ndarray]) -> tuple[list, list]:
"""
Filtra una batch di frame.
Returns: (valid_frames, rejected_frames_with_reason)
"""
valid = []
rejected = []
for frame in frames:
quality = self.assess(frame)
if quality.is_valid:
valid.append(frame)
else:
rejected.append((frame, quality.reject_reason))
return valid, rejected
def compute_psnr(original: np.ndarray, compressed: np.ndarray) -> float:
"""
Peak Signal-to-Noise Ratio tra immagine originale e compressa.
PSNR > 40 dB = ottima qualità; < 20 dB = bassa qualità percettibile.
"""
mse = np.mean((original.astype(float) - compressed.astype(float)) ** 2)
if mse == 0:
return float('inf')
max_pixel = 255.0
return 20 * np.log10(max_pixel / np.sqrt(mse))
def compute_ssim(img1: np.ndarray, img2: np.ndarray) -> float:
"""
Structural Similarity Index (SSIM) tra due immagini grayscale.
Range: [-1, 1], dove 1 = identiche, 0 = non correlate.
Più fedele alla percezione umana rispetto a MSE/PSNR.
"""
gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(float)
gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(float)
C1 = (0.01 * 255) ** 2 # costante stabilità
C2 = (0.03 * 255) ** 2
mu1, mu2 = gray1.mean(), gray2.mean()
sigma1 = gray1.std()
sigma2 = gray2.std()
sigma12 = np.mean((gray1 - mu1) * (gray2 - mu2))
numerator = (2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)
denominator = (mu1**2 + mu2**2 + C1) * (sigma1**2 + sigma2**2 + C2)
return float(numerator / denominator)
# Utilizzo in pipeline con motion gating intelligente
class SmartMotionGate:
"""
Combina background subtraction + SSIM per gating intelligente.
Evita sia di inferire su frame identici (statica) sia su frame corrotti.
"""
def __init__(self, ssim_change_threshold: float = 0.95):
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=200, varThreshold=40, detectShadows=False
)
self.quality_filter = FrameQualityFilter()
self.ssim_threshold = ssim_change_threshold
self.prev_frame = None
def should_infer(self, frame: np.ndarray) -> tuple[bool, str]:
"""
Decide se inferire su questo frame.
Returns: (should_infer, reason)
"""
# 1. Quality check
quality = self.quality_filter.assess(frame)
if not quality.is_valid:
return False, f"low_quality: {quality.reject_reason}"
# 2. Motion detection (veloce)
fg_mask = self.subtractor.apply(frame)
motion_ratio = np.sum(fg_mask > 0) / fg_mask.size
if motion_ratio < 0.005: # meno dello 0.5% dei pixel in movimento
return False, "no_motion"
# 3. SSIM check (rileva cambiamenti strutturali, non solo rumore)
if self.prev_frame is not None:
ssim = compute_ssim(frame, self.prev_frame)
if ssim > self.ssim_threshold:
return False, f"scene_static (ssim={ssim:.3f})"
self.prev_frame = frame.copy()
return True, "ok"
8. En İyi Uygulamalar ve Performans
Tipik Performans: CUDA Ön İşleme ile OpenCV Pipeline'da YOLO26m
| Donanım | FPS (640x640) | Gecikme | Optimizasyon |
|---|---|---|---|
| NVIDIA A100 | ~220FPS | ~4,5ms | TensorRT FP16 |
| NVIDIA RTX 4090 | ~180FPS | ~5,6ms | TensorRT FP16 |
| NVIDIA RTX 3080 | ~120FPS | ~8,3ms | ONNX Çalışma Zamanı |
| Intel i9 CPU (ONNX) | ~18 FPS | ~55ms | AçıkVINO |
| Ahududu Pi 5 | ~3 FPS | ~330ms | YOLOv8n INT8 |
Performans optimizasyonları
- Ayrı başlıkta okuyun: Çıkarım döngüsünü kamera G/Ç ile engellemeyin. GPU kullanımını en üst düzeye çıkarmak için ThreadedVideoCapture'ı kullanın.
- BGR->RGB yalnızca bir kez: Her aramada dönüşüm yapmayın. İşlem hattı boyunca RGB'yi okumak ve korumak için dönüştürün.
- torch.no_grad() her zaman çıkarımda bulunur: Çıkarım sırasında otomatik dereceli grafik hesaplamasını devre dışı bırakın: -%30 bellek, +%10 hız.
- Toplu çıkarım: Birden fazla video akışınız varsa kareleri biriktirin ve gruplar halinde çıkarım yapın. GPU verimi, parti boyutuyla neredeyse doğrusal olarak ölçeklenir.
- Üretim için TensorRT: NVIDIA donanımında, maksimum hız için her zaman TensorRT'ye aktarın. YOLOv8m: TensorRT FP16 ile 50 ila 180 FPS.
- Geçitleme olarak arka plan çıkarma: Hareketin az olduğu sahnelerde, DL çıkarımını yalnızca MOG2'nin hareketi algıladığı karelerde çalıştırın. Bilgi işlemden %60-70 oranında tasarruf edin.
- Çerçeve kalitesi filtresi: Çıkarımdan önce titrek (bulanıklık puanı < 100) ve az pozlanmış kareleri atın. Yanlış negatifleri azaltır ve sistem hassasiyetini artırır.
- Statik sahneler için SSIM: Ardışık iki karenin SSIM > 0,95 olması durumunda sahne değişmez. Çıkarımı yeniden çalıştırmayın, önceki sonuçları yeniden kullanın.
- Ön işleme için OpenCV CUDA: Bir NVIDIA GPU'nuz ve CUDA ile derlenmiş opencv-contrib'iniz varsa, ön işleme (yeniden boyutlandırma, cvtColor) GPU'da 5-10 kat daha hızlıdır.
Kaçınılması Gereken Yaygın Hatalar
- Model ısınmalarını gerçekleştirmeyin: İlk çıkarım giderek daha yavaştır (JIT derlemesi, CUDA başlatma). Performansı ölçmeden önce 3-5 çıkarım kuklası çalıştırın.
- model.eval()'ı unutun: Eğitim modunda Dropout ve BatchNorm katmanları farklı davranır. Her zaman ara
model.eval()çıkarımdan önce. - Açıklamada Frame.copy() eksik: Çıkarım için de kullanıyorsanız, orijinal çerçeveye doğrudan açıklama eklemeyin. Her zaman kullan
frame.copy(). - VideoCapture'ı serbest bırakmayın: Her şey yolunda
cv2.VideoCaptureaçılmış olarak serbest bırakılmalıdırcap.release(). Bağlam yöneticisini kullanın veya deneyin/sonunda. - Yeniden bağlanma mantığı olmayan RTSP: RTSP bağlantıları kopuyor. Her kamera okuyucusunun üstel geri çekilmeli yeniden deneme mantığı olmalıdır.
Sonuçlar
OpenCV ve PyTorch/YOLO'yu entegre eden eksiksiz bir CV hattı oluşturduk. üretime hazır bir sistemin her katmanı:
- Edinme, ön işleme, son işleme ve görselleştirme için OpenCV
- GPU kullanımını en üst düzeye çıkaran, engellenmeyen video yakalama iş parçacığı
- Yapılandırılmış günlük kaydı, uyarı ve video kaydetme özellikleriyle üretime hazır işlem hattı
- Gözetim senaryoları için toplu çıkarım ve RTSP otomatik yeniden bağlanma özelliğine sahip çoklu kamera
- Bilgi işlem miktarını %70'e kadar azaltmak için arka planda çıkarma (MOG2) ve Smart Motion Gate
- Darboğazları tam olarak belirlemek için PyTorch Profiler
- GPU hızlandırmalı ön işleme için OpenCV CUDA (CPU'ya karşı 5-10x)
- Çıkarımdan önce bozuk çerçeveleri filtrelemek için Çerçeve Kalitesi Değerlendirmesi
- Gerçek donanımda performans karşılaştırması: A100'den (220 FPS) Raspberry Pi'ye (3 FPS)







