Computer Vision on the Edge: Optimizare pentru dispozitive mobile și încorporate
Implementați modele de computer vision pe dispozitivele de vârf - Raspberry Pi, NVIDIA Jetson, smartphone-uri, Microcontrolere ARM - și o provocare de inginerie complet diferită de implementarea în cloud sau Server GPU. Resursele sunt limitate: câțiva wați de consum, gigaocteți de RAM în loc de zeci, fără GPU dedicat sau GPU entry-level. Cu toate acestea, milioane de aplicații necesită inferențe local: supraveghere offline, robotică, dispozitive medicale portabile, automatizări industriale în medii fără conexiune.
În acest articol vom explora tehnici de optimizare pentru implementările de margine: cuantificare, tăiere, distilare a cunoștințelor, formate optimizate (ONNX, TFLite, NCNN) și benchmark-uri reale pe Raspberry Pi 5 și NVIDIA Jetson Orin.
Ce vei învăța
- Prezentare generală a hardware-ului Edge: Raspberry Pi, Jetson Nano/Orin, Coral TPU, Hailo
- Cuantizare: INT8, FP16 - teorie și implementare practică
- Tăiere structurată și nestructurată pentru reducerea parametrilor
- Distilarea cunoștințelor: antrenarea modelelor mici de la modele mari
- TFLite și NCNN: implementare pe dispozitive ARM
- TensorRT: viteză maximă pe GPU NVIDIA (Jetson)
- ONNX Runtime cu optimizări CPU și NPU
- YOLO26 pe Raspberry Pi 5: benchmark și configurație completă
- Conductă video în timp real pe Jetson Orin Nano
1. Hardware Edge pentru Computer Vision
Comparație hardware Edge 2026
| Dispozitiv | CPU | GPU/NPU | RAM | TDP | YOLOv8n FPS |
|---|---|---|---|---|---|
| Raspberry Pi 5 | ARM Cortex-A76 cu 4 nuclee | VideoCore VII | 8 GB | 15W | ~5 FPS |
| Jetson Nano (2 GB) | ARM A57 4-core | 128 de nuclee CUDA | 2 GB | 10W | ~20 FPS |
| Jetson Orin Nano | ARM Cortex-A78AE cu 6 nuclee | 1024 CUDA + DLA | 8 GB | 25W | ~80FPS |
| Jetson AGX Orin | ARM Cortex-A78AE cu 12 nuclee | 2048 CUDA + DLA | 64 GB | 60W | ~200FPS |
| Google Coral TPU | ARM Cortex-A53 cu 4 nuclee | 4 TOPS Edge TPU | 1 GB | 4W | ~30 FPS (TFLite) |
| Salut-8 | - (accelerator PCIe) | 26 TOPS Neural Engine | - | 5W | ~120 FPS |
2. Cuantizare: FP32 la INT8
La cuantizarea reduce precizia numerică a greutăților și activărilor a șablonului: de la float32 (32 biți) la float16 (16 biți) sau int8 (8 biți). Efectul practic: Model de 4 ori mai mic cu INT8, deducție de 2-4 ori mai rapidă, consum mai mic de energie. Pierderea preciziei cu tehnicile moderne este de obicei mai mică de 1%.
2.1 Cuantificare post-antrenament (PTQ)
import torch
import torch.quantization as quant
from torch.ao.quantization import get_default_qconfig, prepare, convert
from torchvision import models
import copy
def quantize_model_ptq(
model: torch.nn.Module,
calibration_loader,
backend: str = 'x86' # 'x86' per CPU Intel, 'qnnpack' per ARM
) -> torch.nn.Module:
"""
Post-Training Quantization (PTQ): quantizza il modello senza retraining.
Richiede solo un piccolo calibration dataset (~100-1000 immagini).
Flusso:
1. Fuse operazioni (Conv+BN+ReLU -> singola op)
2. Insert observer per calibrazione
3. Esegui calibrazione (forward pass sul dataset di calibrazione)
4. Converti in modello quantizzato
"""
torch.backends.quantized.engine = backend
model_to_quantize = copy.deepcopy(model)
model_to_quantize.eval()
# Step 1: Fuse layer comuni per efficienza
# Esempio per ResNet: (Conv, BN, ReLU) -> singola operazione fused
model_to_quantize = torch.quantization.fuse_modules(
model_to_quantize,
[['conv1', 'bn1', 'relu']], # adatta ai nomi del tuo modello
inplace=True
)
# Step 2: Set qconfig e prepara per calibrazione
qconfig = get_default_qconfig(backend)
model_to_quantize.qconfig = qconfig
prepared_model = prepare(model_to_quantize, inplace=False)
# Step 3: Calibrazione con dati reali
print("Calibrazione quantizzazione...")
prepared_model.eval()
with torch.no_grad():
for i, (images, _) in enumerate(calibration_loader):
prepared_model(images)
if i >= 99: # 100 batch di calibrazione sufficienti
break
if i % 10 == 0:
print(f" Batch {i+1}/100")
# Step 4: Conversione al modello quantizzato
quantized_model = convert(prepared_model, inplace=False)
# Verifica dimensioni
def model_size_mb(m: torch.nn.Module) -> float:
param_size = sum(p.nelement() * p.element_size() for p in m.parameters())
buffer_size = sum(b.nelement() * b.element_size() for b in m.buffers())
return (param_size + buffer_size) / (1024 ** 2)
original_size = model_size_mb(model)
quantized_size = model_size_mb(quantized_model)
print(f"Dimensione originale: {original_size:.1f} MB")
print(f"Dimensione quantizzata: {quantized_size:.1f} MB")
print(f"Riduzione: {original_size / quantized_size:.1f}x")
return quantized_model
def compare_inference_speed(original_model, quantized_model,
input_tensor: torch.Tensor, n_runs: int = 100) -> dict:
"""Confronta velocità tra modello originale e quantizzato."""
import time
results = {}
for name, model in [('FP32', original_model), ('INT8', quantized_model)]:
model.eval()
# Warmup
with torch.no_grad():
for _ in range(10):
model(input_tensor)
# Benchmark
start = time.perf_counter()
with torch.no_grad():
for _ in range(n_runs):
model(input_tensor)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
results[name] = avg_ms
print(f"{name}: {avg_ms:.2f}ms / inference")
speedup = results['FP32'] / results['INT8']
print(f"Speedup INT8: {speedup:.2f}x")
return results
2.2 Cuantificare cu YOLO (Ultralitice)
from ultralytics import YOLO
model = YOLO('yolo26n.pt') # nano per edge
# ---- TFLite INT8 per Raspberry Pi / Coral TPU ----
model.export(
format='tflite',
imgsz=320, # risoluzione ridotta per edge
int8=True, # quantizzazione INT8
data='coco.yaml' # dataset per calibrazione PTQ
)
# Output: yolo26n_int8.tflite
# ---- NCNN per CPU ARM (Raspberry Pi, Android) ----
model.export(
format='ncnn',
imgsz=320,
half=False # NCNN usa FP32 o INT8 nativo
)
# Output: yolo26n_ncnn_model/
# ---- TensorRT FP16 per Jetson ----
model.export(
format='engine',
imgsz=640,
half=True, # FP16
workspace=2, # GB workspace (ridotto per Jetson Nano)
device=0
)
# Output: yolo26n.engine
# ---- ONNX + ONNX Runtime per CPU/NPU ----
model.export(
format='onnx',
imgsz=320,
opset=17,
simplify=True,
dynamic=False # batch size fisso per deployment edge
)
print("Export completati per tutti i target edge")
3. YOLO pe Raspberry Pi 5
Il Raspberry Pi 5 cu 8GB RAM și procesorul ARM Cortex-A76 reprezintă cel mai accesibil punct de intrare pentru edge AI. Cu optimizările potrivite (NCNN, rezoluție redus, urmărirea pentru a reduce frecvența de inferență) se poate realiza un sistem de detectare funcțional în timp real.
# ============================================
# SETUP RASPBERRY PI 5 per Computer Vision
# ============================================
# 1. Installazione dipendenze base
# sudo apt update && sudo apt install -y python3-pip libopencv-dev
# pip install ultralytics ncnn onnxruntime
# 2. Ottimizzazioni sistema per AI
# In /boot/firmware/config.txt:
# gpu_mem=256 # Aumenta memoria GPU (VideoCore VII)
# over_voltage=6 # Overclock lieve
# arm_freq=2800 # Frequenza CPU max (stock 2.4GHz)
# ============================================
# INFERENCE con NCNN su Raspberry Pi
# ============================================
import ncnn
import cv2
import numpy as np
import time
class YOLOncnn:
"""
YOLO inference con NCNN - ottimizzato per CPU ARM.
NCNN e sviluppato da Tencent ed e il runtime più veloce per ARM CPU.
"""
def __init__(self, param_path: str, bin_path: str,
num_threads: int = 4, input_size: int = 320):
self.net = ncnn.Net()
self.net.opt.num_threads = num_threads # usa tutti i core
self.net.opt.use_vulkan_compute = False # no GPU su RPi
self.net.load_param(param_path)
self.net.load_model(bin_path)
self.input_size = input_size
def predict(self, img_bgr: np.ndarray, conf_thresh: float = 0.4) -> list[dict]:
"""Inference NCNN su CPU ARM."""
h, w = img_bgr.shape[:2]
# Resize + normalizzazione per NCNN
img_resized = cv2.resize(img_bgr, (self.input_size, self.input_size))
img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
mat_in = ncnn.Mat.from_pixels(
img_rgb, ncnn.Mat.PixelType.PIXEL_RGB, self.input_size, self.input_size
)
mean_vals = [0.485 * 255, 0.456 * 255, 0.406 * 255]
norm_vals = [1/0.229/255, 1/0.224/255, 1/0.225/255]
mat_in.substract_mean_normalize(mean_vals, norm_vals)
ex = self.net.create_extractor()
ex.input("images", mat_in)
_, mat_out = ex.extract("output0")
return self._parse_output(mat_out, conf_thresh, w, h)
def _parse_output(self, mat_out, conf_thresh, orig_w, orig_h) -> list[dict]:
"""Parsing dell'output NCNN in formato detection."""
detections = []
for i in range(mat_out.h):
row = np.array(mat_out.row(i))
confidence = row[4]
if confidence < conf_thresh:
continue
class_scores = row[5:]
class_id = int(np.argmax(class_scores))
class_conf = confidence * class_scores[class_id]
if class_conf >= conf_thresh:
# Coordinate normalizzate -> pixel
cx, cy, bw, bh = row[:4]
x1 = int((cx - bw/2) * orig_w / self.input_size)
y1 = int((cy - bh/2) * orig_h / self.input_size)
x2 = int((cx + bw/2) * orig_w / self.input_size)
y2 = int((cy + bh/2) * orig_h / self.input_size)
detections.append({
'class_id': class_id,
'confidence': float(class_conf),
'bbox': (x1, y1, x2, y2)
})
return detections
def run_rpi_detection_loop(model_param: str, model_bin: str,
camera_id: int = 0) -> None:
"""Loop di detection real-time ottimizzato per Raspberry Pi."""
detector = YOLOncnn(model_param, model_bin, num_threads=4, input_size=320)
cap = cv2.VideoCapture(camera_id)
# Ottimizza acquisizione per RPi
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
frame_skip = 2 # Processa 1 frame su 3 per risparmiare CPU
frame_count = 0
cached_dets = []
fps_history = []
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
if frame_count % frame_skip == 0:
cached_dets = detector.predict(frame, conf_thresh=0.4)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0
fps_history.append(fps)
# Visualizzazione
for det in cached_dets:
x1, y1, x2, y2 = det['bbox']
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, f"{det['confidence']:.2f}",
(x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)
avg_fps = sum(fps_history[-30:]) / min(len(fps_history), 30)
cv2.putText(frame, f"FPS: {avg_fps:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('RPi Detection', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
print(f"FPS medio: {sum(fps_history)/len(fps_history):.1f}")
4. NVIDIA Jetson Orin: TensorRT și DLA
Il Jetson Orin Nano (25W) oferă 1024 de nuclee CUDA și DLA (Deep Learning Accelerator) dedicat. Cu TensorRT FP16 și un model YOLO26n, acestea sunt ușor de depășit 100 FPS pe video 640x640.
from ultralytics import YOLO
import cv2
import time
def setup_jetson_pipeline(model_path: str = 'yolo26n.pt') -> YOLO:
"""
Setup ottimale per Jetson Orin:
1. Esporta in TensorRT FP16
2. Configura jetson_clocks per prestazioni massime
3. Imposta modalità performance per la GPU
"""
import subprocess
# Massimizza performance Jetson (esegui una sola volta)
# subprocess.run(['sudo', 'jetson_clocks'], check=True)
# subprocess.run(['sudo', 'nvpmodel', '-m', '0'], check=True) # MAXN mode
model = YOLO(model_path)
print("Esportazione TensorRT FP16...")
model.export(
format='engine',
imgsz=640,
half=True, # FP16 - quasi la stessa accuratezza di FP32 ma 2x più veloce
workspace=2, # GB workspace GPU (Jetson Orin Nano ha 8GB shared)
device=0,
batch=1,
simplify=True
)
# Carica il modello TensorRT
trt_model = YOLO('yolo26n.engine')
print("Modello TensorRT pronto")
return trt_model
def run_jetson_pipeline(model: YOLO, source=0) -> None:
"""Pipeline real-time ottimizzata per Jetson con statistiche."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
fps_list = []
frame_count = 0
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
results = model.predict(
frame, conf=0.35, iou=0.45,
verbose=False, half=True # FP16 inference
)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed
fps_list.append(fps)
# Annotazione con informazioni performance
annotated = results[0].plot()
avg_fps = sum(fps_list[-30:]) / min(len(fps_list), 30)
info_text = [
f"FPS: {fps:.0f} (avg: {avg_fps:.0f})",
f"Detections: {len(results[0].boxes)}",
f"Inference: {elapsed*1000:.1f}ms"
]
for i, text in enumerate(info_text):
cv2.putText(annotated, text, (10, 30 + i * 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
cv2.imshow('Jetson Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
finally:
cap.release()
cv2.destroyAllWindows()
if fps_list:
print(f"\n=== Stats Jetson ===")
print(f"Frame: {frame_count}")
print(f"FPS medio: {sum(fps_list)/len(fps_list):.1f}")
print(f"FPS massimo: {max(fps_list):.1f}")
print(f"Latenza minima: {1000/max(fps_list):.1f}ms")
5. Tunderea și Distilarea Cunoașterii
5.1 Tunderea structurată
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
def apply_structured_pruning(model: nn.Module,
amount: float = 0.3,
n: int = 2) -> nn.Module:
"""
Structured L2-norm pruning: rimuove interi filtri/neuroni.
Produce modelli più veloci in inferenza (a differenza del pruning non strutturato
che produce solo modelli più piccoli ma non necessariamente più veloci).
amount: percentuale di filtri da rimuovere (0.3 = 30%)
n: norma L_n usata per il ranking dei filtri
"""
for name, module in model.named_modules():
if isinstance(module, nn.Conv2d):
# Prune i filtri convoluzionali meno importanti
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0 # dim=0 = prune filtri in output
)
elif isinstance(module, nn.Linear):
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0
)
return model
def remove_pruning_masks(model: nn.Module) -> nn.Module:
"""
Rende permanente il pruning: rimuove le maschere e i parametri "orig",
lasciando solo i pesi pruned. Necessario prima dell'export.
"""
for name, module in model.named_modules():
if isinstance(module, (nn.Conv2d, nn.Linear)):
try:
prune.remove(module, 'weight')
except ValueError:
pass
return model
def prune_and_finetune(model: nn.Module, train_loader, val_loader,
prune_amount: float = 0.2, finetune_epochs: int = 5) -> nn.Module:
"""
Pipeline completa:
1. Prune il modello (rimuove il prune_amount% dei filtri)
2. Fine-tunes per recuperare l'accuratezza persa
3. Rimuove le maschere e finalizza
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f"Applying {prune_amount*100:.0f}% structured pruning...")
model = apply_structured_pruning(model, amount=prune_amount)
# Fine-tuning rapido per recupero accuratezza
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
for epoch in range(finetune_epochs):
model.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
loss = criterion(model(images), labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
total_loss += loss.item()
model.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = model(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
print(f" FT Epoch {epoch+1}/{finetune_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Acc: {100.*correct/total:.2f}%")
# Finalizza pruning
model = remove_pruning_masks(model)
print("Pruning completato e finalizzato")
return model
6. Distilarea cunoștințelor pentru modelele Edge
Il Distilarea cunoștințelor (KD, Hinton et al., 2015) transferă „cunoștințe” a unui model mare (profesor) într-un model mic (elev). Elevul nu învață doar de la etichete dure ale setului de date, dar de la previziuni blânde a profesorului: repartizările de probabilități care conțin informații despre structura spațiului de date (de ex. că „pisica” seamănă mai mult cu „tigrul” decât cu „mașina”).
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
"""
Loss combinata per Knowledge Distillation.
L_total = alpha * L_hard + (1 - alpha) * L_soft
L_hard = CrossEntropyLoss(student_logits, true_labels)
L_soft = KLDivLoss(softmax(student/T), softmax(teacher/T)) * T^2
T (temperature): valori alti -> distribuzioni più soft -> più informazione strutturale
alpha: peso relativo tra label reali e distillazione dal teacher
"""
def __init__(self, temperature: float = 4.0, alpha: float = 0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
self.hard_loss = nn.CrossEntropyLoss()
self.soft_loss = nn.KLDivLoss(reduction='batchmean')
def forward(self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor) -> torch.Tensor:
# Loss su label reali (hard labels)
hard = self.hard_loss(student_logits, labels)
# Loss su soft predictions del teacher (KL divergence)
student_soft = F.log_softmax(student_logits / self.T, dim=1)
teacher_soft = F.softmax(teacher_logits / self.T, dim=1)
soft = self.soft_loss(student_soft, teacher_soft) * (self.T ** 2)
return self.alpha * hard + (1 - self.alpha) * soft
def train_with_distillation(
teacher: nn.Module, # modello grande, già addestrato
student: nn.Module, # modello piccolo da addestrare
train_loader,
val_loader,
n_epochs: int = 50,
temperature: float = 4.0,
alpha: float = 0.7,
lr: float = 1e-3
) -> nn.Module:
"""
Training del modello student con KD.
Il teacher rimane frozen durante tutto il training.
Tipico risultato:
- MobileNetV3 senza KD su ImageNet: ~67% Top-1
- MobileNetV3 con KD da ResNet-50: ~72% Top-1
- ResNet-50 (teacher): ~76% Top-1
- Delta: +5% con 5x meno parametri!
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.eval() # Teacher sempre in eval mode
student.to(device)
teacher.to(device)
criterion = DistillationLoss(temperature=temperature, alpha=alpha)
optimizer = torch.optim.AdamW(student.parameters(), lr=lr, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=n_epochs)
best_val_acc = 0.0
best_state = None
for epoch in range(n_epochs):
student.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
# Forward pass
student_logits = student(images)
with torch.no_grad(): # Teacher: nessun gradiente
teacher_logits = teacher(images)
# Loss combinata
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(student.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
scheduler.step()
# Validation
student.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = student(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
val_acc = 100.0 * correct / total
if val_acc > best_val_acc:
best_val_acc = val_acc
best_state = {k: v.cpu().clone() for k, v in student.state_dict().items()}
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{n_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Val Acc: {val_acc:.2f}% | "
f"Best: {best_val_acc:.2f}%")
student.load_state_dict(best_state)
print(f"\nBest validation accuracy: {best_val_acc:.2f}%")
return student
Comparația strategiilor de compresie pentru Edge
| Tehnică | Reducerea parametrilor | Accelerare | Acc. Pierderi | Necesită recalificare |
|---|---|---|---|---|
| Cuantizare INT8 | 4x | 2-4x | <1% | Nu (PTQ) / Da (QAT) |
| Tăiere structurată 30% | 1,4x | 1,3-1,6x | 1-3% | Da (ajustare fină) |
| Distilarea cunoștințelor | 5-10x (schimb de model) | 5-10x | 3-8% | Da (antrenament complet) |
| FP16 (TensorRT) | 2x | 1,5-2x | <0,5% | No |
| Q + Tăiere + KD | 10-20x | 8-15x | 2-5% | Si |
7. ONNX Runtime: Portabilitate între hardware
ONNX (Open Neural Network Exchange) și dimensiunea standard pentru portabilitate a modelelor de deep learning. Odată exportat în ONNX, același model poate fi rulați cu ONNX Runtime pe CPU, NVIDIA GPU, ARM NPU, Intel OpenVINO, Apple Neural Engine fără modificări ale codului de inferență.
import torch
import onnx
import onnxruntime as ort
import numpy as np
import time
def export_to_onnx(model: torch.nn.Module,
input_shape: tuple = (1, 3, 640, 640),
output_path: str = 'model.onnx',
opset: int = 17) -> str:
"""
Esporta modello PyTorch in formato ONNX ottimizzato.
opset=17: versione del opset ONNX (più alta = più operatori supportati)
dynamic_axes: permette batch size variabile (utile per server, non per edge)
"""
model.eval()
dummy_input = torch.zeros(input_shape)
# Export con ottimizzazioni
torch.onnx.export(
model,
dummy_input,
output_path,
opset_version=opset,
input_names=['images'],
output_names=['output'],
dynamic_axes={
'images': {0: 'batch'},
'output': {0: 'batch'}
},
do_constant_folding=True, # ottimizza operazioni costanti
verbose=False
)
# Verifica il modello esportato
onnx_model = onnx.load(output_path)
onnx.checker.check_model(onnx_model)
print(f"Modello ONNX valido: {output_path}")
return output_path
class ONNXRuntimeInference:
"""
Inference ottimizzata con ONNX Runtime.
Supporta CPU, GPU CUDA, ARM (QNN), Intel OpenVINO come backend.
"""
def __init__(self, model_path: str, device: str = 'cpu'):
providers = self._get_providers(device)
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
# Numero di thread per CPU inference
sess_options.intra_op_num_threads = 4
sess_options.inter_op_num_threads = 2
self.session = ort.InferenceSession(
model_path, sess_options, providers=providers
)
# Cache nomi input/output
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
print(f"ONNX Runtime caricato su: {providers[0]}")
def _get_providers(self, device: str) -> list:
if device == 'cuda':
return ['CUDAExecutionProvider', 'CPUExecutionProvider']
elif device == 'openvino':
return ['OpenVINOExecutionProvider', 'CPUExecutionProvider']
else:
return ['CPUExecutionProvider']
def predict(self, image: np.ndarray) -> np.ndarray:
"""Inference su immagine numpy preprocessata."""
# Assicura formato float32 [B, C, H, W]
if image.ndim == 3:
image = image[np.newaxis, ...]
image = image.astype(np.float32)
return self.session.run(
[self.output_name], {self.input_name: image}
)[0]
def benchmark(self, input_shape: tuple = (1, 3, 640, 640),
n_runs: int = 100) -> dict:
"""Misura latenza e throughput."""
dummy = np.random.rand(*input_shape).astype(np.float32)
# Warmup
for _ in range(10):
self.predict(dummy)
# Benchmark
start = time.perf_counter()
for _ in range(n_runs):
self.predict(dummy)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
fps = 1000.0 / avg_ms
print(f"ONNX Runtime: {avg_ms:.2f}ms ({fps:.1f} FPS)")
return {'avg_ms': avg_ms, 'fps': fps}
8. Cele mai bune practici pentru implementarea Edge
Lista de verificare pentru implementarea Edge Productie gata
- Alegeți cel mai mic model care îndeplinește cerințele: YOLOv8n sau YOLO26n pentru RPi, YOLOv8m pentru Jetson Orin. Nu utilizați modele Large sau XLarge pe margine. Măsurați ÎNTOTDEAUNA pe hardware-ul țintă.
- Reduceți rezoluția de intrare: 320x320 în loc de 640x640 reduce timpul de inferență cu 75% cu o pierdere moderată de precizie. Pentru articole mari, 320 este suficient.
- Sarirea inteligentă a cadrelor: Dacă obiectele se mișcă încet, procesați 1 din 3-5 cadre. Utilizați un instrument de urmărire (CSRT, ByteTrack) pentru a interpola poziții în cadrele ignorate.
- Optimizați-vă canalul de achiziții: Setați CAP_PROP_BUFFERSIZE=1 pentru a minimiza latența. Utilizați V4L2 direct pe Linux pentru mai puțină suprasarcină decât OpenCV.
- TensorRT pe Jetson: Întotdeauna. Diferența dintre PyTorch și TensorRT FP16 este de 5-8x. Nu există niciun motiv pentru a utiliza PyTorch pentru producția de inferențe pe Jetson.
- Reglare termică: Pe RPi și Jetson, supraîncălzirea provoacă accelerarea. Adăugați radiatoare, controlați temperatura cu
vcgencmd measure_temp(RPi) sautegrastats(Jetson). - Măsurați energia, nu doar viteza: FPS/watt este valoarea care contează pentru dispozitivele cu baterie. Un model de 2 ori mai lent, dar de 4 ori mai eficient din punct de vedere energetic și adesea preferabil.
- Câine de pază și repornire grațioasă: Pe dispozitivele de vârf de producție, implementați întotdeauna un watchdog care repornește procesul de inferență în caz de blocare sau blocare.
- Înregistrare prietenoasă cu marginile: Pe RPi, utilizați SQLite în loc de baze de date la distanță pentru a salva evenimentele local. Sincronizați cu cloud în loturi atunci când conexiunea este disponibilă.
import subprocess
import threading
import time
import logging
class ThermalMonitor:
"""
Monitor termico per Raspberry Pi/Jetson.
Riduce automaticamente il carico di lavoro se la temperatura e troppo alta.
"""
TEMP_WARNING = 75.0 # Celsius: riduce frame rate
TEMP_CRITICAL = 85.0 # Celsius: ferma il processing
def __init__(self, platform: str = 'rpi',
check_interval: float = 5.0):
self.platform = platform
self.check_interval = check_interval
self.current_temp = 0.0
self.throttle_factor = 1.0 # 1.0 = nessun throttling
self._stop = threading.Event()
def get_temperature(self) -> float:
"""Legge la temperatura del SoC."""
try:
if self.platform == 'rpi':
result = subprocess.run(
['vcgencmd', 'measure_temp'],
capture_output=True, text=True
)
# Output: "temp=62.1'C"
temp_str = result.stdout.strip()
return float(temp_str.split('=')[1].replace("'C", ''))
elif self.platform == 'jetson':
# Legge da sysfs
with open('/sys/class/thermal/thermal_zone0/temp') as f:
return float(f.read().strip()) / 1000.0
except Exception as e:
logging.warning(f"Impossibile leggere temperatura: {e}")
return 0.0
def get_throttle_factor(self) -> float:
"""Restituisce il fattore di throttling (0.0-1.0)."""
temp = self.current_temp
if temp < self.TEMP_WARNING:
return 1.0
elif temp < self.TEMP_CRITICAL:
# Throttling lineare tra 75 e 85 gradi
factor = 1.0 - (temp - self.TEMP_WARNING) / (
self.TEMP_CRITICAL - self.TEMP_WARNING
)
return max(0.2, factor) # mai sotto il 20%
else:
return 0.0 # ferma il processing
def monitor_loop(self) -> None:
"""Thread di monitoraggio termico."""
while not self._stop.is_set():
self.current_temp = self.get_temperature()
self.throttle_factor = self.get_throttle_factor()
if self.current_temp >= self.TEMP_CRITICAL:
logging.critical(f"TEMP CRITICA: {self.current_temp:.1f}C - "
f"Processing fermato!")
elif self.current_temp >= self.TEMP_WARNING:
logging.warning(f"TEMP ALTA: {self.current_temp:.1f}C - "
f"Throttle: {self.throttle_factor:.2f}")
time.sleep(self.check_interval)
def start(self) -> None:
t = threading.Thread(target=self.monitor_loop, daemon=True)
t.start()
def stop(self) -> None:
self._stop.set()
Concluzii
Implementarea modelelor de viziune computerizată pe dispozitive de vârf necesită o abordare holistică care combină alegerea hardware-ului, optimizarea modelului și ingineria conductelor. Nu există o soluție unică: combinația optimă depinde de constrângerea dominantă (latență, energie, precizie, cost). În acest articol am construit un set de instrumente complet:
- Hardware Edge: Raspberry Pi 5 pentru scenarii bugetare, Jetson Orin pentru performanță în timp real, Coral TPU și Hailo-8 pentru putere ultra-scăzută
- Cuantificare INT8: reducerea dimensiunii de 4x, accelerare de 2-4x, pierdere de precizie <1% cu PTQ
- NCNN pentru CPU ARM, TensorRT pentru GPU NVIDIA, TFLite + Coral TPU pentru putere ultra-scăzută
- Tăiere structurată + reglare fină: îndepărtați 20-30% din filtre cu pierderi minime de precizie
- Distilarea cunoștințelor: transferați cunoștințele de la modele mari la modele încorporate
- ONNX Runtime: portabilitatea modelului între diferite platforme hardware
- Monitorizare termică și supraveghere: sisteme robuste pentru producție de vârf 24/7
- Sărirea cadrelor + urmărire: reduceți calculul cu 70-80% în scenele cu mișcare mică
Navigare în serie
- Anterior: OpenCV și PyTorch: canal complet de CV
- Următorul: Detectarea și recunoașterea feței: tehnici moderne
Resurse între serii
- MLOps: Model care servește în producție - implementare cloud cu Kubernetes și Triton
- Învățare profundă avansată: cuantizare și compresie







