Studiu de caz: Detectarea anomaliilor industriale cu viziune computerizată
Inspecția vizuală automată în producția industrială este unul dintre cazurile de utilizare de viziune computerizată cu impact economic mai mare: defectele nedetectate costă miliarde euro pe an în rechemarile de produse, garanții și daune reputației. Un sistem de Inteligența artificială a vizualizării bine concepută poate reduce rata defectelor nedetectate cu 90% în comparație la inspecția umană, cu viteze de inspecție de 10-100 de ori mai mari.
În acest studiu de caz vom construi un sistem complet de detectare a anomaliilor de la zero pt o linie de producție a plăcilor electronice (PCB), care acoperă întreaga conductă: achiziție de imagini, preprocesare, arhitectură model, instruire privind datele dezechilibrate, implementare pe margine (Jetson Orin) și monitorizare în producție.
Ce vei învăța
- Abordări ale detectării anomaliilor: supravegheat, semisupravegheat, nesupravegheat
- Setul de date de detectare a anomaliilor MVTec: standardul de referință în industrie
- PatchCore: algoritmul de ultimă generație pentru detectarea nesupravegheată a anomaliilor
- Gestionarea dezechilibrului de clasă în seturile de date reale de defect
- Augmentarea datelor specifice imaginilor industriale
- Valori industriale: AUROC, AUPRO, rata fals negative pe linie
- Implementare pe Jetson Orin cu TensorRT pentru inspecție în timp real
- Sistem de alertare si logare pentru Controlul Calitatii
- Monitorizarea deplasării modelului în medii reale de producție
1. Problema: linia de inspecție PCB
Scenariul nostru: o linie de producție PCB (Placă de circuit imprimat) cu a cadența de 120 carduri/minut. Fiecare card trebuie inspectat defecte precum: componente lipsă, scurtcircuite, lipire defectuoasă, componente mișcă, piste întrerupte. Inspecția umană este lentă (20 de cărți/minut) și subiect cu dificultate - după 4 ore, rata fals negative umane crește la 15%.
Specificații de sistem
| Parametru | Cerinţă | Atins |
|---|---|---|
| Debit | ≥ 120 carduri/min | 140 carduri/min |
| Rata fals negative | < 0,5% (maxim 1 defect din 200 nu a fost detectat) | 0,3% |
| Rata fals pozitivă | < 2% (carti bune respinse) | 1,4% |
| Latența pe card | < 500 ms | 380 ms |
| Hardware țintă | Jetson Orin Nano 8GB | Jetson Orin Nano 8GB |
1.1 Set de date: MVTec AD
Il Set de date de detectare a anomaliilor MVTec este standardul de referință în industrie pentru detectarea anomaliilor vizuale. Conține 15 categorii (texturi și obiecte), ~5000 de imagini normal pentru antrenament și imagini cu defecte adnotate pixeli pentru testare. Îl folosim ca baza pentru prototip înainte de a colecta date reale de pe linie.
2. Abordări ale detectării anomaliilor
Compararea abordărilor
| Abordare | Date solicitate | AUROC (MVTec) | Pro | Împotriva |
|---|---|---|---|---|
| Supravegheat | Multe exemple defecte de fiecare tip | ~99% | Precizie maximă | Colectarea de date costisitoare; noi defecte nu au fost detectate |
| PatchCore | Doar imagini normale | 99,1% | Fără exemple defecte; se generalizează la noi defecte | Banc mare de memorie; mai lent decât supravegheat |
| Autoencoder/VAE | Doar imagini normale | ~85% | Simplu de implementat | Adesea reconstruiește bine defectele |
| Student-Profesor | Doar imagini normale | ~96% | Rapid în inferență | Mai complex de tractat |
Alegerea: PatchCore. Pentru scenariul nostru industrial, abordarea PatchCore câștigă pentru că: (1) nu necesită exemple de defecte în antrenament - practic imposibil de colectat in cantitati suficiente pentru fiecare tip de defect; (2) ajunge 99,1% AUROC pe MVTec, cel mai bun rezultat nesupravegheat; (3) generalizează automat la noi defecte nemaivăzute.
3. PatchCore: Implementare
Ideea lui PatchCore este elegantă: folosește o coloană vertebrală pre-antrenată (WideResNet-50) pentru mine patch caracteristici din imaginile normale și build-uri a bănci de memorie de caracteristici nominală. Prin deducere, se compară caracteristicile patch-urilor unei imagini noi banca de memorie: distanta mare = anomalie.
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
from pathlib import Path
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu o faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022 Best Paper
Principio:
1. Estrae patch features con backbone pre-trained (WideResNet-50)
2. Costruisce memory bank con le feature di tutte le immagini normali
3. Applica coreset subsampling per ridurre la memoria (greedy k-center)
4. In inference: anomaly score = nearest neighbor distance dalla memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: list[str] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Backbone pre-trained su ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
# Rimuovi i layer finali (non servono per l'estrazione di feature)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection per ridurre dimensionalità (da 1024 a 384)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
# FAISS index per nearest neighbor search veloce
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Preprocessing standard ImageNet
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Crea feature extractor con hook sugli intermediate layer."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
# Registra forward hook
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Costruisce la memory bank dalle immagini normali di training.
coreset_ratio: percentuale di patch da mantenere (0.1 = 10%)
Riduce la memory bank con greedy coreset subsampling.
"""
all_features = []
print("Estraendo feature patch dal training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
# Interpola e concatena feature da più layer
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack tutte le patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Memory bank iniziale: {memory_bank.shape}")
# Random projection per ridurre dimensionalità
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: mantieni solo coreset_ratio% delle patch
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Memory bank finale: {self.memory_bank.shape}")
# Costruisci indice FAISS per nearest neighbor veloce
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank pronta per inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpola e concatena feature da diversi layer del backbone.
I feature map di layer2 (H/8) e layer3 (H/16) vengono
upsamplati alla stessa risoluzione e concatenati.
"""
feature_maps = list(features_dict.values())
# Target: risoluzione del primo layer (più alta)
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
# Concatena lungo dim channels: (B, C1+C2, H, W)
combined = torch.cat(aligned, dim=1)
# Reshape in patch tokens: (B*H*W, C1+C2)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Seleziona n punti che massimizzano la copertura dello spazio.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Calcola l'anomaly score di un'immagine.
Returns:
image_score: score scalare (soglia tipica: 0.5-0.8)
anomaly_map: mappa 2D dell'anomalia per localizzazione
"""
# Preprocessing
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy().astype(np.float32)
)
# Nearest neighbor distance per ogni patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
# Score patch = media delle n_neighbors distanze
patch_scores = distances.mean(axis=1)
# Ricostruisci anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale alla dimensione originale
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Score immagine = percentile 99 degli score patch
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
4. Conducta de date și preprocesare industrială
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
Dataset per ispezione PCB.
Struttura cartelle attesa:
root/
train/good/ <- immagini normali (training)
test/good/ <- immagini normali (test)
test/defect_type_1/ <- immagini difettose per valutazione
test/defect_type_2/
ground_truth/ <- mask binarie delle anomalie (test)
"""
# Augmentation per dati NORMALI di training
# Obiettivo: aumentare la varieta senza introdurre pattern simili a difetti
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Variazioni di illuminazione (simula diverse condizioni di luce)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# NON aggiungere: blur, noise, elastic -> simulano difetti!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform per test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = [] # list di (img_path, label, mask_path_or_None)
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
# Solo immagini normali per training
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
# Test: normali + difettosi con ground truth mask
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
# Path della ground truth mask
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
# Carica immagine
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Applica transform
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1) # HWC -> CHW
).float()
# Carica mask (se disponibile)
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
m = cv2.resize(m, (224, 224)) / 255.0
mask = m.astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
class PCBInspectionPipeline:
"""Pipeline di ispezione real-time per la linea PCB."""
def __init__(self, model: PatchCoreModel,
threshold: float = 0.65,
alert_callback=None):
self.model = model
self.threshold = threshold
self.alert_callback = alert_callback
self.stats = {'total': 0, 'ok': 0, 'defect': 0}
def inspect(self, img_bgr: np.ndarray,
board_id: str = '') -> dict:
"""
Ispeziona una scheda PCB.
Returns: risultato con score, decision, anomaly_map
"""
score, anomaly_map = self.model.score_image(img_bgr)
is_defect = score >= self.threshold
self.stats['total'] += 1
if is_defect:
self.stats['defect'] += 1
if self.alert_callback:
self.alert_callback(board_id, score, anomaly_map)
else:
self.stats['ok'] += 1
return {
'board_id': board_id,
'score': float(score),
'is_defect': bool(is_defect),
'anomaly_map': anomaly_map,
'defect_rate': self.stats['defect'] / max(1, self.stats['total'])
}
5. Evaluare: Metrici industriale
În context industrial, valorile standard ML (acuratețe, F1) sunt insuficiente. Un fals negativ (defect nedetectat) costă enorm mai mult decât a Fals pozitiv (cartea bună a fost aruncată). Valorile relevante sunt:
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # soglia ottimale per ROC
fnr: float # False Negative Rate alla soglia
fpr: float # False Positive Rate alla soglia
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% di difetti non rilevati
) -> AnomalyEvaluationMetrics:
"""
Valutazione completa su test set con metriche industriali.
target_fnr: FNR target (vincolo di business, es. 0.5%)
La soglia viene selezionata per rispettare questo vincolo.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
# Denormalizza
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO (Area Under Per-Region Overlap)
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Trova soglia che rispetta il vincolo FNR
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
# Seleziona soglia con FNR <= target_fnr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
# Tra le soglie valide, scegli quella con FPR minima
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
# Fallback: EER
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Risultati Anomaly Detection ===")
print(f"AUROC (image-level): {auroc:.4f} ({auroc*100:.2f}%)")
print(f"AUPRO (pixel-level): {aupro:.4f} ({aupro*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FNR @ soglia: {optimal_fnr*100:.2f}%")
print(f"FPR @ soglia: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
def compute_aupro(anomaly_maps: list, gt_masks: list,
max_fpr: float = 0.3) -> float:
"""
Area Under Per-Region Overlap (AUPRO).
Metrica robusta per la localizzazione di anomalie di dimensioni variabili.
A differenza del pixel-level AUROC, non penalizza i difetti piccoli.
"""
all_fprs, all_pros = [], []
for am, mask in zip(anomaly_maps, gt_masks):
am_flat = am.flatten()
mask_flat = (mask > 0.5).astype(float).flatten()
if mask_flat.sum() == 0:
continue
fpr_vals, tpr_vals, _ = roc_curve(mask_flat, am_flat)
all_fprs.extend(fpr_vals.tolist())
all_pros.extend(tpr_vals.tolist())
if not all_fprs:
return 0.0
# Ordina e integra fino a max_fpr
sorted_pairs = sorted(zip(all_fprs, all_pros))
fprs_sorted, pros_sorted = zip(*sorted_pairs)
# Integra con trapezoid rule entro max_fpr
aupro = 0.0
prev_fpr, prev_pro = 0.0, 0.0
for fpr_val, pro_val in zip(fprs_sorted, pros_sorted):
if fpr_val > max_fpr:
break
aupro += (fpr_val - prev_fpr) * (pro_val + prev_pro) / 2.0
prev_fpr, prev_pro = fpr_val, pro_val
return aupro / max_fpr if max_fpr > 0 else 0.0
6. Implementare pe Jetson Orin: Sistem complet
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
import asyncio
# FastAPI per REST API dell'inspection system
# pip install fastapi uvicorn python-multipart
app = FastAPI(title="PCB Inspection API", version="1.0.0")
# Stato globale del sistema
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # ultimi 100 score per monitoring drift
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Carica il modello PatchCore all'avvio del server."""
print("Caricamento modello PatchCore...")
system.model = PatchCoreModel(device='cuda')
# Carica memory bank pre-calcolata
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("Sistema pronto")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Endpoint principale per l'ispezione di una scheda PCB.
Accetta immagine JPEG/PNG e restituisce score e decisione.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
# Leggi e decodifica immagine
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
# Inference
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
# Aggiorna statistiche
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate_recent": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Monitoraggio drift: score medio recente vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Avvio: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
7. Model Drift: Detectare și actualizare în producție
Modelul în producție nu este static. Noi modele de PCB, modificări în proces de sudare, uzura luminilor - totul contribuie la deriva de model: o degradare treptată a performanței nu întotdeauna vizibilă în metrici clasificare până când devine critică. Monitorizarea proactivă este esențială.
import numpy as np
import json
import time
from pathlib import Path
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Report dello stato del drift del modello."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # differenza dalla baseline
ks_statistic: float = 0.0 # Kolmogorov-Smirnov stat
ks_p_value: float = 1.0 # p-value KS test
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitora il drift del modello PatchCore in produzione.
Basato su:
1. Score distribution monitoring (media rolling dei "good" score)
2. Kolmogorov-Smirnov test tra distribuzione corrente e baseline
3. Alert adattivi con 3 livelli: OK / WARNING / CRITICAL
Strategia di aggiornamento:
- WARNING: aumenta frequenza di sampling manuale (ogni 100 schede invece di 1000)
- CRITICAL: blocca linea per re-calibrazione del modello
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_threshold: float = 0.05, # +5% drift
critical_threshold: float = 0.10, # +10% drift
ks_alpha: float = 0.01): # p-value per KS test
"""
baseline_scores_path: JSON con score di "good" boards baseline
window_size: numero di score da mantenere nella finestra rolling
"""
self.window_size = window_size
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.ks_alpha = ks_alpha
# Carica baseline
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
print(f"Baseline caricata: {len(self.baseline_scores)} score")
print(f" Mean: {self.baseline_mean:.4f} ± {self.baseline_std:.4f}")
# Finestra rolling per i "good" score correnti
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
def record_score(self, anomaly_score: float,
is_good: bool) -> None:
"""
Registra lo score di una scheda.
is_good: True se la scheda è stata classificata come OK
(o confermata come buona dall'operatore)
"""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""
Esegue il controllo drift sulla finestra corrente.
Returns: DriftReport con stato e azione consigliata.
"""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
# Non abbastanza dati per un test significativo
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# 1. Score drift (differenza dalla baseline in deviazioni standard)
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# 2. Kolmogorov-Smirnov test
# Confronta la distribuzione corrente con la baseline
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# 3. Classificazione del drift
distribution_changed = ks_pval < self.ks_alpha
mean_drifted_critical = report.score_drift > self.critical_threshold * 10
mean_drifted_warning = report.score_drift > self.warning_threshold * 10
if mean_drifted_critical and distribution_changed:
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: re-calibrate model immediately'
elif mean_drifted_warning or distribution_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Suggerisce una strategia di retraining basata sulle metriche correnti.
"""
auroc_drop = target_auroc - current_auroc
latest_report = self.report_history[-1] if self.report_history else None
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'current_auroc': current_auroc,
'target_auroc': target_auroc,
'auroc_gap': auroc_drop,
}
if current_auroc >= target_auroc:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Piccola degradazione: aggiorna solo memory bank con nuovi campioni buoni
strategy['action'] = 'update_memory_bank'
strategy['description'] = (
'Aggiungi 200-500 nuove immagini buone alla memory bank. '
'Non richiede re-training del backbone.'
)
elif auroc_drop < 0.05:
# Degradazione media: fine-tune con nuovi campioni
strategy['action'] = 'incremental_finetune'
strategy['description'] = (
'Fine-tune backbone con mix di dati vecchi (70%) e nuovi (30%). '
'Learning rate basso: 1e-5. 10-20 epoch.'
)
else:
# Degradazione severa: retrain completo
strategy['action'] = 'full_retrain'
strategy['description'] = (
'Rebuild completo della memory bank con dati correnti. '
'Considera re-annotazione se sono cambiati i tipi di difetti.'
)
return strategy
# ---- Threshold adattivo basato su SLA di produzione ----
def compute_adaptive_threshold(model_scores_good: np.ndarray,
model_scores_defective: np.ndarray,
max_fnr: float = 0.003, # SLA: max 0.3% FNR
max_fpr: float = 0.02) -> dict:
"""
Calcola la soglia ottimale per rispettare gli SLA di produzione.
Priorità: FNR (difetti sfuggiti) è il vincolo critico.
FPR (falsi allarmi) è secondario.
Returns: {threshold, fnr, fpr, tradeoff_note}
"""
# Genera candidati threshold
all_scores = np.concatenate([model_scores_good, model_scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
# Predizioni
tp = np.sum(model_scores_defective > t)
fn = np.sum(model_scores_defective <= t)
fp = np.sum(model_scores_good > t)
tn = np.sum(model_scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Rispetta il vincolo FNR e minimizza FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# Fallback: usa EER se non riesce a rispettare FNR SLA
import warnings
warnings.warn("Impossibile rispettare FNR SLA. Usando EER come fallback.")
# ... (implementazione EER omessa per brevita)
best_threshold = np.median(all_scores)
best_fpr = 0.05
# Metriche finali alla soglia scelta
tp = np.sum(model_scores_defective > best_threshold)
fn = np.sum(model_scores_defective <= best_threshold)
fp = np.sum(model_scores_good > best_threshold)
tn = np.sum(model_scores_good <= best_threshold)
fnr_final = fn / (fn + tp + 1e-10)
fpr_final = fp / (fp + tn + 1e-10)
return {
'threshold': float(best_threshold),
'fnr': float(fnr_final),
'fpr': float(fpr_final),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
'meets_fnr_sla': fnr_final <= max_fnr,
'meets_fpr_target': fpr_final <= max_fpr,
}
8. Rezultate și lecții învățate
Rezultate la proiectul real (după 3 luni de producție)
| Metric | Inspecția umană | Sistem Vision AI | Îmbunătăţire |
|---|---|---|---|
| Debit | 20 de carduri/min | 140 carduri/min | 7x |
| Rata fals negative | 2-15% (variază în funcție de efort) | 0,3% | ~10x |
| Rata fals pozitivă | 0,5% | 1,4% | -2,8x (agravat) |
| Costul inspecției/fișei | 0,12 € | 0,02 € | Reducere de 6x |
| Defecte ratate pe teren | 23 daune/luna | 3 revendicări/lună | 87% reducere |
Cele 5 lecții cheie ale proiectului
- Rata fals pozitivă este negociabilă, rata fals negativă nu este: clientul acceptă să mai arunce câteva cărți bune, dar nu tolerează cărțile defecte în teren. Proiectați întotdeauna pragul pentru a îndeplini mai întâi constrângerea FNR.
- Iluminatul este componenta cea mai critică: 60% din falsele pozitive inițiale s-au datorat variațiilor de iluminare, nu defectelor reale. Investește într-un sistem de iluminat controlat (stroboscop cu LED, dom difuzor) înainte de a te gândi măcar la model.
- PatchCore se degradează lent, dar se degradează: după 3 luni, AUROC a scăzut de la 99,1% la 97,8% pentru noile modele de PCB care nu se aflau în banca de memorie. Planificați o strategie de actualizare incrementală a băncii de memorie.
- Localizarea este esențială pentru operator: decizia binară „OK/NOK” nu este suficientă. Harta anomaliilor care arată UNDE este localizat defectul reduce timpul de analiză al operatorului pentru verificarea manuală a fals pozitive cu 70%.
- Monitorizați distribuția scorului, nu doar acuratețea: deriva de model este văzută mai întâi în distribuția scorurilor decât în metricile de clasificare. Alertă dacă media scorurilor „bune” crește peste valoarea de referință istorică.
Concluziile seriei
Acest studiu de caz încheie seria noastră despre Computer Vision cu Deep Learning. Avem a acoperit întreaga traiectorie: de la CNN-urile fundamentale la transferul de învățare, de la detectare obiecte cu YOLO26 la segmentare, de la augmentare la conducte de producție cu OpenCV, de la implementarea marginilor la recunoașterea feței, până la acest caz real de utilizare industrială.
Viziunea computerizată este o disciplină practică: cele mai bune rezultate vin de la cei care înțeleg atât teorie (arhitecturi, funcții de pierdere, metrici), cât și inginerie de sistem (preprocesare, implementare, monitorizare). Domeniul evoluează rapid - YOLO26 a fost lansat în ianuarie 2026, SAM2 a revoluționat segmentarea interactivă – dar i principiile fundamentale rămân stabile.
Navigare în serie
- Anterior: Detectarea și recunoașterea feței: tehnici moderne
- Începutul seriei: CNN: Rețele convoluționale de la zero la producție
Resurse între serii
- MLOps: Monitorizare și detecție a devierii modelului - monitorizare avansata in productie
- Învățare profundă avansată: cuantizare și compresie - optimizarea modelului
- Inginerie AI: RAG și căutare vectorială - integrare cu sisteme AI







