Computer Vision su Edge: Ottimizzazione per Dispositivi Mobili e Embedded
Deployare modelli di computer vision su dispositivi edge - Raspberry Pi, NVIDIA Jetson, smartphone, microcontrollori ARM - e una sfida ingegneristica completamente diversa dal deployment su cloud o server GPU. Le risorse sono limitate: pochi watt di consumo, gigabytes di RAM invece di decine, assenza di GPU dedicata o GPU entry-level. Eppure, milioni di applicazioni richiedono inference locale: sorveglianza offline, robotica, dispositivi medici portatili, automazione industriale in ambienti senza connessione.
In questo articolo esploreremo le tecniche di ottimizzazione per edge deployment: quantizzazione, pruning, knowledge distillation, formati ottimizzati (ONNX, TFLite, NCNN) e benchmark reali su Raspberry Pi 5 e NVIDIA Jetson Orin.
Cosa Imparerai
- Panoramica hardware edge: Raspberry Pi, Jetson Nano/Orin, Coral TPU, Hailo
- Quantizzazione: INT8, FP16 - teoria e implementazione pratica
- Pruning strutturato e non strutturato per ridurre i parametri
- Knowledge Distillation: addestrare modelli piccoli da modelli grandi
- TFLite e NCNN: deployment su dispositivi ARM
- TensorRT: massima velocità su GPU NVIDIA (Jetson)
- ONNX Runtime con ottimizzazioni per CPU e NPU
- YOLO26 su Raspberry Pi 5: benchmark e configurazione completa
- Pipeline video real-time su Jetson Orin Nano
1. Hardware Edge per Computer Vision
Confronto Hardware Edge 2026
| Dispositivo | CPU | GPU/NPU | RAM | TDP | YOLOv8n FPS |
|---|---|---|---|---|---|
| Raspberry Pi 5 | ARM Cortex-A76 4-core | VideoCore VII | 8GB | 15W | ~5 FPS |
| Jetson Nano (2GB) | ARM A57 4-core | 128 CUDA cores | 2GB | 10W | ~20 FPS |
| Jetson Orin Nano | ARM Cortex-A78AE 6-core | 1024 CUDA + DLA | 8GB | 25W | ~80 FPS |
| Jetson AGX Orin | ARM Cortex-A78AE 12-core | 2048 CUDA + DLA | 64GB | 60W | ~200 FPS |
| Google Coral TPU | ARM Cortex-A53 4-core | 4 TOPS Edge TPU | 1GB | 4W | ~30 FPS (TFLite) |
| Hailo-8 | - (PCIe accelerator) | 26 TOPS Neural Engine | - | 5W | ~120 FPS |
2. Quantizzazione: Da FP32 a INT8
La quantizzazione riduce la precisione numerica dei pesi e delle attivazioni del modello: da float32 (32 bit) a float16 (16 bit) o int8 (8 bit). L'effetto pratico: modello 4x più piccolo con INT8, inference 2-4x più veloce, consumo energetico ridotto. La perdita di accuratezza con tecniche moderne e tipicamente inferiore all'1%.
2.1 Post-Training Quantization (PTQ)
import torch
import torch.quantization as quant
from torch.ao.quantization import get_default_qconfig, prepare, convert
from torchvision import models
import copy
def quantize_model_ptq(
model: torch.nn.Module,
calibration_loader,
backend: str = 'x86' # 'x86' per CPU Intel, 'qnnpack' per ARM
) -> torch.nn.Module:
"""
Post-Training Quantization (PTQ): quantizza il modello senza retraining.
Richiede solo un piccolo calibration dataset (~100-1000 immagini).
Flusso:
1. Fuse operazioni (Conv+BN+ReLU -> singola op)
2. Insert observer per calibrazione
3. Esegui calibrazione (forward pass sul dataset di calibrazione)
4. Converti in modello quantizzato
"""
torch.backends.quantized.engine = backend
model_to_quantize = copy.deepcopy(model)
model_to_quantize.eval()
# Step 1: Fuse layer comuni per efficienza
# Esempio per ResNet: (Conv, BN, ReLU) -> singola operazione fused
model_to_quantize = torch.quantization.fuse_modules(
model_to_quantize,
[['conv1', 'bn1', 'relu']], # adatta ai nomi del tuo modello
inplace=True
)
# Step 2: Set qconfig e prepara per calibrazione
qconfig = get_default_qconfig(backend)
model_to_quantize.qconfig = qconfig
prepared_model = prepare(model_to_quantize, inplace=False)
# Step 3: Calibrazione con dati reali
print("Calibrazione quantizzazione...")
prepared_model.eval()
with torch.no_grad():
for i, (images, _) in enumerate(calibration_loader):
prepared_model(images)
if i >= 99: # 100 batch di calibrazione sufficienti
break
if i % 10 == 0:
print(f" Batch {i+1}/100")
# Step 4: Conversione al modello quantizzato
quantized_model = convert(prepared_model, inplace=False)
# Verifica dimensioni
def model_size_mb(m: torch.nn.Module) -> float:
param_size = sum(p.nelement() * p.element_size() for p in m.parameters())
buffer_size = sum(b.nelement() * b.element_size() for b in m.buffers())
return (param_size + buffer_size) / (1024 ** 2)
original_size = model_size_mb(model)
quantized_size = model_size_mb(quantized_model)
print(f"Dimensione originale: {original_size:.1f} MB")
print(f"Dimensione quantizzata: {quantized_size:.1f} MB")
print(f"Riduzione: {original_size / quantized_size:.1f}x")
return quantized_model
def compare_inference_speed(original_model, quantized_model,
input_tensor: torch.Tensor, n_runs: int = 100) -> dict:
"""Confronta velocità tra modello originale e quantizzato."""
import time
results = {}
for name, model in [('FP32', original_model), ('INT8', quantized_model)]:
model.eval()
# Warmup
with torch.no_grad():
for _ in range(10):
model(input_tensor)
# Benchmark
start = time.perf_counter()
with torch.no_grad():
for _ in range(n_runs):
model(input_tensor)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
results[name] = avg_ms
print(f"{name}: {avg_ms:.2f}ms / inference")
speedup = results['FP32'] / results['INT8']
print(f"Speedup INT8: {speedup:.2f}x")
return results
2.2 Quantizzazione con YOLO (Ultralytics)
from ultralytics import YOLO
model = YOLO('yolo26n.pt') # nano per edge
# ---- TFLite INT8 per Raspberry Pi / Coral TPU ----
model.export(
format='tflite',
imgsz=320, # risoluzione ridotta per edge
int8=True, # quantizzazione INT8
data='coco.yaml' # dataset per calibrazione PTQ
)
# Output: yolo26n_int8.tflite
# ---- NCNN per CPU ARM (Raspberry Pi, Android) ----
model.export(
format='ncnn',
imgsz=320,
half=False # NCNN usa FP32 o INT8 nativo
)
# Output: yolo26n_ncnn_model/
# ---- TensorRT FP16 per Jetson ----
model.export(
format='engine',
imgsz=640,
half=True, # FP16
workspace=2, # GB workspace (ridotto per Jetson Nano)
device=0
)
# Output: yolo26n.engine
# ---- ONNX + ONNX Runtime per CPU/NPU ----
model.export(
format='onnx',
imgsz=320,
opset=17,
simplify=True,
dynamic=False # batch size fisso per deployment edge
)
print("Export completati per tutti i target edge")
3. YOLO su Raspberry Pi 5
Il Raspberry Pi 5 con 8GB di RAM e il processore ARM Cortex-A76 rappresenta il punto di ingresso più accessibile per l'AI edge. Con le giuste ottimizzazioni (NCNN, risoluzione ridotta, tracking per ridurre inference frequency) si può raggiungere un sistema di detection funzionale in tempo reale.
# ============================================
# SETUP RASPBERRY PI 5 per Computer Vision
# ============================================
# 1. Installazione dipendenze base
# sudo apt update && sudo apt install -y python3-pip libopencv-dev
# pip install ultralytics ncnn onnxruntime
# 2. Ottimizzazioni sistema per AI
# In /boot/firmware/config.txt:
# gpu_mem=256 # Aumenta memoria GPU (VideoCore VII)
# over_voltage=6 # Overclock lieve
# arm_freq=2800 # Frequenza CPU max (stock 2.4GHz)
# ============================================
# INFERENCE con NCNN su Raspberry Pi
# ============================================
import ncnn
import cv2
import numpy as np
import time
class YOLOncnn:
"""
YOLO inference con NCNN - ottimizzato per CPU ARM.
NCNN e sviluppato da Tencent ed e il runtime più veloce per ARM CPU.
"""
def __init__(self, param_path: str, bin_path: str,
num_threads: int = 4, input_size: int = 320):
self.net = ncnn.Net()
self.net.opt.num_threads = num_threads # usa tutti i core
self.net.opt.use_vulkan_compute = False # no GPU su RPi
self.net.load_param(param_path)
self.net.load_model(bin_path)
self.input_size = input_size
def predict(self, img_bgr: np.ndarray, conf_thresh: float = 0.4) -> list[dict]:
"""Inference NCNN su CPU ARM."""
h, w = img_bgr.shape[:2]
# Resize + normalizzazione per NCNN
img_resized = cv2.resize(img_bgr, (self.input_size, self.input_size))
img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
mat_in = ncnn.Mat.from_pixels(
img_rgb, ncnn.Mat.PixelType.PIXEL_RGB, self.input_size, self.input_size
)
mean_vals = [0.485 * 255, 0.456 * 255, 0.406 * 255]
norm_vals = [1/0.229/255, 1/0.224/255, 1/0.225/255]
mat_in.substract_mean_normalize(mean_vals, norm_vals)
ex = self.net.create_extractor()
ex.input("images", mat_in)
_, mat_out = ex.extract("output0")
return self._parse_output(mat_out, conf_thresh, w, h)
def _parse_output(self, mat_out, conf_thresh, orig_w, orig_h) -> list[dict]:
"""Parsing dell'output NCNN in formato detection."""
detections = []
for i in range(mat_out.h):
row = np.array(mat_out.row(i))
confidence = row[4]
if confidence < conf_thresh:
continue
class_scores = row[5:]
class_id = int(np.argmax(class_scores))
class_conf = confidence * class_scores[class_id]
if class_conf >= conf_thresh:
# Coordinate normalizzate -> pixel
cx, cy, bw, bh = row[:4]
x1 = int((cx - bw/2) * orig_w / self.input_size)
y1 = int((cy - bh/2) * orig_h / self.input_size)
x2 = int((cx + bw/2) * orig_w / self.input_size)
y2 = int((cy + bh/2) * orig_h / self.input_size)
detections.append({
'class_id': class_id,
'confidence': float(class_conf),
'bbox': (x1, y1, x2, y2)
})
return detections
def run_rpi_detection_loop(model_param: str, model_bin: str,
camera_id: int = 0) -> None:
"""Loop di detection real-time ottimizzato per Raspberry Pi."""
detector = YOLOncnn(model_param, model_bin, num_threads=4, input_size=320)
cap = cv2.VideoCapture(camera_id)
# Ottimizza acquisizione per RPi
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
frame_skip = 2 # Processa 1 frame su 3 per risparmiare CPU
frame_count = 0
cached_dets = []
fps_history = []
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
if frame_count % frame_skip == 0:
cached_dets = detector.predict(frame, conf_thresh=0.4)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed if elapsed > 0 else 0
fps_history.append(fps)
# Visualizzazione
for det in cached_dets:
x1, y1, x2, y2 = det['bbox']
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, f"{det['confidence']:.2f}",
(x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)
avg_fps = sum(fps_history[-30:]) / min(len(fps_history), 30)
cv2.putText(frame, f"FPS: {avg_fps:.1f}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('RPi Detection', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
print(f"FPS medio: {sum(fps_history)/len(fps_history):.1f}")
4. NVIDIA Jetson Orin: TensorRT e DLA
Il Jetson Orin Nano (25W) offre 1024 CUDA cores e un DLA (Deep Learning Accelerator) dedicato. Con TensorRT FP16 e un modello YOLO26n, si superano facilmente i 100 FPS su video 640x640.
from ultralytics import YOLO
import cv2
import time
def setup_jetson_pipeline(model_path: str = 'yolo26n.pt') -> YOLO:
"""
Setup ottimale per Jetson Orin:
1. Esporta in TensorRT FP16
2. Configura jetson_clocks per prestazioni massime
3. Imposta modalità performance per la GPU
"""
import subprocess
# Massimizza performance Jetson (esegui una sola volta)
# subprocess.run(['sudo', 'jetson_clocks'], check=True)
# subprocess.run(['sudo', 'nvpmodel', '-m', '0'], check=True) # MAXN mode
model = YOLO(model_path)
print("Esportazione TensorRT FP16...")
model.export(
format='engine',
imgsz=640,
half=True, # FP16 - quasi la stessa accuratezza di FP32 ma 2x più veloce
workspace=2, # GB workspace GPU (Jetson Orin Nano ha 8GB shared)
device=0,
batch=1,
simplify=True
)
# Carica il modello TensorRT
trt_model = YOLO('yolo26n.engine')
print("Modello TensorRT pronto")
return trt_model
def run_jetson_pipeline(model: YOLO, source=0) -> None:
"""Pipeline real-time ottimizzata per Jetson con statistiche."""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
fps_list = []
frame_count = 0
try:
while True:
ret, frame = cap.read()
if not ret:
break
t0 = time.perf_counter()
results = model.predict(
frame, conf=0.35, iou=0.45,
verbose=False, half=True # FP16 inference
)
elapsed = time.perf_counter() - t0
fps = 1.0 / elapsed
fps_list.append(fps)
# Annotazione con informazioni performance
annotated = results[0].plot()
avg_fps = sum(fps_list[-30:]) / min(len(fps_list), 30)
info_text = [
f"FPS: {fps:.0f} (avg: {avg_fps:.0f})",
f"Detections: {len(results[0].boxes)}",
f"Inference: {elapsed*1000:.1f}ms"
]
for i, text in enumerate(info_text):
cv2.putText(annotated, text, (10, 30 + i * 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
cv2.imshow('Jetson Pipeline', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
finally:
cap.release()
cv2.destroyAllWindows()
if fps_list:
print(f"\n=== Stats Jetson ===")
print(f"Frame: {frame_count}")
print(f"FPS medio: {sum(fps_list)/len(fps_list):.1f}")
print(f"FPS massimo: {max(fps_list):.1f}")
print(f"Latenza minima: {1000/max(fps_list):.1f}ms")
5. Pruning e Knowledge Distillation
5.1 Structured Pruning
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune
def apply_structured_pruning(model: nn.Module,
amount: float = 0.3,
n: int = 2) -> nn.Module:
"""
Structured L2-norm pruning: rimuove interi filtri/neuroni.
Produce modelli più veloci in inferenza (a differenza del pruning non strutturato
che produce solo modelli più piccoli ma non necessariamente più veloci).
amount: percentuale di filtri da rimuovere (0.3 = 30%)
n: norma L_n usata per il ranking dei filtri
"""
for name, module in model.named_modules():
if isinstance(module, nn.Conv2d):
# Prune i filtri convoluzionali meno importanti
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0 # dim=0 = prune filtri in output
)
elif isinstance(module, nn.Linear):
prune.ln_structured(
module,
name='weight',
amount=amount,
n=n,
dim=0
)
return model
def remove_pruning_masks(model: nn.Module) -> nn.Module:
"""
Rende permanente il pruning: rimuove le maschere e i parametri "orig",
lasciando solo i pesi pruned. Necessario prima dell'export.
"""
for name, module in model.named_modules():
if isinstance(module, (nn.Conv2d, nn.Linear)):
try:
prune.remove(module, 'weight')
except ValueError:
pass
return model
def prune_and_finetune(model: nn.Module, train_loader, val_loader,
prune_amount: float = 0.2, finetune_epochs: int = 5) -> nn.Module:
"""
Pipeline completa:
1. Prune il modello (rimuove il prune_amount% dei filtri)
2. Fine-tunes per recuperare l'accuratezza persa
3. Rimuove le maschere e finalizza
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f"Applying {prune_amount*100:.0f}% structured pruning...")
model = apply_structured_pruning(model, amount=prune_amount)
# Fine-tuning rapido per recupero accuratezza
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
for epoch in range(finetune_epochs):
model.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
loss = criterion(model(images), labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
total_loss += loss.item()
model.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = model(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
print(f" FT Epoch {epoch+1}/{finetune_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Acc: {100.*correct/total:.2f}%")
# Finalizza pruning
model = remove_pruning_masks(model)
print("Pruning completato e finalizzato")
return model
6. Knowledge Distillation per Edge Models
Il Knowledge Distillation (KD, Hinton et al., 2015) trasferisce la "conoscenza" di un modello grande (teacher) in uno piccolo (student). Il student non impara solo dalle hard labels del dataset, ma dalle soft predictions del teacher: distribuzioni di probabilità che contengono informazione sulla struttura dello spazio dei dati (es. che "gatto" e più simile a "tigre" che a "automobile").
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
"""
Loss combinata per Knowledge Distillation.
L_total = alpha * L_hard + (1 - alpha) * L_soft
L_hard = CrossEntropyLoss(student_logits, true_labels)
L_soft = KLDivLoss(softmax(student/T), softmax(teacher/T)) * T^2
T (temperature): valori alti -> distribuzioni più soft -> più informazione strutturale
alpha: peso relativo tra label reali e distillazione dal teacher
"""
def __init__(self, temperature: float = 4.0, alpha: float = 0.7):
super().__init__()
self.T = temperature
self.alpha = alpha
self.hard_loss = nn.CrossEntropyLoss()
self.soft_loss = nn.KLDivLoss(reduction='batchmean')
def forward(self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor) -> torch.Tensor:
# Loss su label reali (hard labels)
hard = self.hard_loss(student_logits, labels)
# Loss su soft predictions del teacher (KL divergence)
student_soft = F.log_softmax(student_logits / self.T, dim=1)
teacher_soft = F.softmax(teacher_logits / self.T, dim=1)
soft = self.soft_loss(student_soft, teacher_soft) * (self.T ** 2)
return self.alpha * hard + (1 - self.alpha) * soft
def train_with_distillation(
teacher: nn.Module, # modello grande, già addestrato
student: nn.Module, # modello piccolo da addestrare
train_loader,
val_loader,
n_epochs: int = 50,
temperature: float = 4.0,
alpha: float = 0.7,
lr: float = 1e-3
) -> nn.Module:
"""
Training del modello student con KD.
Il teacher rimane frozen durante tutto il training.
Tipico risultato:
- MobileNetV3 senza KD su ImageNet: ~67% Top-1
- MobileNetV3 con KD da ResNet-50: ~72% Top-1
- ResNet-50 (teacher): ~76% Top-1
- Delta: +5% con 5x meno parametri!
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.eval() # Teacher sempre in eval mode
student.to(device)
teacher.to(device)
criterion = DistillationLoss(temperature=temperature, alpha=alpha)
optimizer = torch.optim.AdamW(student.parameters(), lr=lr, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=n_epochs)
best_val_acc = 0.0
best_state = None
for epoch in range(n_epochs):
student.train()
total_loss = 0.0
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
# Forward pass
student_logits = student(images)
with torch.no_grad(): # Teacher: nessun gradiente
teacher_logits = teacher(images)
# Loss combinata
loss = criterion(student_logits, teacher_logits, labels)
optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(student.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
scheduler.step()
# Validation
student.eval()
correct = total = 0
with torch.no_grad():
for images, labels in val_loader:
images, labels = images.to(device), labels.to(device)
preds = student(images).argmax(1)
correct += preds.eq(labels).sum().item()
total += labels.size(0)
val_acc = 100.0 * correct / total
if val_acc > best_val_acc:
best_val_acc = val_acc
best_state = {k: v.cpu().clone() for k, v in student.state_dict().items()}
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{n_epochs} | "
f"Loss: {total_loss/len(train_loader):.4f} | "
f"Val Acc: {val_acc:.2f}% | "
f"Best: {best_val_acc:.2f}%")
student.load_state_dict(best_state)
print(f"\nBest validation accuracy: {best_val_acc:.2f}%")
return student
Confronto Strategie di Compressione per Edge
| Tecnica | Riduzione Params | Speedup | Perdita Acc. | Richiede Retraining |
|---|---|---|---|---|
| Quantizzazione INT8 | 4x | 2-4x | <1% | No (PTQ) / Si (QAT) |
| Pruning strutturato 30% | 1.4x | 1.3-1.6x | 1-3% | Si (fine-tuning) |
| Knowledge Distillation | 5-10x (model swap) | 5-10x | 3-8% | Si (full training) |
| FP16 (TensorRT) | 2x | 1.5-2x | <0.5% | No |
| Q + Pruning + KD | 10-20x | 8-15x | 2-5% | Si |
7. ONNX Runtime: Portabilita tra Hardware
ONNX (Open Neural Network Exchange) e il formato standard per la portabilita dei modelli di deep learning. Una volta esportato in ONNX, lo stesso modello può essere eseguito con ONNX Runtime su CPU, GPU NVIDIA, ARM NPU, Intel OpenVINO, Apple Neural Engine senza modifiche al codice di inference.
import torch
import onnx
import onnxruntime as ort
import numpy as np
import time
def export_to_onnx(model: torch.nn.Module,
input_shape: tuple = (1, 3, 640, 640),
output_path: str = 'model.onnx',
opset: int = 17) -> str:
"""
Esporta modello PyTorch in formato ONNX ottimizzato.
opset=17: versione del opset ONNX (più alta = più operatori supportati)
dynamic_axes: permette batch size variabile (utile per server, non per edge)
"""
model.eval()
dummy_input = torch.zeros(input_shape)
# Export con ottimizzazioni
torch.onnx.export(
model,
dummy_input,
output_path,
opset_version=opset,
input_names=['images'],
output_names=['output'],
dynamic_axes={
'images': {0: 'batch'},
'output': {0: 'batch'}
},
do_constant_folding=True, # ottimizza operazioni costanti
verbose=False
)
# Verifica il modello esportato
onnx_model = onnx.load(output_path)
onnx.checker.check_model(onnx_model)
print(f"Modello ONNX valido: {output_path}")
return output_path
class ONNXRuntimeInference:
"""
Inference ottimizzata con ONNX Runtime.
Supporta CPU, GPU CUDA, ARM (QNN), Intel OpenVINO come backend.
"""
def __init__(self, model_path: str, device: str = 'cpu'):
providers = self._get_providers(device)
sess_options = ort.SessionOptions()
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
# Numero di thread per CPU inference
sess_options.intra_op_num_threads = 4
sess_options.inter_op_num_threads = 2
self.session = ort.InferenceSession(
model_path, sess_options, providers=providers
)
# Cache nomi input/output
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
print(f"ONNX Runtime caricato su: {providers[0]}")
def _get_providers(self, device: str) -> list:
if device == 'cuda':
return ['CUDAExecutionProvider', 'CPUExecutionProvider']
elif device == 'openvino':
return ['OpenVINOExecutionProvider', 'CPUExecutionProvider']
else:
return ['CPUExecutionProvider']
def predict(self, image: np.ndarray) -> np.ndarray:
"""Inference su immagine numpy preprocessata."""
# Assicura formato float32 [B, C, H, W]
if image.ndim == 3:
image = image[np.newaxis, ...]
image = image.astype(np.float32)
return self.session.run(
[self.output_name], {self.input_name: image}
)[0]
def benchmark(self, input_shape: tuple = (1, 3, 640, 640),
n_runs: int = 100) -> dict:
"""Misura latenza e throughput."""
dummy = np.random.rand(*input_shape).astype(np.float32)
# Warmup
for _ in range(10):
self.predict(dummy)
# Benchmark
start = time.perf_counter()
for _ in range(n_runs):
self.predict(dummy)
elapsed = time.perf_counter() - start
avg_ms = (elapsed / n_runs) * 1000
fps = 1000.0 / avg_ms
print(f"ONNX Runtime: {avg_ms:.2f}ms ({fps:.1f} FPS)")
return {'avg_ms': avg_ms, 'fps': fps}
8. Best Practices per Edge Deployment
Checklist per Edge Deployment Production-Ready
- Scegli il modello più piccolo che soddisfa i requisiti: YOLOv8n o YOLO26n per RPi, YOLOv8m per Jetson Orin. Non usare modelli Large o XLarge su edge. Misura SEMPRE su hardware target.
- Riduci la risoluzione di input: 320x320 invece di 640x640 riduce il tempo di inference del 75% con perdita di accuratezza moderata. Per oggetti grandi, 320 e sufficiente.
- Frame skipping intelligente: Se gli oggetti si muovono lentamente, elabora 1 frame su 3-5. Usa un tracker (CSRT, ByteTrack) per interpolare le posizioni nei frame saltati.
- Ottimizza il pipeline di acquisizione: Imposta CAP_PROP_BUFFERSIZE=1 per minimizzare la latenza. Usa V4L2 direttamente su Linux per minore overhead rispetto a OpenCV.
- TensorRT su Jetson: Sempre. La differenza tra PyTorch e TensorRT FP16 e 5-8x. Non c'è motivo di usare PyTorch per inference production su Jetson.
- Thermal throttling: Su RPi e Jetson, il surriscaldamento causa throttling. Aggiungi dissipatori, controlla la temperatura con
vcgencmd measure_temp(RPi) otegrastats(Jetson). - Misura l'energia, non solo la velocità: FPS/watt e la metrica che conta per i dispositivi batteria. Un modello 2x più lento ma 4x più efficiente energeticamente e spesso preferibile.
- Watchdog e graceful restart: Su dispositivi edge in produzione, implementa sempre un watchdog che riavvia il processo di inference in caso di crash o freeze.
- Logging edge-friendly: Su RPi, usa SQLite invece di database remoti per salvare eventi localmente. Sincronizza con il cloud in batch quando la connessione e disponibile.
import subprocess
import threading
import time
import logging
class ThermalMonitor:
"""
Monitor termico per Raspberry Pi/Jetson.
Riduce automaticamente il carico di lavoro se la temperatura e troppo alta.
"""
TEMP_WARNING = 75.0 # Celsius: riduce frame rate
TEMP_CRITICAL = 85.0 # Celsius: ferma il processing
def __init__(self, platform: str = 'rpi',
check_interval: float = 5.0):
self.platform = platform
self.check_interval = check_interval
self.current_temp = 0.0
self.throttle_factor = 1.0 # 1.0 = nessun throttling
self._stop = threading.Event()
def get_temperature(self) -> float:
"""Legge la temperatura del SoC."""
try:
if self.platform == 'rpi':
result = subprocess.run(
['vcgencmd', 'measure_temp'],
capture_output=True, text=True
)
# Output: "temp=62.1'C"
temp_str = result.stdout.strip()
return float(temp_str.split('=')[1].replace("'C", ''))
elif self.platform == 'jetson':
# Legge da sysfs
with open('/sys/class/thermal/thermal_zone0/temp') as f:
return float(f.read().strip()) / 1000.0
except Exception as e:
logging.warning(f"Impossibile leggere temperatura: {e}")
return 0.0
def get_throttle_factor(self) -> float:
"""Restituisce il fattore di throttling (0.0-1.0)."""
temp = self.current_temp
if temp < self.TEMP_WARNING:
return 1.0
elif temp < self.TEMP_CRITICAL:
# Throttling lineare tra 75 e 85 gradi
factor = 1.0 - (temp - self.TEMP_WARNING) / (
self.TEMP_CRITICAL - self.TEMP_WARNING
)
return max(0.2, factor) # mai sotto il 20%
else:
return 0.0 # ferma il processing
def monitor_loop(self) -> None:
"""Thread di monitoraggio termico."""
while not self._stop.is_set():
self.current_temp = self.get_temperature()
self.throttle_factor = self.get_throttle_factor()
if self.current_temp >= self.TEMP_CRITICAL:
logging.critical(f"TEMP CRITICA: {self.current_temp:.1f}C - "
f"Processing fermato!")
elif self.current_temp >= self.TEMP_WARNING:
logging.warning(f"TEMP ALTA: {self.current_temp:.1f}C - "
f"Throttle: {self.throttle_factor:.2f}")
time.sleep(self.check_interval)
def start(self) -> None:
t = threading.Thread(target=self.monitor_loop, daemon=True)
t.start()
def stop(self) -> None:
self._stop.set()
Conclusioni
Il deployment di modelli di computer vision su dispositivi edge richiede un approccio olistico che combina scelta del hardware, ottimizzazione del modello e pipeline engineering. Non esiste una soluzione unica: la combinazione ottimale dipende dal vincolo dominante (latenza, energia, accuratezza, costo). In questo articolo abbiamo costruito un toolkit completo:
- Hardware edge: Raspberry Pi 5 per scenari budget, Jetson Orin per performance real-time, Coral TPU e Hailo-8 per ultra-low power
- Quantizzazione INT8: 4x riduzione dimensione, 2-4x speedup, <1% perdita accuratezza con PTQ
- NCNN per CPU ARM, TensorRT per GPU NVIDIA, TFLite + Coral TPU per ultra-low power
- Pruning strutturato + fine-tuning: rimuovi il 20-30% dei filtri con minima perdita di accuratezza
- Knowledge Distillation: trasferisci la conoscenza di modelli grandi in modelli embedded
- ONNX Runtime: portabilita del modello tra piattaforme hardware diverse
- Monitoring termico e watchdog: sistemi robusti per produzione su edge 24/7
- Frame skipping + tracking: riduci il compute del 70-80% in scene con poco movimento
Navigazione Serie
- Precedente: OpenCV e PyTorch: Pipeline CV Completa
- Successivo: Face Detection e Recognition: Tecniche Moderne
Risorse Cross-Serie
- MLOps: Model Serving in Produzione - deploy cloud con Kubernetes e Triton
- Deep Learning Avanzato: Quantizzazione e Compressione







