Vaka Çalışması: Bilgisayarlı Görme ile Endüstriyel Anomali Tespiti
Endüstriyel üretimde otomatik görsel inceleme, kullanım durumlarından biridir Daha yüksek ekonomik etkiye sahip bilgisayarlı görme teknolojisi: tespit edilemeyen kusurlar milyarlarca dolara mal oluyor Ürün geri çağırmaları, garantiler ve itibar kaybı için yılda Euro. Bir sistem İyi tasarlanmış görsel yapay zeka, tespit edilemeyen kusurların oranını %90 oranında azaltabilir 10-100 kat daha hızlı denetim hızıyla insan denetimine.
Bu vaka çalışmasında sıfırdan eksiksiz bir anormallik tespit sistemi oluşturacağız. tüm boru hattını kapsayan bir elektronik kart (PCB) üretim hattı: görüntü edinme, ön işleme, model mimarisi, dengesiz veriler üzerinde eğitim, uçta dağıtım (Jetson Orin) ve üretimde izleme.
Ne Öğreneceksiniz
- Anormallik tespitine yönelik yaklaşımlar: denetimli, yarı denetimli, denetimsiz
- MVTec Anomali Tespiti Veri Kümesi: endüstri standardı karşılaştırma noktası
- PatchCore: denetimsiz anormallik tespiti için son teknoloji algoritma
- Gerçek kusur veri setlerinde sınıf dengesizliğinin yönetimi
- Endüstriyel görüntülere özel veri artırma
- Endüstriyel ölçümler: AUROC, AUPRO, hat başına yanlış negatif oranı
- Gerçek zamanlı inceleme için TensorRT ile Jetson Orin'e dağıtım
- Kalite Kontrol için uyarı ve kayıt sistemi
- Gerçek üretim ortamlarında model sapmalarının izlenmesi
1. Sorun: PCB Muayene Hattı
Senaryomuz: PCB (Baskılı Devre Kartı) üretim hattı kadansı 120 kart/dakika. Her kartın incelenmesi gerekir kusurlar, örneğin: eksik bileşenler, kısa devreler, hatalı lehimleme, bileşenler hareket etti, parçalar kesildi. İnsan denetimi yavaştır (20 kart/dakika) ve konu zorlukla - 4 saat sonra insandaki yanlış negatiflik oranı %15'e yükselir.
Sistem Özellikleri
| Parametre | Gereklilik | Ulaşmış |
|---|---|---|
| Verim | ≥ 120 kart/dak | 140 kart/dak |
| Yanlış Negatif Oranı | < %0,5 (200 üzerinden maksimum 1 kusur algılanmadı) | %0,3 |
| Yanlış Pozitif Oranı | < %2 (iyi kartlar reddedildi) | %1,4 |
| Kart başına gecikme | < 500ms | 380ms |
| Hedef donanımı | Jetson Orin Nano 8GB | Jetson Orin Nano 8GB |
1.1 Veri Kümesi: MVTec AD
Il MVTec Anomali Tespiti Veri Kümesi standart endüstri standardıdır görsel anormallik tespiti için. 15 kategori (dokular ve nesneler), ~5000 resim içerir eğitim için normaldir ve test için piksel açıklamalı kusurları olan görüntüler. olarak kullanıyoruz Hattan gerçek verileri toplamadan önce prototipin temelini oluşturun.
2. Anormallik Tespitine Yaklaşımlar
Yaklaşımların Karşılaştırılması
| Yaklaşmak | Talep edilen veriler | AUROC (MVTec) | Profesyonel | Aykırı |
|---|---|---|---|---|
| Denetlenen | Her türden birçok kusurlu örnek | ~%99 | Maksimum doğruluk | Pahalı veri toplama; yeni kusurlar tespit edilmedi |
| PatchCore | Sadece normal görüntüler | %99,1 | Kusurlu örnek yok; yeni kusurlara genellenir | Büyük hafıza bankası; denetlendiğinden daha yavaş |
| Otomatik kodlayıcı/VAE | Sadece normal görüntüler | ~%85 | Uygulaması basit | Genellikle kusurları da iyi bir şekilde yeniden yapılandırır |
| Öğrenci-Öğretmen | Sadece normal görüntüler | ~%96 | Çıkarımda hızlı | Çekmek daha karmaşık |
Seçim: PatchCore. Endüstriyel senaryomuz için yaklaşım PatchCore kazanıyor çünkü: (1) eğitimde kusur örneklerine ihtiyaç duymuyor - pratik olarak her bir kusur türü için yeterli miktarda toplamanın imkansız olması; (2) ulaşır MVTec'te %99,1 AUROC, denetlenmeyen en iyi sonuç; (3) otomatik olarak genelleştirir hiç görülmemiş yeni kusurlara.
3. PatchCore: Uygulama
PatchCore'un fikri çok zarif: madencilik yapmak için önceden eğitilmiş bir omurga (WideResNet-50) kullanıyor normal görüntülerdeki özellikleri yamalar ve hafıza bankaları özelliklerin nominal. Çıkarımda, yeni bir görüntünün yama özellikleri karşılaştırılır. hafıza bankası: yüksek mesafe = anormallik.
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
from pathlib import Path
from sklearn.random_projection import SparseRandomProjection
from sklearn.metrics import roc_auc_score
from typing import Optional
import faiss # pip install faiss-cpu o faiss-gpu
class PatchCoreModel:
"""
PatchCore: Towards Total Recall in Industrial Anomaly Detection
(Roth et al., 2022) - CVPR 2022 Best Paper
Principio:
1. Estrae patch features con backbone pre-trained (WideResNet-50)
2. Costruisce memory bank con le feature di tutte le immagini normali
3. Applica coreset subsampling per ridurre la memoria (greedy k-center)
4. In inference: anomaly score = nearest neighbor distance dalla memory bank
"""
def __init__(self, backbone: str = 'wide_resnet50_2',
layers: list[str] = None,
device: str = 'cuda',
embedding_dim: int = 1024,
n_neighbors: int = 9):
self.device = torch.device(device)
self.layers = layers or ['layer2', 'layer3']
self.n_neighbors = n_neighbors
# Backbone pre-trained su ImageNet (feature extractor, no fine-tuning)
backbone_model = getattr(models, backbone)(
weights='IMAGENET1K_V1'
)
# Rimuovi i layer finali (non servono per l'estrazione di feature)
self.feature_extractor = self._build_extractor(backbone_model)
self.feature_extractor.to(self.device).eval()
# Random projection per ridurre dimensionalità (da 1024 a 384)
self.projector = SparseRandomProjection(
n_components=embedding_dim // 2,
eps=0.1
)
# FAISS index per nearest neighbor search veloce
self.memory_bank: Optional[np.ndarray] = None
self.faiss_index: Optional[faiss.IndexFlatL2] = None
# Preprocessing standard ImageNet
self.transform = T.Compose([
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
def _build_extractor(self, backbone: nn.Module) -> nn.Module:
"""Crea feature extractor con hook sugli intermediate layer."""
class FeatureExtractor(nn.Module):
def __init__(self, model, target_layers):
super().__init__()
self.model = model
self.target_layers = target_layers
self.features = {}
# Registra forward hook
for name, module in model.named_modules():
if name in target_layers:
module.register_forward_hook(
self._make_hook(name)
)
def _make_hook(self, name):
def hook(module, input, output):
self.features[name] = output
return hook
def forward(self, x):
self.features.clear()
self.model(x)
return self.features.copy()
return FeatureExtractor(backbone, self.layers)
def fit(self, train_loader: DataLoader,
coreset_ratio: float = 0.1) -> None:
"""
Costruisce la memory bank dalle immagini normali di training.
coreset_ratio: percentuale di patch da mantenere (0.1 = 10%)
Riduce la memory bank con greedy coreset subsampling.
"""
all_features = []
print("Estraendo feature patch dal training set...")
with torch.no_grad():
for batch_idx, (images, _) in enumerate(train_loader):
images = images.to(self.device)
features_dict = self.feature_extractor(images)
# Interpola e concatena feature da più layer
patch_features = self._aggregate_features(features_dict)
all_features.append(patch_features.cpu().numpy())
if batch_idx % 10 == 0:
print(f" Batch {batch_idx}/{len(train_loader)}")
# Stack tutte le patch features: (N_patches, D)
memory_bank = np.vstack(all_features)
print(f"Memory bank iniziale: {memory_bank.shape}")
# Random projection per ridurre dimensionalità
memory_bank = self.projector.fit_transform(memory_bank)
# Coreset subsampling: mantieni solo coreset_ratio% delle patch
n_coreset = max(1, int(len(memory_bank) * coreset_ratio))
memory_bank = self._greedy_coreset(memory_bank, n_coreset)
self.memory_bank = memory_bank.astype(np.float32)
print(f"Memory bank finale: {self.memory_bank.shape}")
# Costruisci indice FAISS per nearest neighbor veloce
dim = self.memory_bank.shape[1]
self.faiss_index = faiss.IndexFlatL2(dim)
self.faiss_index.add(self.memory_bank)
print("Memory bank pronta per inference")
def _aggregate_features(self, features_dict: dict) -> torch.Tensor:
"""
Interpola e concatena feature da diversi layer del backbone.
I feature map di layer2 (H/8) e layer3 (H/16) vengono
upsamplati alla stessa risoluzione e concatenati.
"""
feature_maps = list(features_dict.values())
# Target: risoluzione del primo layer (più alta)
target_size = feature_maps[0].shape[-2:]
aligned = []
for fm in feature_maps:
if fm.shape[-2:] != target_size:
fm = nn.functional.interpolate(
fm, size=target_size, mode='bilinear',
align_corners=False
)
aligned.append(fm)
# Concatena lungo dim channels: (B, C1+C2, H, W)
combined = torch.cat(aligned, dim=1)
# Reshape in patch tokens: (B*H*W, C1+C2)
B, C, H, W = combined.shape
patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
return patches
def _greedy_coreset(self, data: np.ndarray, n: int) -> np.ndarray:
"""
Greedy k-center coreset subsampling.
Seleziona n punti che massimizzano la copertura dello spazio.
"""
if n >= len(data):
return data
selected = [np.random.randint(0, len(data))]
min_distances = np.full(len(data), np.inf)
for _ in range(n - 1):
last = data[selected[-1]]
dists = np.linalg.norm(data - last, axis=1)
min_distances = np.minimum(min_distances, dists)
selected.append(int(np.argmax(min_distances)))
return data[selected]
def score_image(self, img_bgr: np.ndarray,
img_size: int = 224) -> tuple[float, np.ndarray]:
"""
Calcola l'anomaly score di un'immagine.
Returns:
image_score: score scalare (soglia tipica: 0.5-0.8)
anomaly_map: mappa 2D dell'anomalia per localizzazione
"""
# Preprocessing
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
img_resized = cv2.resize(img_rgb, (img_size, img_size))
tensor = self.transform(img_resized).unsqueeze(0).to(self.device)
with torch.no_grad():
features_dict = self.feature_extractor(tensor)
patch_features = self._aggregate_features(features_dict)
patch_features_proj = self.projector.transform(
patch_features.cpu().numpy().astype(np.float32)
)
# Nearest neighbor distance per ogni patch
distances, _ = self.faiss_index.search(
patch_features_proj, self.n_neighbors
)
# Score patch = media delle n_neighbors distanze
patch_scores = distances.mean(axis=1)
# Ricostruisci anomaly map (H, W)
n_patches_side = int(np.sqrt(len(patch_scores)))
anomaly_map = patch_scores.reshape(
n_patches_side, n_patches_side
)
# Upscale alla dimensione originale
anomaly_map_full = cv2.resize(
anomaly_map.astype(np.float32),
(img_bgr.shape[1], img_bgr.shape[0]),
interpolation=cv2.INTER_LINEAR
)
# Score immagine = percentile 99 degli score patch
image_score = float(np.percentile(patch_scores, 99))
return image_score, anomaly_map_full
4. Veri Hattı ve Endüstriyel Ön İşleme
import albumentations as A
import torch
from torch.utils.data import Dataset
import cv2
import numpy as np
from pathlib import Path
class PCBInspectionDataset(Dataset):
"""
Dataset per ispezione PCB.
Struttura cartelle attesa:
root/
train/good/ <- immagini normali (training)
test/good/ <- immagini normali (test)
test/defect_type_1/ <- immagini difettose per valutazione
test/defect_type_2/
ground_truth/ <- mask binarie delle anomalie (test)
"""
# Augmentation per dati NORMALI di training
# Obiettivo: aumentare la varieta senza introdurre pattern simili a difetti
NORMAL_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
A.RandomRotate90(p=0.5),
# Variazioni di illuminazione (simula diverse condizioni di luce)
A.RandomBrightnessContrast(
brightness_limit=0.1,
contrast_limit=0.1,
p=0.3
),
# NON aggiungere: blur, noise, elastic -> simulano difetti!
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Transform per test/inference (NO augmentation)
TEST_TRANSFORM = A.Compose([
A.Resize(256, 256),
A.CenterCrop(224, 224),
A.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
def __init__(self, root: str, split: str = 'train',
augment: bool = True):
self.root = Path(root)
self.split = split
self.transform = (self.NORMAL_TRANSFORM if augment and split == 'train'
else self.TEST_TRANSFORM)
self.samples = [] # list di (img_path, label, mask_path_or_None)
self._load_samples()
def _load_samples(self) -> None:
if self.split == 'train':
# Solo immagini normali per training
normal_dir = self.root / 'train' / 'good'
for img_path in sorted(normal_dir.glob('*.png')):
self.samples.append((img_path, 0, None))
else:
# Test: normali + difettosi con ground truth mask
test_dir = self.root / 'test'
gt_dir = self.root / 'ground_truth'
for class_dir in sorted(test_dir.iterdir()):
label = 0 if class_dir.name == 'good' else 1
for img_path in sorted(class_dir.glob('*.png')):
mask_path = None
if label == 1:
# Path della ground truth mask
mask_path = (gt_dir / class_dir.name /
img_path.name)
self.samples.append((img_path, label, mask_path))
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, idx: int) -> tuple:
img_path, label, mask_path = self.samples[idx]
# Carica immagine
img = cv2.imread(str(img_path))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Applica transform
transformed = self.transform(image=img)
img_tensor = torch.from_numpy(
transformed['image'].transpose(2, 0, 1) # HWC -> CHW
).float()
# Carica mask (se disponibile)
mask = np.zeros((224, 224), dtype=np.float32)
if mask_path and mask_path.exists():
m = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
m = cv2.resize(m, (224, 224)) / 255.0
mask = m.astype(np.float32)
return img_tensor, label, torch.from_numpy(mask)
class PCBInspectionPipeline:
"""Pipeline di ispezione real-time per la linea PCB."""
def __init__(self, model: PatchCoreModel,
threshold: float = 0.65,
alert_callback=None):
self.model = model
self.threshold = threshold
self.alert_callback = alert_callback
self.stats = {'total': 0, 'ok': 0, 'defect': 0}
def inspect(self, img_bgr: np.ndarray,
board_id: str = '') -> dict:
"""
Ispeziona una scheda PCB.
Returns: risultato con score, decision, anomaly_map
"""
score, anomaly_map = self.model.score_image(img_bgr)
is_defect = score >= self.threshold
self.stats['total'] += 1
if is_defect:
self.stats['defect'] += 1
if self.alert_callback:
self.alert_callback(board_id, score, anomaly_map)
else:
self.stats['ok'] += 1
return {
'board_id': board_id,
'score': float(score),
'is_defect': bool(is_defect),
'anomaly_map': anomaly_map,
'defect_rate': self.stats['defect'] / max(1, self.stats['total'])
}
5. Değerlendirme: Endüstriyel Metrikler
Endüstriyel bağlamda standart ML ölçümleri (doğruluk, F1) yetersizdir. Yanlış Negatif (tespit edilemeyen kusur), bir hatadan çok daha pahalıya mal olur. Yanlış Pozitif (iyi kart atıldı). İlgili metrikler şunlardır:
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
from typing import NamedTuple
class AnomalyEvaluationMetrics(NamedTuple):
auroc: float # Area Under ROC - detection-level
aupro: float # Area Under Per-Region Overlap - pixel-level
threshold: float # soglia ottimale per ROC
fnr: float # False Negative Rate alla soglia
fpr: float # False Positive Rate alla soglia
def evaluate_anomaly_detection(
model: PatchCoreModel,
test_loader,
target_fnr: float = 0.005 # max 0.5% di difetti non rilevati
) -> AnomalyEvaluationMetrics:
"""
Valutazione completa su test set con metriche industriali.
target_fnr: FNR target (vincolo di business, es. 0.5%)
La soglia viene selezionata per rispettare questo vincolo.
"""
all_scores = []
all_labels = []
all_masks = []
all_anomaly_maps = []
for images, labels, masks in test_loader:
for i in range(len(images)):
img_np = images[i].numpy().transpose(1, 2, 0)
# Denormalizza
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
img_denorm = ((img_np * std + mean) * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_denorm, cv2.COLOR_RGB2BGR)
score, anomaly_map = model.score_image(img_bgr)
all_scores.append(score)
all_labels.append(int(labels[i]))
all_masks.append(masks[i].numpy())
all_anomaly_maps.append(anomaly_map)
scores_arr = np.array(all_scores)
labels_arr = np.array(all_labels)
# Image-level AUROC
auroc = roc_auc_score(labels_arr, scores_arr)
# Pixel-level AUPRO (Area Under Per-Region Overlap)
aupro = compute_aupro(all_anomaly_maps, all_masks)
# Trova soglia che rispetta il vincolo FNR
fpr_arr, tpr_arr, thresholds = roc_curve(labels_arr, scores_arr)
fnr_arr = 1 - tpr_arr
# Seleziona soglia con FNR <= target_fnr
valid_idx = fnr_arr <= target_fnr
if valid_idx.any():
# Tra le soglie valide, scegli quella con FPR minima
valid_fprs = fpr_arr[valid_idx]
valid_thresholds = thresholds[valid_idx]
best_idx = np.argmin(valid_fprs)
optimal_threshold = float(valid_thresholds[best_idx])
optimal_fnr = float(fnr_arr[valid_idx][best_idx])
optimal_fpr = float(valid_fprs[best_idx])
else:
# Fallback: EER
eer_idx = np.argmin(np.abs(fpr_arr - fnr_arr))
optimal_threshold = float(thresholds[eer_idx])
optimal_fnr = float(fnr_arr[eer_idx])
optimal_fpr = float(fpr_arr[eer_idx])
print(f"=== Risultati Anomaly Detection ===")
print(f"AUROC (image-level): {auroc:.4f} ({auroc*100:.2f}%)")
print(f"AUPRO (pixel-level): {aupro:.4f} ({aupro*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FNR @ soglia: {optimal_fnr*100:.2f}%")
print(f"FPR @ soglia: {optimal_fpr*100:.2f}%")
return AnomalyEvaluationMetrics(
auroc=auroc,
aupro=aupro,
threshold=optimal_threshold,
fnr=optimal_fnr,
fpr=optimal_fpr
)
def compute_aupro(anomaly_maps: list, gt_masks: list,
max_fpr: float = 0.3) -> float:
"""
Area Under Per-Region Overlap (AUPRO).
Metrica robusta per la localizzazione di anomalie di dimensioni variabili.
A differenza del pixel-level AUROC, non penalizza i difetti piccoli.
"""
all_fprs, all_pros = [], []
for am, mask in zip(anomaly_maps, gt_masks):
am_flat = am.flatten()
mask_flat = (mask > 0.5).astype(float).flatten()
if mask_flat.sum() == 0:
continue
fpr_vals, tpr_vals, _ = roc_curve(mask_flat, am_flat)
all_fprs.extend(fpr_vals.tolist())
all_pros.extend(tpr_vals.tolist())
if not all_fprs:
return 0.0
# Ordina e integra fino a max_fpr
sorted_pairs = sorted(zip(all_fprs, all_pros))
fprs_sorted, pros_sorted = zip(*sorted_pairs)
# Integra con trapezoid rule entro max_fpr
aupro = 0.0
prev_fpr, prev_pro = 0.0, 0.0
for fpr_val, pro_val in zip(fprs_sorted, pros_sorted):
if fpr_val > max_fpr:
break
aupro += (fpr_val - prev_fpr) * (pro_val + prev_pro) / 2.0
prev_fpr, prev_pro = fpr_val, pro_val
return aupro / max_fpr if max_fpr > 0 else 0.0
6. Jetson Orin'e Dağıtım: Komple Sistem
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import numpy as np
import cv2
import pickle
import time
from datetime import datetime
from typing import Optional
import asyncio
# FastAPI per REST API dell'inspection system
# pip install fastapi uvicorn python-multipart
app = FastAPI(title="PCB Inspection API", version="1.0.0")
# Stato globale del sistema
class InspectionSystem:
model: Optional[PatchCoreModel] = None
threshold: float = 0.65
stats = {
'total_inspected': 0,
'defects_found': 0,
'start_time': None,
'recent_scores': [] # ultimi 100 score per monitoring drift
}
system = InspectionSystem()
@app.on_event("startup")
async def load_model():
"""Carica il modello PatchCore all'avvio del server."""
print("Caricamento modello PatchCore...")
system.model = PatchCoreModel(device='cuda')
# Carica memory bank pre-calcolata
with open('memory_bank.pkl', 'rb') as f:
mb_data = pickle.load(f)
system.model.memory_bank = mb_data['memory_bank']
system.model.projector = mb_data['projector']
import faiss
dim = system.model.memory_bank.shape[1]
system.model.faiss_index = faiss.IndexFlatL2(dim)
system.model.faiss_index.add(system.model.memory_bank)
system.stats['start_time'] = datetime.now()
print("Sistema pronto")
@app.post("/inspect")
async def inspect_board(
file: UploadFile = File(...),
board_id: str = ""
):
"""
Endpoint principale per l'ispezione di una scheda PCB.
Accetta immagine JPEG/PNG e restituisce score e decisione.
"""
if system.model is None:
return JSONResponse(
status_code=503,
content={"error": "Model not loaded"}
)
# Leggi e decodifica immagine
img_bytes = await file.read()
nparr = np.frombuffer(img_bytes, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
return JSONResponse(
status_code=400,
content={"error": "Invalid image"}
)
# Inference
t_start = time.perf_counter()
score, anomaly_map = system.model.score_image(img)
latency_ms = (time.perf_counter() - t_start) * 1000
is_defect = score >= system.threshold
# Aggiorna statistiche
system.stats['total_inspected'] += 1
if is_defect:
system.stats['defects_found'] += 1
system.stats['recent_scores'].append(float(score))
if len(system.stats['recent_scores']) > 100:
system.stats['recent_scores'].pop(0)
return {
"board_id": board_id or f"board_{system.stats['total_inspected']}",
"timestamp": datetime.now().isoformat(),
"score": round(float(score), 4),
"is_defect": bool(is_defect),
"latency_ms": round(latency_ms, 1),
"defect_rate_recent": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
)
}
@app.get("/health")
async def health_check():
"""Monitoraggio drift: score medio recente vs baseline."""
recent = system.stats['recent_scores']
return {
"status": "ok",
"model_loaded": system.model is not None,
"total_inspected": system.stats['total_inspected'],
"defect_rate": (
system.stats['defects_found'] /
max(1, system.stats['total_inspected'])
),
"recent_avg_score": float(np.mean(recent)) if recent else 0.0,
"uptime_seconds": (
(datetime.now() - system.stats['start_time']).total_seconds()
if system.stats['start_time'] else 0
)
}
# Avvio: uvicorn inspection_server:app --host 0.0.0.0 --port 8000
7. Model Kayması: Üretimde Tespit ve Güncelleme
Üretimdeki model statik değildir. Yeni PCB tasarımları, süreçteki değişiklikler kaynaklama, ışıkların aşınması - her şey katkıda bulunur model kayması: performansta her zaman metriklerde görünmeyen kademeli bir düşüş Kritik hale gelinceye kadar sınıflandırma. Proaktif izleme esastır.
import numpy as np
import json
import time
from pathlib import Path
from collections import deque
from dataclasses import dataclass, field
from scipy import stats as scipy_stats
import warnings
@dataclass
class DriftReport:
"""Report dello stato del drift del modello."""
timestamp: float = field(default_factory=time.time)
window_size: int = 0
current_mean_score: float = 0.0
baseline_mean_score: float = 0.0
score_drift: float = 0.0 # differenza dalla baseline
ks_statistic: float = 0.0 # Kolmogorov-Smirnov stat
ks_p_value: float = 1.0 # p-value KS test
is_drifting: bool = False
drift_level: str = 'none' # 'none', 'warning', 'critical'
action_required: str = ''
class ModelDriftMonitor:
"""
Monitora il drift del modello PatchCore in produzione.
Basato su:
1. Score distribution monitoring (media rolling dei "good" score)
2. Kolmogorov-Smirnov test tra distribuzione corrente e baseline
3. Alert adattivi con 3 livelli: OK / WARNING / CRITICAL
Strategia di aggiornamento:
- WARNING: aumenta frequenza di sampling manuale (ogni 100 schede invece di 1000)
- CRITICAL: blocca linea per re-calibrazione del modello
"""
def __init__(self,
baseline_scores_path: str,
window_size: int = 500,
warning_threshold: float = 0.05, # +5% drift
critical_threshold: float = 0.10, # +10% drift
ks_alpha: float = 0.01): # p-value per KS test
"""
baseline_scores_path: JSON con score di "good" boards baseline
window_size: numero di score da mantenere nella finestra rolling
"""
self.window_size = window_size
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.ks_alpha = ks_alpha
# Carica baseline
with open(baseline_scores_path) as f:
data = json.load(f)
self.baseline_scores = np.array(data['good_scores'])
self.baseline_mean = float(np.mean(self.baseline_scores))
self.baseline_std = float(np.std(self.baseline_scores))
print(f"Baseline caricata: {len(self.baseline_scores)} score")
print(f" Mean: {self.baseline_mean:.4f} ± {self.baseline_std:.4f}")
# Finestra rolling per i "good" score correnti
self.current_scores: deque = deque(maxlen=window_size)
self.report_history: list[DriftReport] = []
def record_score(self, anomaly_score: float,
is_good: bool) -> None:
"""
Registra lo score di una scheda.
is_good: True se la scheda è stata classificata come OK
(o confermata come buona dall'operatore)
"""
if is_good:
self.current_scores.append(anomaly_score)
def check_drift(self) -> DriftReport:
"""
Esegue il controllo drift sulla finestra corrente.
Returns: DriftReport con stato e azione consigliata.
"""
report = DriftReport(window_size=len(self.current_scores))
if len(self.current_scores) < 50:
# Non abbastanza dati per un test significativo
report.action_required = 'collecting_data'
return report
current = np.array(self.current_scores)
report.current_mean_score = float(np.mean(current))
report.baseline_mean_score = self.baseline_mean
# 1. Score drift (differenza dalla baseline in deviazioni standard)
report.score_drift = abs(
report.current_mean_score - self.baseline_mean
) / (self.baseline_std + 1e-10)
# 2. Kolmogorov-Smirnov test
# Confronta la distribuzione corrente con la baseline
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ks_stat, ks_pval = scipy_stats.ks_2samp(
self.baseline_scores, current
)
report.ks_statistic = float(ks_stat)
report.ks_p_value = float(ks_pval)
# 3. Classificazione del drift
distribution_changed = ks_pval < self.ks_alpha
mean_drifted_critical = report.score_drift > self.critical_threshold * 10
mean_drifted_warning = report.score_drift > self.warning_threshold * 10
if mean_drifted_critical and distribution_changed:
report.is_drifting = True
report.drift_level = 'critical'
report.action_required = 'STOP_LINE: re-calibrate model immediately'
elif mean_drifted_warning or distribution_changed:
report.is_drifting = True
report.drift_level = 'warning'
report.action_required = 'increase_sampling: inspect every 100 boards'
else:
report.drift_level = 'none'
report.action_required = 'continue_normal_operation'
self.report_history.append(report)
return report
def suggest_retraining(self, current_auroc: float,
target_auroc: float = 0.99) -> dict:
"""
Suggerisce una strategia di retraining basata sulle metriche correnti.
"""
auroc_drop = target_auroc - current_auroc
latest_report = self.report_history[-1] if self.report_history else None
strategy = {
'retrain_needed': current_auroc < target_auroc - 0.005,
'current_auroc': current_auroc,
'target_auroc': target_auroc,
'auroc_gap': auroc_drop,
}
if current_auroc >= target_auroc:
strategy['action'] = 'no_action_needed'
elif auroc_drop < 0.02:
# Piccola degradazione: aggiorna solo memory bank con nuovi campioni buoni
strategy['action'] = 'update_memory_bank'
strategy['description'] = (
'Aggiungi 200-500 nuove immagini buone alla memory bank. '
'Non richiede re-training del backbone.'
)
elif auroc_drop < 0.05:
# Degradazione media: fine-tune con nuovi campioni
strategy['action'] = 'incremental_finetune'
strategy['description'] = (
'Fine-tune backbone con mix di dati vecchi (70%) e nuovi (30%). '
'Learning rate basso: 1e-5. 10-20 epoch.'
)
else:
# Degradazione severa: retrain completo
strategy['action'] = 'full_retrain'
strategy['description'] = (
'Rebuild completo della memory bank con dati correnti. '
'Considera re-annotazione se sono cambiati i tipi di difetti.'
)
return strategy
# ---- Threshold adattivo basato su SLA di produzione ----
def compute_adaptive_threshold(model_scores_good: np.ndarray,
model_scores_defective: np.ndarray,
max_fnr: float = 0.003, # SLA: max 0.3% FNR
max_fpr: float = 0.02) -> dict:
"""
Calcola la soglia ottimale per rispettare gli SLA di produzione.
Priorità: FNR (difetti sfuggiti) è il vincolo critico.
FPR (falsi allarmi) è secondario.
Returns: {threshold, fnr, fpr, tradeoff_note}
"""
# Genera candidati threshold
all_scores = np.concatenate([model_scores_good, model_scores_defective])
thresholds = np.percentile(all_scores, np.arange(1, 100, 0.5))
best_threshold = None
best_fpr = float('inf')
for t in thresholds:
# Predizioni
tp = np.sum(model_scores_defective > t)
fn = np.sum(model_scores_defective <= t)
fp = np.sum(model_scores_good > t)
tn = np.sum(model_scores_good <= t)
fnr = fn / (fn + tp + 1e-10)
fpr = fp / (fp + tn + 1e-10)
# Rispetta il vincolo FNR e minimizza FPR
if fnr <= max_fnr and fpr < best_fpr:
best_fpr = fpr
best_threshold = t
if best_threshold is None:
# Fallback: usa EER se non riesce a rispettare FNR SLA
import warnings
warnings.warn("Impossibile rispettare FNR SLA. Usando EER come fallback.")
# ... (implementazione EER omessa per brevita)
best_threshold = np.median(all_scores)
best_fpr = 0.05
# Metriche finali alla soglia scelta
tp = np.sum(model_scores_defective > best_threshold)
fn = np.sum(model_scores_defective <= best_threshold)
fp = np.sum(model_scores_good > best_threshold)
tn = np.sum(model_scores_good <= best_threshold)
fnr_final = fn / (fn + tp + 1e-10)
fpr_final = fp / (fp + tn + 1e-10)
return {
'threshold': float(best_threshold),
'fnr': float(fnr_final),
'fpr': float(fpr_final),
'precision': float(tp / (tp + fp + 1e-10)),
'recall': float(tp / (tp + fn + 1e-10)),
'meets_fnr_sla': fnr_final <= max_fnr,
'meets_fpr_target': fpr_final <= max_fpr,
}
8. Sonuçlar ve Öğrenilen Dersler
Gerçek Projenin Sonuçları (üretimden 3 ay sonra)
| Metrik | İnsan Muayenesi | Vizyon Yapay Zeka sistemi | Gelişim |
|---|---|---|---|
| Verim | 20 kart/dak | 140 kart/dak | 7x |
| Yanlış Negatif Oranı | %2-15 (çabaya göre değişir) | %0,3 | ~10x |
| Yanlış Pozitif Oranı | %0,5 | %1,4 | -2,8x (kötüleşti) |
| Muayene/sayfa maliyeti | €0,12 | €0,02 | 6 kat azalma |
| Sahada gözden kaçan hatalar | 23 talep/ay | 3 talep/ay | %87 azalma |
Projenin 5 Temel Dersi
- Yanlış pozitif oranı tartışılabilir, yanlış negatif oranı ise şöyle değildir: müşteri birkaç iyi kartı daha atmayı kabul eder, ancak sahadaki kusurlu kartları tolere etmez. Eşiği her zaman önce FNR kısıtlamasını karşılayacak şekilde tasarlayın.
- Aydınlatma en kritik bileşendir: İlk yanlış pozitiflerin %60'ı gerçek kusurlardan değil, aydınlatma değişikliklerinden kaynaklanıyordu. Modeli düşünmeden önce kontrollü bir aydınlatma sistemine (LED flaş, difüzör kubbesi) yatırım yapın.
- PatchCore yavaş yavaş bozulur ancak bozulur: 3 ay sonra AUROC, bellek bankasında olmayan yeni PCB tasarımları için %99,1'den %97,8'e düştü. Artımlı bir bellek bankası yükseltme stratejisi planlayın.
- Operatör için yerelleştirme çok önemlidir: ikili “OK/NOK” kararı yeterli değildir. Kusurun NEREDE bulunduğunu gösteren anormallik haritası, operatörün yanlış pozitifleri manuel olarak doğrulamak için gereken analiz süresini %70 oranında azaltır.
- Yalnızca doğruluğu değil, puan dağılımını da izleyin: model sapması sınıflandırma metriklerinden ziyade ilk olarak puanların dağılımında görülmektedir. "İyi" puanların ortalaması tarihsel temel çizginin üzerine çıkarsa uyarı verin.
Serinin Sonuçları
Bu vaka çalışması, Derin Öğrenme ile Bilgisayarlı Görme serimizi sonlandırıyor. bizde temel CNN'lerden transfer öğrenimine ve tespitten tüm süreci kapsıyor YOLO26 ile nesneleri segmentasyona, büyütmeden üretim hatlarına kadar OpenCV, uç dağıtımdan yüz tanımaya ve bu gerçek endüstriyel kullanım durumuna kadar.
Bilgisayarla görme pratik bir disiplindir: en iyi sonuçlar bunu anlayanlardan gelir. hem teori (mimariler, kayıp fonksiyonları, metrikler) hem de sistem mühendisliği (ön işleme, dağıtım, izleme). Alan hızla gelişiyor - YOLO26 Ocak 2026'da piyasaya sürülen SAM2, etkileşimli segmentasyonda devrim yarattı - ancak ben Temel prensipler sabit kalıyor.
Seri Gezintisi
- Öncesi: Yüz Algılama ve Tanıma: Modern Teknikler
- Serinin başlangıcı: CNN: Sıfırdan Üretime Evrişimli Ağlar
Seriler Arası Kaynaklar
- MLOps: İzleme ve Model Sapması Tespiti - üretimde gelişmiş izleme
- Gelişmiş Derin Öğrenme: Niceleme ve Sıkıştırma - model optimizasyonu
- Yapay Zeka Mühendisliği: RAG ve Vektör Arama - AI sistemleriyle entegrasyon







