Studium przypadku: Wykrywanie anomalii przemysłowych za pomocą wizji komputerowej
Jednym z przykładów zastosowania jest zautomatyzowana inspekcja wizualna w produkcji przemysłowej widzenia komputerowego o większym wpływie ekonomicznym: niewykryte defekty kosztują miliardy euro rocznie z tytułu wycofania produktów, gwarancji i utraty reputacji. System Dobrze zaprojektowana sztuczna inteligencja wizyjna może zmniejszyć liczbę niewykrytych defektów o 90% w porównaniu do kontroli przez człowieka, z 10-100 razy większą szybkością inspekcji.
W tym studium przypadku zbudujemy od podstaw kompletny system wykrywania anomalii jedna linia do produkcji płytek elektronicznych (PCB), obejmująca cały rurociąg: akwizycja obrazu, wstępna obróbka, architektura modelu, szkolenie na danych niezrównoważonych, wdrożenie na brzegu (Jetson Orin) i monitorowanie na produkcji.
Czego się nauczysz
- Podejścia do wykrywania anomalii: nadzorowane, częściowo nadzorowane, bez nadzoru
- Zestaw danych wykrywania anomalii MVTec: standardowy punkt odniesienia w branży
- PatchCore: najnowocześniejszy algorytm do wykrywania anomalii bez nadzoru
- Zarządzanie brakiem równowagi klas w zbiorach danych o rzeczywistych defektach
- Zwiększanie danych specyficzne dla obrazów przemysłowych
- Wskaźniki przemysłowe: AUROC, AUPRO, współczynnik wyników fałszywie ujemnych na linię
- Wdrożenie na Jetson Orin z TensorRT do kontroli w czasie rzeczywistym
- System alarmowania i logowania dla Kontroli Jakości
- Monitorowanie dryftu modelu w rzeczywistych środowiskach produkcyjnych
1. Problem: Linia inspekcji PCB
Nasz scenariusz: linia do produkcji PCB (płytek drukowanych) z kadencja 120 kart/minutę. Każda karta musi zostać sprawdzona wady takie jak: brakujące elementy, zwarcia, wadliwe lutowanie, podzespoły ruszać, ścieżki przerwane. Kontrola przez ludzi jest powolna (20 kart/minutę) i podlega z trudem - po 4 godzinach odsetek fałszywie ujemnych wyników u ludzi wzrasta do 15%.
Specyfikacje systemu
| Parametr | Wymóg | Osiągnięty |
|---|---|---|
| Przepustowość | ≥ 120 kart/min | 140 kart/min |
| Fałszywie ujemna stawka | < 0,5% (maks. 1 defekt na 200 niewykrytych) | 0,3% |
| Fałszywie dodatni współczynnik | < 2% (odrzucone dobre karty) | 1,4% |
| Opóźnienie na kartę | < 500 ms | 380 ms |
| Docelowy sprzęt | Jetson Orin Nano 8 GB | Jetson Orin Nano 8 GB |
1.1 Zbiór danych: MVTec AD
Il Zestaw danych dotyczących wykrywania anomalii MVTec jest standardowym punktem odniesienia w branży do wizualnego wykrywania anomalii. Zawiera 15 kategorii (tekstury i obiekty), ~ 5000 obrazów normalne do celów szkoleniowych, a obrazy z defektami z adnotacjami w pikselach do testowania. Używamy go jako podstawę prototypu przed pobraniem rzeczywistych danych z linii.
2. Podejścia do wykrywania anomalii
Porównanie podejść
| Zbliżać się | Żądano danych | AUROC (MVTec) | Zawodowiec | Przeciwko |
|---|---|---|---|---|
| Nadzorowany | Wiele wadliwych egzemplarzy każdego typu | ~99% | Maksymalna dokładność | Drogie gromadzenie danych; nie wykryto nowych usterek |
| PatchCore | Tylko normalne obrazy | 99,1% | Żadnych wadliwych przykładów; uogólnia na nowe wady | Duży bank pamięci; wolniej niż nadzorowane |
| Autoenkoder/VAE | Tylko normalne obrazy | ~85% | Proste w wykonaniu | Często dobrze rekonstruuje także ubytki |
| Uczeń-Nauczyciel | Tylko normalne obrazy | ~96% | Szybki w wyciąganiu wniosków | Holowanie jest bardziej skomplikowane |
Wybór: PatchCore. W naszym scenariuszu przemysłowym jest to podejście PatchCore wygrywa, ponieważ: (1) nie wymaga przykładów błędów w szkoleniu - praktycznie niemożliwe jest zebranie wystarczającej ilości dla każdego rodzaju wady; (2) osiąga 99,1% AUROC na MVTec, najlepszy wynik bez nadzoru; (3) generalizuje automatycznie do nowych, nigdy nie widzianych wad.
3. PatchCore: Implementacja
Pomysł PatchCore jest elegancki: wykorzystuje wstępnie wytrenowany szkielet (WideResNet-50) do wydobywania łata funkcje z normalnych obrazów i buduje plik a banki pamięci funkcji nominalny. Podsumowując, porównuje się cechy poprawki nowego obrazu bank pamięci: duża odległość = anomalia.
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
from pathlib import Path
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu o faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022 Best Paper
Principio:
1. Estrae patch features con backbone pre-trained (WideResNet-50)
2. Costruisce memory bank con le feature di tutte le immagini normali
3. Applica coreset subsampling per ridurre la memoria (greedy k-center)
4. In inference: anomaly score = nearest neighbor distance dalla memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: list[str] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Backbone pre-trained su ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
# Rimuovi i layer finali (non servono per l'estrazione di feature)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection per ridurre dimensionalità (da 1024 a 384)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
# FAISS index per nearest neighbor search veloce
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Preprocessing standard ImageNet
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Crea feature extractor con hook sugli intermediate layer."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
# Registra forward hook
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Costruisce la memory bank dalle immagini normali di training.
coreset_ratio: percentuale di patch da mantenere (0.1 = 10%)
Riduce la memory bank con greedy coreset subsampling.
"""
all_features = []
print("Estraendo feature patch dal training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
# Interpola e concatena feature da più layer
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack tutte le patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Memory bank iniziale: {memory_bank.shape}")
# Random projection per ridurre dimensionalità
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: mantieni solo coreset_ratio% delle patch
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Memory bank finale: {self.memory_bank.shape}")
# Costruisci indice FAISS per nearest neighbor veloce
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank pronta per inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpola e concatena feature da diversi layer del backbone.
I feature map di layer2 (H/8) e layer3 (H/16) vengono
upsamplati alla stessa risoluzione e concatenati.
"""
feature_maps = list(features_dict.values())
# Target: risoluzione del primo layer (più alta)
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
# Concatena lungo dim channels: (B, C1+C2, H, W)
combined = torch.cat(aligned, dim=1)
# Reshape in patch tokens: (B*H*W, C1+C2)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Seleziona n punti che massimizzano la copertura dello spazio.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Calcola l'anomaly score di un'immagine.
Returns:
image_score: score scalare (soglia tipica: 0.5-0.8)
anomaly_map: mappa 2D dell'anomalia per localizzazione
"""
# Preprocessing
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy().astype(np.float32)
)
# Nearest neighbor distance per ogni patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
# Score patch = media delle n_neighbors distanze
patch_scores = distances.mean(axis=1)
# Ricostruisci anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale alla dimensione originale
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Score immagine = percentile 99 degli score patch
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
4. Potok danych i wstępne przetwarzanie przemysłowe
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
Dataset per ispezione PCB.
Struttura cartelle attesa:
root/
train/good/ <- immagini normali (training)
test/good/ <- immagini normali (test)
test/defect_type_1/ <- immagini difettose per valutazione
test/defect_type_2/
ground_truth/ <- mask binarie delle anomalie (test)
"""
# Augmentation per dati NORMALI di training
# Obiettivo: aumentare la varieta senza introdurre pattern simili a difetti
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Variazioni di illuminazione (simula diverse condizioni di luce)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# NON aggiungere: blur, noise, elastic -> simulano difetti!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform per test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = [] # list di (img_path, label, mask_path_or_None)
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
# Solo immagini normali per training
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
# Test: normali + difettosi con ground truth mask
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
# Path della ground truth mask
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
# Carica immagine
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Applica transform
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1) # HWC -> CHW
).float()
# Carica mask (se disponibile)
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
m = cv2.resize(m, (224, 224)) / 255.0
mask = m.astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
class PCBInspectionPipeline:
"""Pipeline di ispezione real-time per la linea PCB."""
def __init__(self, model: PatchCoreModel,
threshold: float = 0.65,
alert_callback=None):
self.model = model
self.threshold = threshold
self.alert_callback = alert_callback
self.stats = {'total': 0, 'ok': 0, 'defect': 0}
def inspect(self, img_bgr: np.ndarray,
board_id: str = '') -> dict:
"""
Ispeziona una scheda PCB.
Returns: risultato con score, decision, anomaly_map
"""
score, anomaly_map = self.model.score_image(img_bgr)
is_defect = score >= self.threshold
self.stats['total'] += 1
if is_defect:
self.stats['defect'] += 1
if self.alert_callback:
self.alert_callback(board_id, score, anomaly_map)
else:
self.stats['ok'] += 1
return {
'board_id': board_id,
'score': float(score),
'is_defect': bool(is_defect),
'anomaly_map': anomaly_map,
'defect_rate': self.stats['defect'] / max(1, self.stats['total'])
}
5. Ocena: wskaźniki przemysłowe
W kontekście przemysłowym standardowe wskaźniki ML (dokładność, F1) są niewystarczające. Fałszywie negatywny wynik (niewykryta wada) kosztuje ogromnie więcej niż np Fałszywie dodatni (odrzucona dobra karta). Odpowiednie wskaźniki to:
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # soglia ottimale per ROC
fnr: float # False Negative Rate alla soglia
fpr: float # False Positive Rate alla soglia
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% di difetti non rilevati
) -> AnomalyEvaluationMetrics:
"""
Valutazione completa su test set con metriche industriali.
target_fnr: FNR target (vincolo di business, es. 0.5%)
La soglia viene selezionata per rispettare questo vincolo.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
# Denormalizza
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO (Area Under Per-Region Overlap)
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Trova soglia che rispetta il vincolo FNR
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
# Seleziona soglia con FNR <= target_fnr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
# Tra le soglie valide, scegli quella con FPR minima
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
# Fallback: EER
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Risultati Anomaly Detection ===")
print(f"AUROC (image-level): {auroc:.4f} ({auroc*100:.2f}%)")
print(f"AUPRO (pixel-level): {aupro:.4f} ({aupro*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FNR @ soglia: {optimal_fnr*100:.2f}%")
print(f"FPR @ soglia: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
def compute_aupro(anomaly_maps: list, gt_masks: list,
max_fpr: float = 0.3) -> float:
"""
Area Under Per-Region Overlap (AUPRO).
Metrica robusta per la localizzazione di anomalie di dimensioni variabili.
A differenza del pixel-level AUROC, non penalizza i difetti piccoli.
"""
all_fprs, all_pros = [], []
for am, mask in zip(anomaly_maps, gt_masks):
am_flat = am.flatten()
mask_flat = (mask > 0.5).astype(float).flatten()
if mask_flat.sum() == 0:
continue
fpr_vals, tpr_vals, _ = roc_curve(mask_flat, am_flat)
all_fprs.extend(fpr_vals.tolist())
all_pros.extend(tpr_vals.tolist())
if not all_fprs:
return 0.0
# Ordina e integra fino a max_fpr
sorted_pairs = sorted(zip(all_fprs, all_pros))
fprs_sorted, pros_sorted = zip(*sorted_pairs)
# Integra con trapezoid rule entro max_fpr
aupro = 0.0
prev_fpr, prev_pro = 0.0, 0.0
for fpr_val, pro_val in zip(fprs_sorted, pros_sorted):
if fpr_val > max_fpr:
break
aupro += (fpr_val - prev_fpr) * (pro_val + prev_pro) / 2.0
prev_fpr, prev_pro = fpr_val, pro_val
return aupro / max_fpr if max_fpr > 0 else 0.0
6. Wdrożenie na Jetson Orin: Kompletny system
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
import asyncio
# FastAPI per REST API dell'inspection system
# pip install fastapi uvicorn python-multipart
app = FastAPI(title="PCB Inspection API", version="1.0.0")
# Stato globale del sistema
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # ultimi 100 score per monitoring drift
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Carica il modello PatchCore all'avvio del server."""
print("Caricamento modello PatchCore...")
system.model = PatchCoreModel(device='cuda')
# Carica memory bank pre-calcolata
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("Sistema pronto")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Endpoint principale per l'ispezione di una scheda PCB.
Accetta immagine JPEG/PNG e restituisce score e decisione.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
# Leggi e decodifica immagine
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
# Inference
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
# Aggiorna statistiche
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate_recent": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Monitoraggio drift: score medio recente vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Avvio: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
7. Dryf modelu: wykrywanie i aktualizacja w produkcji
Model w produkcji nie jest statyczny. Nowe projekty PCB, zmiany w procesie spawania, zużycie świateł – wszystko to ma wpływ na dryf modelu: stopniowe pogarszanie się wyników, które nie zawsze jest widoczne w wskaźnikach klasyfikacji do czasu, aż stanie się ona krytyczna. Niezbędne jest proaktywne monitorowanie.
import numpy as np
import json
import time
from pathlib import Path
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Report dello stato del drift del modello."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # differenza dalla baseline
ks_statistic: float = 0.0 # Kolmogorov-Smirnov stat
ks_p_value: float = 1.0 # p-value KS test
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitora il drift del modello PatchCore in produzione.
Basato su:
1. Score distribution monitoring (media rolling dei "good" score)
2. Kolmogorov-Smirnov test tra distribuzione corrente e baseline
3. Alert adattivi con 3 livelli: OK / WARNING / CRITICAL
Strategia di aggiornamento:
- WARNING: aumenta frequenza di sampling manuale (ogni 100 schede invece di 1000)
- CRITICAL: blocca linea per re-calibrazione del modello
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_threshold: float = 0.05, # +5% drift
critical_threshold: float = 0.10, # +10% drift
ks_alpha: float = 0.01): # p-value per KS test
"""
baseline_scores_path: JSON con score di "good" boards baseline
window_size: numero di score da mantenere nella finestra rolling
"""
self.window_size = window_size
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.ks_alpha = ks_alpha
# Carica baseline
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
print(f"Baseline caricata: {len(self.baseline_scores)} score")
print(f" Mean: {self.baseline_mean:.4f} ± {self.baseline_std:.4f}")
# Finestra rolling per i "good" score correnti
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
def record_score(self, anomaly_score: float,
is_good: bool) -> None:
"""
Registra lo score di una scheda.
is_good: True se la scheda è stata classificata come OK
(o confermata come buona dall'operatore)
"""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""
Esegue il controllo drift sulla finestra corrente.
Returns: DriftReport con stato e azione consigliata.
"""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
# Non abbastanza dati per un test significativo
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# 1. Score drift (differenza dalla baseline in deviazioni standard)
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# 2. Kolmogorov-Smirnov test
# Confronta la distribuzione corrente con la baseline
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# 3. Classificazione del drift
distribution_changed = ks_pval < self.ks_alpha
mean_drifted_critical = report.score_drift > self.critical_threshold * 10
mean_drifted_warning = report.score_drift > self.warning_threshold * 10
if mean_drifted_critical and distribution_changed:
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: re-calibrate model immediately'
elif mean_drifted_warning or distribution_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Suggerisce una strategia di retraining basata sulle metriche correnti.
"""
auroc_drop = target_auroc - current_auroc
latest_report = self.report_history[-1] if self.report_history else None
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'current_auroc': current_auroc,
'target_auroc': target_auroc,
'auroc_gap': auroc_drop,
}
if current_auroc >= target_auroc:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Piccola degradazione: aggiorna solo memory bank con nuovi campioni buoni
strategy['action'] = 'update_memory_bank'
strategy['description'] = (
'Aggiungi 200-500 nuove immagini buone alla memory bank. '
'Non richiede re-training del backbone.'
)
elif auroc_drop < 0.05:
# Degradazione media: fine-tune con nuovi campioni
strategy['action'] = 'incremental_finetune'
strategy['description'] = (
'Fine-tune backbone con mix di dati vecchi (70%) e nuovi (30%). '
'Learning rate basso: 1e-5. 10-20 epoch.'
)
else:
# Degradazione severa: retrain completo
strategy['action'] = 'full_retrain'
strategy['description'] = (
'Rebuild completo della memory bank con dati correnti. '
'Considera re-annotazione se sono cambiati i tipi di difetti.'
)
return strategy
# ---- Threshold adattivo basato su SLA di produzione ----
def compute_adaptive_threshold(model_scores_good: np.ndarray,
model_scores_defective: np.ndarray,
max_fnr: float = 0.003, # SLA: max 0.3% FNR
max_fpr: float = 0.02) -> dict:
"""
Calcola la soglia ottimale per rispettare gli SLA di produzione.
Priorità: FNR (difetti sfuggiti) è il vincolo critico.
FPR (falsi allarmi) è secondario.
Returns: {threshold, fnr, fpr, tradeoff_note}
"""
# Genera candidati threshold
all_scores = np.concatenate([model_scores_good, model_scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
# Predizioni
tp = np.sum(model_scores_defective > t)
fn = np.sum(model_scores_defective <= t)
fp = np.sum(model_scores_good > t)
tn = np.sum(model_scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Rispetta il vincolo FNR e minimizza FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# Fallback: usa EER se non riesce a rispettare FNR SLA
import warnings
warnings.warn("Impossibile rispettare FNR SLA. Usando EER come fallback.")
# ... (implementazione EER omessa per brevita)
best_threshold = np.median(all_scores)
best_fpr = 0.05
# Metriche finali alla soglia scelta
tp = np.sum(model_scores_defective > best_threshold)
fn = np.sum(model_scores_defective <= best_threshold)
fp = np.sum(model_scores_good > best_threshold)
tn = np.sum(model_scores_good <= best_threshold)
fnr_final = fn / (fn + tp + 1e-10)
fpr_final = fp / (fp + tn + 1e-10)
return {
'threshold': float(best_threshold),
'fnr': float(fnr_final),
'fpr': float(fpr_final),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
'meets_fnr_sla': fnr_final <= max_fnr,
'meets_fpr_target': fpr_final <= max_fpr,
}
8. Wyniki i wyciągnięte wnioski
Wyniki prawdziwego projektu (po 3 miesiącach produkcji)
| Metryczny | Inspekcja ludzka | Wizja systemu AI | Poprawa |
|---|---|---|---|
| Przepustowość | 20 kart/min | 140 kart/min | 7x |
| Fałszywie ujemna stawka | 2-15% (różni się w zależności od wysiłku) | 0,3% | ~10x |
| Fałszywie dodatni współczynnik | 0,5% | 1,4% | -2,8x (pogorszenie) |
| Koszt inspekcji/arkusza | 0,12 € | 0,02 € | 6-krotna redukcja |
| Braki, które umknęły na boisku | 23 roszczenia/miesiąc | 3 roszczenia/miesiąc | Redukcja 87%. |
5 kluczowych wniosków płynących z projektu
- Stawka fałszywie dodatnia podlega negocjacjom, stawka fałszywie ujemna nie jest: klient zgadza się odrzucić jeszcze kilka dobrych kart, ale nie toleruje wadliwych kart w terenie. Zawsze projektuj próg tak, aby najpierw spełniał ograniczenie FNR.
- Oświetlenie jest najważniejszym elementem: 60% początkowych fałszywych alarmów było spowodowanych zmianami oświetlenia, a nie rzeczywistymi defektami. Zainwestuj w kontrolowany system oświetlenia (stroboskop LED, kopułka dyfuzora), zanim jeszcze pomyślisz o modelu.
- PatchCore degraduje się powoli, ale degraduje: po 3 miesiącach AUROC spadł z 99,1% do 97,8% dla nowych projektów PCB, które nie znajdują się w banku pamięci. Zaplanuj strategię rozbudowy banku pamięci przyrostowej.
- Lokalizacja jest istotna dla operatora: binarna decyzja „OK/NOK” nie wystarczy. Mapa anomalii pokazująca GDZIE zlokalizowana jest wada, skraca czas analizy operatora w celu ręcznej weryfikacji fałszywych alarmów o 70%.
- Monitoruj dystrybucję wyników, a nie tylko dokładność: dryf modelu jest widoczny przede wszystkim w rozkładzie wyników, a nie w metrykach klasyfikacyjnych. Ostrzegaj, jeśli średnia „dobrych” wyników wzrośnie powyżej historycznej wartości bazowej.
Wnioski z serii
To studium przypadku zamyka naszą serię dotyczącą widzenia komputerowego i głębokiego uczenia się. Mamy objęło całą trajektorię: od podstawowych CNN po transfer uczenia się i wykrywanie obiektów z YOLO26 do segmentacji, od augmentacji po rurociągi produkcyjne OpenCV, od wdrożenia brzegowego po rozpoznawanie twarzy, aż do prawdziwego zastosowania przemysłowego.
Widzenie komputerowe to dyscyplina praktyczna: najlepsze wyniki osiągają ci, którzy rozumieją zarówno teoria (architektury, funkcje strat, metryki), jak i inżynieria systemów (przygotowanie wstępne, wdrożenie, monitorowanie). Dziedzina szybko się rozwija – YOLO26 tak wydany w styczniu 2026 r. SAM2 zrewolucjonizował interaktywną segmentację – ale m.in podstawowe zasady pozostają niezmienne.
Nawigacja serii
- Poprzedni: Wykrywanie i rozpoznawanie twarzy: nowoczesne techniki
- Początek serii: CNN: Sieci splotowe od zera do produkcji
Zasoby międzyserialne
- MLOps: monitorowanie i wykrywanie dryfu modelu - zaawansowany monitoring produkcji
- Zaawansowane głębokie uczenie się: kwantyzacja i kompresja - optymalizacja modelu
- Inżynieria AI: wyszukiwanie RAG i wektorów - integracja z systemami AI







