Face Detection e Recognition: MediaPipe, MTCNN e FaceNet
Il riconoscimento facciale e una delle applicazioni di computer vision più mature e diffuse: dai sistemi di sicurezza agli smartphone, dal controllo accessi alle analisi demografiche nel retail. Eppure, implementarla correttamente - con attenzione all'accuratezza, alla velocità e soprattutto all'etica - richiede una comprensione profonda delle tecniche coinvolte.
In questo articolo esploreremo l'intero stack: face detection (trovare i volti in un'immagine), face alignment (normalizzazione geometrica), face embedding (rappresentazione vettoriale) e face verification/identification. Useremo MediaPipe per il realtime, MTCNN per la precisione, e FaceNet/ArcFace per il riconoscimento.
Cosa Imparerai
- Pipeline face detection vs face recognition: differenze e use cases
- MediaPipe Face Detection: veloce, leggero, cross-platform
- MediaPipe Face Mesh: 468 landmark facciali in real-time
- MTCNN: Multi-task Cascaded CNN per detection precisa
- Face alignment: normalizzazione geometrica con landmark
- Face embedding: FaceNet e ArcFace per rappresentazioni compatte
- Face verification (1:1) e identification (1:N)
- Costruire un sistema di riconoscimento da zero con una face database
- Considerazioni etiche e legal: GDPR, bias, consent
1. Face Detection vs Face Recognition: La Pipeline Completa
Il termine "riconoscimento facciale" spesso raggruppa due task distinti con requisiti tecnici molto diversi:
Componenti della Pipeline Facciale
| Fase | Task | Output | Modello Tipico |
|---|---|---|---|
| Detection | Trovare posizione volti | Bounding boxes | MediaPipe, MTCNN, RetinaFace |
| Alignment | Normalizzare geometria | Immagine 112x112 normalizzata | Similitudine affine con landmark |
| Embedding | Estrarre feature descriptor | Vettore 128-512D | FaceNet, ArcFace, AdaFace |
| Verification | Stessa persona? (1:1) | Similarity score, boolean | Distanza coseno tra embedding |
| Identification | Chi e? (1:N) | Identity + confidence | KNN su database di embedding |
2. MediaPipe: Face Detection e Face Mesh
MediaPipe di Google e il framework più pratico per la face detection real-time su CPU. Il modello BlazeFace e specificatamente ottimizzato per la velocità su dispositivi mobili e embedded, raggiungendo 200+ FPS su un laptop moderno.
2.1 Face Detection con MediaPipe
import mediapipe as mp
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class FaceDetection:
"""Risultato di detection per un singolo volto."""
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
confidence: float
keypoints: dict[str, tuple[int, int]] # nome -> (x, y) in pixel
class MediaPipeFaceDetector:
"""
Face detector basato su MediaPipe BlazeFace.
Velocissimo su CPU: 200+ FPS su immagini 640x480.
Ottimo per real-time, non per immagini ad alta densita di volti.
"""
KEYPOINT_NAMES = [
'right_eye', 'left_eye', 'nose_tip',
'mouth_center', 'right_ear_tragion', 'left_ear_tragion'
]
def __init__(self, min_confidence: float = 0.5,
model_selection: int = 0):
"""
model_selection:
0 = short range (entro 2m, più veloce)
1 = full range (fino a 5m, più accurato)
"""
self.mp_face = mp.solutions.face_detection
self.detector = self.mp_face.FaceDetection(
model_selection=model_selection,
min_detection_confidence=min_confidence
)
self.mp_draw = mp.solutions.drawing_utils
def detect(self, img_bgr: np.ndarray) -> list[FaceDetection]:
"""Rileva volti in un'immagine BGR."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.detector.process(img_rgb)
faces = []
if not results.detections:
return faces
for detection in results.detections:
score = detection.score[0]
bbox_rel = detection.location_data.relative_bounding_box
# Coordinate relative -> pixel
x1 = max(0, int(bbox_rel.xmin * w))
y1 = max(0, int(bbox_rel.ymin * h))
x2 = min(w, int((bbox_rel.xmin + bbox_rel.width) * w))
y2 = min(h, int((bbox_rel.ymin + bbox_rel.height) * h))
# Keypoints (occhi, naso, bocca, orecchie)
keypoints = {}
for idx, name in enumerate(self.KEYPOINT_NAMES):
kp = detection.location_data.relative_keypoints[idx]
keypoints[name] = (int(kp.x * w), int(kp.y * h))
faces.append(FaceDetection(
bbox=(x1, y1, x2, y2),
confidence=float(score),
keypoints=keypoints
))
return faces
def draw(self, img_bgr: np.ndarray,
faces: list[FaceDetection]) -> np.ndarray:
"""Annota immagine con i risultati della detection."""
annotated = img_bgr.copy()
for face in faces:
x1, y1, x2, y2 = face.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(annotated, f"{face.confidence:.2f}",
(x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
# Disegna keypoints
for name, (kx, ky) in face.keypoints.items():
color = (0, 0, 255) if 'eye' in name else (255, 0, 0)
cv2.circle(annotated, (kx, ky), 4, color, -1)
return annotated
# Utilizzo: detection su webcam in real-time
def run_face_detection_webcam() -> None:
detector = MediaPipeFaceDetector(min_confidence=0.5)
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
faces = detector.detect(frame)
annotated = detector.draw(frame, faces)
cv2.putText(annotated, f"Faces: {len(faces)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('MediaPipe Face Detection', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
2.2 Face Mesh: 468 Landmark in Real-Time
Il modello Face Mesh di MediaPipe estrae 468 landmark 3D (x, y, z) del volto. Utile per face alignment, emotion estimation, AR filters, eye gaze e detection di colpi di sonno (eye aspect ratio).
import mediapipe as mp
import cv2
import numpy as np
class FaceMeshAnalyzer:
"""
MediaPipe Face Mesh: 468 landmark 3D in real-time.
Utilita incluse: eye aspect ratio (sonnolenza), head pose, ecc.
"""
# Indici dei landmark MediaPipe per occhi
LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
def __init__(self, max_faces: int = 1,
refine_landmarks: bool = True):
"""
refine_landmarks=True: aggiunge landmark attorno agli occhi
e alle iridi (468 -> 478 punti totali).
"""
self.mp_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_mesh.FaceMesh(
max_num_faces=max_faces,
refine_landmarks=refine_landmarks,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_draw = mp.solutions.drawing_utils
self.mp_styles = mp.solutions.drawing_styles
def process(self, img_bgr: np.ndarray) -> Optional[list]:
"""Processa immagine e restituisce lista di landmark per ogni volto."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(img_rgb)
if not results.multi_face_landmarks:
return None
all_faces_lm = []
for face_landmarks in results.multi_face_landmarks:
# Converti da coordinate normalizzate a pixel
lm_pixels = []
for lm in face_landmarks.landmark:
lm_pixels.append((int(lm.x * w), int(lm.y * h), lm.z))
all_faces_lm.append(lm_pixels)
return all_faces_lm
def eye_aspect_ratio(self, landmarks: list,
eye_indices: list) -> float:
"""
Eye Aspect Ratio (EAR) - indicatore di sonnolenza.
EAR < 0.2 per 20+ frame consecutivi = occhio chiuso.
Formula: EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
"""
pts = [np.array(landmarks[i][:2]) for i in eye_indices]
# Distanze verticali
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
# Distanza orizzontale
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C) if C > 0 else 0.0
def draw_mesh(self, img_bgr: np.ndarray,
results_raw) -> np.ndarray:
"""Disegna la mesh completa con stili MediaPipe predefiniti."""
annotated = img_bgr.copy()
if results_raw and results_raw.multi_face_landmarks:
for face_lm in results_raw.multi_face_landmarks:
self.mp_draw.draw_landmarks(
image=annotated,
landmark_list=face_lm,
connections=self.mp_mesh.FACEMESH_TESSELATION,
landmark_drawing_spec=None,
connection_drawing_spec=self.mp_styles
.get_default_face_mesh_tesselation_style()
)
return annotated
# Rilevamento sonnolenza con Eye Aspect Ratio
def drowsiness_detector(threshold: float = 0.22,
consec_frames: int = 20) -> None:
"""Sistema di alert sonnolenza basato su EAR."""
analyzer = FaceMeshAnalyzer(max_faces=1)
cap = cv2.VideoCapture(0)
ear_counter = 0
alert_active = False
while True:
ret, frame = cap.read()
if not ret:
break
landmarks_list = analyzer.process(frame)
if landmarks_list:
lms = landmarks_list[0] # primo volto
ear_l = analyzer.eye_aspect_ratio(lms, analyzer.LEFT_EYE_IDX)
ear_r = analyzer.eye_aspect_ratio(lms, analyzer.RIGHT_EYE_IDX)
avg_ear = (ear_l + ear_r) / 2.0
if avg_ear < threshold:
ear_counter += 1
if ear_counter >= consec_frames:
alert_active = True
cv2.putText(frame, "ALERT: SONNOLENZA!",
(50, 200), cv2.FONT_HERSHEY_SIMPLEX,
1.5, (0, 0, 255), 3)
else:
ear_counter = 0
alert_active = False
cv2.putText(frame, f"EAR: {avg_ear:.3f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2)
cv2.imshow('Drowsiness Detector', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
3. MTCNN: Multi-Task Cascaded CNN
MTCNN e un detector a tre stadi (P-Net, R-Net, O-Net) che bilancia velocità e precisione. E il gold standard per la detection precisa in sistemi di riconoscimento: individua volti con 5 landmark (occhi, naso, angoli bocca), necessari per il face alignment. Più lento di MediaPipe ma più robusto in condizioni difficili.
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
class MTCNNFaceProcessor:
"""
MTCNN per detection precisa + face alignment.
Produce immagini 112x112 normalizzate, ottimali per FaceNet/ArcFace.
"""
def __init__(self, min_face_size: int = 40,
thresholds: list = None,
scale_factor: float = 0.709):
self.detector = MTCNN(
min_face_size=min_face_size,
thresholds=thresholds or [0.6, 0.7, 0.7],
scale_factor=scale_factor
)
def detect_and_align(self, img_bgr: np.ndarray,
output_size: int = 112) -> list[np.ndarray]:
"""
Rileva volti e li restituisce allineati (112x112 default).
L'allineamento usa una trasformazione affine sui 5 landmark
per portare gli occhi in posizione canonica.
Returns: lista di immagini volto allineate (BGR, float32 [0,1])
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
aligned_faces = []
for det in detections:
if det['confidence'] < 0.90:
continue
keypoints = det['keypoints']
src_pts = np.array([
keypoints['left_eye'],
keypoints['right_eye'],
keypoints['nose'],
keypoints['mouth_left'],
keypoints['mouth_right']
], dtype=np.float32)
# Punti di destinazione canonici per 112x112
dst_pts = np.array([
[38.2946, 51.6963],
[73.5318, 51.6963],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.3655]
], dtype=np.float32)
# Scale per output_size diversi da 112
scale = output_size / 112.0
dst_pts *= scale
# Trasformazione affine -> immagine allineata
M = cv2.estimateAffinePartial2D(src_pts, dst_pts)[0]
aligned = cv2.warpAffine(img_bgr, M, (output_size, output_size))
aligned_faces.append(aligned.astype(np.float32) / 255.0)
return aligned_faces
def detect_with_info(self, img_bgr: np.ndarray) -> list[dict]:
"""Rileva volti con tutte le informazioni MTCNN."""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
results = []
h, w = img_bgr.shape[:2]
for det in detections:
x, y, bw, bh = det['box']
x1 = max(0, x)
y1 = max(0, y)
x2 = min(w, x + bw)
y2 = min(h, y + bh)
results.append({
'bbox': (x1, y1, x2, y2),
'confidence': det['confidence'],
'keypoints': det['keypoints']
})
return results
4. Face Recognition: FaceNet e ArcFace
Dopo la detection e l'alignment, il cuore del sistema di riconoscimento e il face embedding model: una rete neurale che trasforma un'immagine 112x112 in un vettore di 128-512 dimensioni. Volti della stessa persona producono vettori vicini nello spazio; volti diversi sono lontani.
Confronto Modelli di Face Embedding
| Modello | Embedding Dim | Loss | LFW Acc. | Dimensione |
|---|---|---|---|---|
| FaceNet (Google) | 128 | Triplet Loss | 99.63% | 90 MB |
| ArcFace (InsightFace) | 512 | ArcFace Loss | 99.83% | 249 MB |
| AdaFace | 512 | AdaFace Loss | 99.82% | 249 MB |
| MobileFaceNet (edge) | 128 | ArcFace Loss | 99.55% | 4 MB |
import insightface
from insightface.app import FaceAnalysis
import numpy as np
import cv2
import pickle
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
class FaceRecognitionSystem:
"""
Sistema completo di face recognition basato su InsightFace (ArcFace).
Supporta registrazione di nuove identità e riconoscimento real-time.
Installazione: pip install insightface onnxruntime scikit-learn
"""
def __init__(self, db_path: str = 'face_db.pkl',
recognition_threshold: float = 0.5):
"""
recognition_threshold: soglia coseno per considerare un match
(0.5 e un buon default per ArcFace 512D)
"""
# Inizializza FaceAnalysis (detection + embedding in un'unica API)
self.app = FaceAnalysis(
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.db_path = Path(db_path)
self.threshold = recognition_threshold
self.database: dict[str, list[np.ndarray]] = {}
self.knn: Optional[KNeighborsClassifier] = None
if self.db_path.exists():
self._load_database()
def register_person(self, name: str,
images: list[np.ndarray],
max_faces_per_image: int = 1) -> int:
"""
Registra una nuova persona nel database.
name: identificatore della persona
images: lista di immagini BGR (almeno 5 per robustezza)
Returns: numero di embedding registrati con successo
"""
embeddings = []
for img in images:
faces = self.app.get(img)
if not faces:
continue
# Prendi il volto più grande (per immagini con una persona)
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) *
(f.bbox[3]-f.bbox[1]))
emb = normalize(face.embedding.reshape(1, -1))[0]
embeddings.append(emb)
if len(embeddings) >= max_faces_per_image * len(images):
break
if not embeddings:
print(f"[WARN] Nessun volto rilevato per {name}")
return 0
if name not in self.database:
self.database[name] = []
self.database[name].extend(embeddings)
self._rebuild_knn()
self._save_database()
print(f"Registrato {name}: {len(embeddings)} embedding")
return len(embeddings)
def recognize(self, img_bgr: np.ndarray) -> list[dict]:
"""
Riconosce tutti i volti in un'immagine.
Returns: lista di dict con bbox, identity, confidence per ogni volto
"""
faces = self.app.get(img_bgr)
results = []
for face in faces:
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = self._match_embedding(emb)
x1, y1, x2, y2 = face.bbox.astype(int)
results.append({
'bbox': (x1, y1, x2, y2),
'identity': identity,
'confidence': confidence,
'is_known': confidence >= self.threshold
})
return results
def _match_embedding(self, emb: np.ndarray) -> tuple[str, float]:
"""Trova la corrispondenza migliore nel database."""
if not self.database or self.knn is None:
return ('unknown', 0.0)
# Usa KNN con metrica coseno (1 - cosine_similarity = cosine_distance)
dist, idx = self.knn.kneighbors([emb], n_neighbors=1)
labels = [name for name, embs in self.database.items()
for _ in embs]
best_name = labels[idx[0][0]]
similarity = 1.0 - dist[0][0] # da distanza coseno a similarità
return (best_name, float(similarity))
def _rebuild_knn(self) -> None:
"""Ricostruisce il classificatore KNN dopo aggiornamenti al DB."""
all_embs = []
all_labels = []
for name, embs in self.database.items():
all_embs.extend(embs)
all_labels.extend([name] * len(embs))
if len(all_embs) < 2:
return
self.knn = KNeighborsClassifier(
n_neighbors=min(3, len(all_embs)),
metric='cosine',
algorithm='brute'
)
self.knn.fit(np.array(all_embs), all_labels)
def _save_database(self) -> None:
with open(self.db_path, 'wb') as f:
pickle.dump(self.database, f)
def _load_database(self) -> None:
with open(self.db_path, 'rb') as f:
self.database = pickle.load(f)
self._rebuild_knn()
print(f"Database caricato: {len(self.database)} identità")
def annotate(self, img_bgr: np.ndarray,
results: list[dict]) -> np.ndarray:
"""Annota l'immagine con i risultati del riconoscimento."""
annotated = img_bgr.copy()
for r in results:
x1, y1, x2, y2 = r['bbox']
color = (0, 255, 0) if r['is_known'] else (0, 0, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = (f"{r['identity']} ({r['confidence']:.2f})"
if r['is_known'] else "Unknown")
cv2.putText(annotated, label, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return annotated
5. Face Verification: Soglia e ROC Curve
La face verification risponde alla domanda: "queste due foto mostrano la stessa persona?". E un problema di matching 1:1, diverso dall'identification (1:N). La chiave e scegliere la soglia di similarità corretta tramite analisi della ROC curve.
import numpy as np
from sklearn.metrics import roc_curve, auc
import matplotlib
matplotlib.use('Agg') # Per ambienti senza display
import matplotlib.pyplot as plt
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Similarità coseno tra due embedding normalizzati."""
emb1_n = emb1 / (np.linalg.norm(emb1) + 1e-10)
emb2_n = emb2 / (np.linalg.norm(emb2) + 1e-10)
return float(np.dot(emb1_n, emb2_n))
def find_optimal_threshold(same_person_pairs: list[tuple],
diff_person_pairs: list[tuple]) -> dict:
"""
Trova la soglia ottimale analizzando la ROC curve.
same_person_pairs: lista di coppie (emb1, emb2) della stessa persona
diff_person_pairs: lista di coppie (emb1, emb2) di persone diverse
Returns: {threshold, eer, auc, far, frr}
"""
scores = []
labels = []
for emb1, emb2 in same_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(1) # stessa persona
for emb1, emb2 in diff_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(0) # persone diverse
scores_arr = np.array(scores)
labels_arr = np.array(labels)
# ROC curve
fpr, tpr, thresholds = roc_curve(labels_arr, scores_arr)
roc_auc = auc(fpr, tpr)
# Equal Error Rate (EER): punto dove FAR = FRR
fnr = 1 - tpr
eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2.0
optimal_threshold = thresholds[eer_idx]
# Metriche a soglia ottimale
predictions = (scores_arr >= optimal_threshold).astype(int)
tp = np.sum((predictions == 1) & (labels_arr == 1))
fp = np.sum((predictions == 1) & (labels_arr == 0))
fn = np.sum((predictions == 0) & (labels_arr == 1))
tn = np.sum((predictions == 0) & (labels_arr == 0))
far = fp / (fp + tn) if (fp + tn) > 0 else 0 # False Accept Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0 # False Reject Rate
print(f"=== Face Verification Metrics ===")
print(f"AUC-ROC: {roc_auc:.4f}")
print(f"EER: {eer:.4f} ({eer*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FAR @ EER: {far:.4f} ({far*100:.2f}%)")
print(f"FRR @ EER: {frr:.4f} ({frr*100:.2f}%)")
return {
'threshold': float(optimal_threshold),
'eer': float(eer),
'auc': float(roc_auc),
'far': float(far),
'frr': float(frr)
}
6. Anti-Spoofing e Liveness Detection
Un sistema di face recognition senza liveness detection e vulnerabile agli attacchi di spoofing: basta una foto stampata, un video su smartphone, o una maschera 3D per ingannare la maggior parte dei detector. La liveness detection distingue un volto reale da un artefatto.
Tipologie di Attacchi di Spoofing
| Tipo di Attacco | Descrizione | Difficolta di Difesa | Tecnica di Mitigazione |
|---|---|---|---|
| Print Attack | Foto stampata su carta/carta lucida | Bassa | Texture analysis, moaré pattern detection |
| Replay Attack | Video del volto su schermo | Media | Screen reflection detection, 3D depth |
| 3D Mask | Maschera realistica stampata in 3D | Alta | IR sensor, challenge-response, micromotion |
| Deepfake Video | Video sintetico generato da AI | Molto Alta | Deepfake detector, blood flow analysis |
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from typing import Optional
class LivenessDetector:
"""
Sistema di liveness detection basato su due segnali complementari:
1. Texture analysis (LBP-based + CNN) - rileva print attacks
2. Micro-motion analysis - rileva replay attacks (video statici non hanno micro-movimenti)
Per deployment serio, considera: SilentFace, FAS-SGTD, CentralDiff-CNN
"""
def __init__(self, model_path: Optional[str] = None,
device: str = 'auto'):
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto'
else 'cpu'
)
# Modello CNN per texture analysis (fine-tuned su dataset anti-spoofing)
# Dataset: CelebA-Spoof, OULU-NPU, MSU-MFSD
self.model = self._build_model(model_path)
self.model.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# Buffer per micro-motion analysis
self.frame_buffer: list[np.ndarray] = []
self.buffer_size = 10 # 10 frame ~= 333ms @ 30FPS
def _build_model(self, model_path: Optional[str]) -> nn.Module:
"""
MobileNetV2 fine-tuned per binary classification: real vs spoof.
MobileNetV2 perchè e leggero (3.4M params) e veloce su CPU/edge.
"""
model = models.mobilenet_v2(pretrained=False)
model.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(model.last_channel, 2) # [spoof, real]
)
if model_path:
state_dict = torch.load(model_path, map_location=self.device)
model.load_state_dict(state_dict)
return model.to(self.device)
def is_live_texture(self, face_roi: np.ndarray,
threshold: float = 0.7) -> tuple[bool, float]:
"""
Analisi texture CNN: classifica il volto come reale o spoof.
face_roi: crop del volto BGR [H, W, 3]
threshold: soglia per considerare il volto reale
Returns: (is_live, confidence_score)
"""
img_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1)
live_prob = probs[0, 1].item() # indice 1 = "real"
return live_prob >= threshold, live_prob
def compute_lbp_features(self, gray: np.ndarray,
radius: int = 3, n_points: int = 24) -> np.ndarray:
"""
Local Binary Pattern (LBP) texture descriptor.
Le stampe su carta hanno pattern LBP caratteristici (moaré).
Feature complementare alla CNN per robustezza.
"""
h, w = gray.shape
lbp = np.zeros_like(gray, dtype=np.uint8)
for r in range(radius, h - radius):
for c in range(radius, w - radius):
center = int(gray[r, c])
code = 0
for p in range(n_points):
angle = 2 * np.pi * p / n_points
nr = r - int(radius * np.sin(angle))
nc = c + int(radius * np.cos(angle))
nr = np.clip(nr, 0, h - 1)
nc = np.clip(nc, 0, w - 1)
code |= (int(gray[nr, nc]) >= center) << p
lbp[r, c] = code % 256
# Istogramma LBP come feature vector
hist, _ = np.histogram(lbp.ravel(), bins=256, range=(0, 256))
hist = hist.astype(float)
hist /= (hist.sum() + 1e-7)
return hist
def analyze_micro_motion(self, frame_bgr: np.ndarray) -> tuple[bool, float]:
"""
Analisi micro-movimento: rileva movimenti naturali del volto (micro-espressioni,
respiro, battito ciglia) assenti in foto/video statici.
Returns: (has_micro_motion, motion_score)
Un video replay di solito ha motion_score < 0.5
"""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
self.frame_buffer.append(gray)
if len(self.frame_buffer) > self.buffer_size:
self.frame_buffer.pop(0)
if len(self.frame_buffer) < 3:
return True, 1.0 # Non abbastanza frame, assumi live
# Optical flow su ultimi 3 frame
flow_magnitudes = []
for i in range(len(self.frame_buffer) - 2, len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
flow_magnitudes.append(np.mean(magnitude))
avg_motion = np.mean(flow_magnitudes)
# Calcola varianza del motion (micro-movimenti irregolari = live)
if len(self.frame_buffer) >= self.buffer_size:
all_flows = []
for i in range(len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
all_flows.append(np.mean(magnitude))
motion_variance = np.var(all_flows)
# Alta varianza = movimenti naturali irregolari = live
motion_score = min(1.0, motion_variance * 100)
else:
motion_score = 0.5
return motion_score > 0.3, float(motion_score)
def predict(self, face_roi: np.ndarray,
frame_bgr: np.ndarray) -> dict:
"""
Prediction combinata: texture CNN + micro-motion.
Fusion con regola AND conservativa per sicurezza.
"""
is_live_tex, tex_score = self.is_live_texture(face_roi)
has_motion, motion_score = self.analyze_micro_motion(frame_bgr)
# Logica di fusione: entrambi i segnali devono concordare
combined_score = 0.6 * tex_score + 0.4 * motion_score
is_live = is_live_tex and (motion_score > 0.2)
return {
'is_live': is_live,
'combined_score': combined_score,
'texture_score': tex_score,
'motion_score': motion_score,
'verdict': 'LIVE' if is_live else 'SPOOF'
}
# Pipeline completa: face recognition + liveness check
def secure_recognition_pipeline(recognizer, liveness_detector, frame_bgr):
"""
Pipeline sicura: prima verifica liveness, poi riconosce.
Se il volto non e live, non procedere con il riconoscimento.
"""
# 1. Rileva volti
faces = recognizer.app.get(frame_bgr)
results = []
for face in faces:
x1, y1, x2, y2 = face.bbox.astype(int)
face_roi = frame_bgr[max(0,y1):y2, max(0,x1):x2]
if face_roi.size == 0:
continue
# 2. Liveness check (PRIMA del riconoscimento)
liveness = liveness_detector.predict(face_roi, frame_bgr)
if not liveness['is_live']:
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'SPOOF',
'identity': None,
'liveness': liveness
})
continue
# 3. Face recognition (solo se live)
from sklearn.preprocessing import normalize
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = recognizer._match_embedding(emb)
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'LIVE',
'identity': identity if confidence > 0.5 else 'unknown',
'confidence': confidence,
'liveness': liveness
})
return results
7. Database Scalabile: FAISS per Milioni di Embedding
Il sistema con KNN scikit-learn funziona bene fino a ~10.000 embedding. Oltre quella soglia, la ricerca brute-force diventa il collo di bottiglia. FAISS (Facebook AI Similarity Search) scala a miliardi di vettori con ricerca approssimata in microsecondi.
import faiss
import numpy as np
import pickle
from pathlib import Path
class FAISSFaceDatabase:
"""
Database di embedding facciali scalabile con FAISS.
Ricerca approssimata (HNSW) per 1M+ embedding in < 1ms.
Installazione: pip install faiss-cpu (o faiss-gpu per GPU)
"""
def __init__(self, embedding_dim: int = 512,
db_path: str = 'faiss_face_db',
index_type: str = 'hnsw'):
"""
index_type:
'flat' - Ricerca esatta, O(n), per < 100K embedding
'hnsw' - Ricerca approssimata HNSW, per 100K - 10M embedding
'ivf' - Inverted File Index, per 10M+ embedding
"""
self.embedding_dim = embedding_dim
self.db_path = Path(db_path)
self.db_path.mkdir(exist_ok=True)
self.index_type = index_type
self.index = self._build_index()
self.id_to_name: dict[int, str] = {} # FAISS ID -> nome persona
self.next_id = 0
# Carica se esiste
if (self.db_path / 'index.faiss').exists():
self._load()
def _build_index(self) -> faiss.Index:
"""Costruisce indice FAISS appropriato."""
if self.index_type == 'flat':
# Ricerca esatta con distanza coseno (IP su vettori normalizzati)
index = faiss.IndexFlatIP(self.embedding_dim)
elif self.index_type == 'hnsw':
# HNSW: Hierarchical Navigable Small World
# M=32: connessioni per nodo (più alto = più accurato ma più RAM)
# ef_construction=200: qualità indice durante build
index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
index.hnsw.efConstruction = 200
index.hnsw.efSearch = 64 # tradeoff accuratezza/velocità a query time
elif self.index_type == 'ivf':
# IVF: divide lo spazio in cluster, cerca solo nei cluster più vicini
n_lists = 100 # numero di cluster (sqrt(N) e una buona regola)
quantizer = faiss.IndexFlatIP(self.embedding_dim)
index = faiss.IndexIVFFlat(quantizer, self.embedding_dim, n_lists,
faiss.METRIC_INNER_PRODUCT)
else:
raise ValueError(f"Tipo indice sconosciuto: {self.index_type}")
return index
def add_embedding(self, name: str,
embedding: np.ndarray) -> int:
"""
Aggiunge un embedding al database.
Normalizza L2 per usare inner product come similarità coseno.
"""
emb_norm = embedding / (np.linalg.norm(embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
# IVF richiede training prima del primo add
if self.index_type == 'ivf' and not self.index.is_trained:
print("Training IVF index... (richiede un batch iniziale)")
# In pratica: train con tutti gli embedding prima di usare
self.index.train(emb_norm)
self.index.add(emb_norm)
self.id_to_name[self.next_id] = name
self.next_id += 1
return self.next_id - 1
def add_person(self, name: str,
embeddings: list[np.ndarray]) -> int:
"""Aggiunge più embedding per la stessa persona."""
for emb in embeddings:
self.add_embedding(name, emb)
return len(embeddings)
def search(self, query_embedding: np.ndarray,
k: int = 1,
min_similarity: float = 0.5) -> list[dict]:
"""
Cerca i k embedding più simili nel database.
Returns: lista di {name, similarity, faiss_id}
Ordinate per similarità decrescente.
"""
if self.next_id == 0:
return []
emb_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
actual_k = min(k, self.next_id)
similarities, indices = self.index.search(emb_norm, actual_k)
results = []
for sim, idx in zip(similarities[0], indices[0]):
if idx == -1 or sim < min_similarity:
continue
results.append({
'name': self.id_to_name.get(int(idx), 'unknown'),
'similarity': float(sim),
'faiss_id': int(idx)
})
return results
def identify(self, query_embedding: np.ndarray,
threshold: float = 0.5) -> tuple[str, float]:
"""Identifica la persona con maggiore similarità."""
results = self.search(query_embedding, k=3)
if not results:
return 'unknown', 0.0
# Voto di maggioranza tra top-3 (robustezza)
from collections import Counter
names = [r['name'] for r in results]
best_name = Counter(names).most_common(1)[0][0]
best_sim = max(r['similarity'] for r in results
if r['name'] == best_name)
if best_sim < threshold:
return 'unknown', best_sim
return best_name, best_sim
def save(self) -> None:
"""Salva indice FAISS e mapping ID->nome su disco."""
faiss.write_index(self.index,
str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'wb') as f:
pickle.dump({'id_to_name': self.id_to_name,
'next_id': self.next_id}, f)
print(f"Database salvato: {self.next_id} embedding")
def _load(self) -> None:
"""Carica indice FAISS e mapping da disco."""
self.index = faiss.read_index(str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'rb') as f:
data = pickle.load(f)
self.id_to_name = data['id_to_name']
self.next_id = data['next_id']
print(f"Database caricato: {self.next_id} embedding, "
f"{len(set(self.id_to_name.values()))} identità")
def stats(self) -> dict:
"""Statistiche del database."""
names = list(self.id_to_name.values())
from collections import Counter
name_counts = Counter(names)
return {
'total_embeddings': self.next_id,
'total_identities': len(name_counts),
'avg_embeddings_per_person': np.mean(list(name_counts.values()))
if name_counts else 0,
'index_type': self.index_type
}
# Benchmark: KNN sklearn vs FAISS per database di varie dimensioni
def benchmark_search_backends(n_identities: int = 10000,
embs_per_person: int = 5) -> None:
"""Confronta tempi di ricerca KNN vs FAISS."""
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
n_total = n_identities * embs_per_person
dim = 512
# Genera embedding sintetici
embeddings = np.random.randn(n_total, dim).astype(np.float32)
embeddings = normalize(embeddings)
labels = np.repeat(np.arange(n_identities), embs_per_person)
query = np.random.randn(1, dim).astype(np.float32)
query = normalize(query)
# KNN sklearn
knn = KNeighborsClassifier(n_neighbors=3, metric='cosine', algorithm='brute')
knn.fit(embeddings, labels)
t0 = time.perf_counter()
for _ in range(100):
knn.predict(query)
knn_time = (time.perf_counter() - t0) / 100 * 1000
# FAISS HNSW
index = faiss.IndexHNSWFlat(dim, 32)
index.add(embeddings)
t0 = time.perf_counter()
for _ in range(100):
index.search(query, 3)
faiss_time = (time.perf_counter() - t0) / 100 * 1000
print(f"\nBenchmark ricerca ({n_total:,} embedding, dim={dim}):")
print(f" KNN sklearn: {knn_time:.2f} ms/query")
print(f" FAISS HNSW: {faiss_time:.3f} ms/query")
print(f" Speedup: {knn_time/faiss_time:.0f}x")
8. Considerazioni Etiche e Legali
Attenzione: Dati Biometrici sotto GDPR
I dati facciali sono dati biometrici secondo il GDPR (Art. 9) e il loro trattamento e soggetto a restrizioni severe. In Italia e nell'UE:
- Consenso esplicito obbligatorio: Non si possono raccogliere dati biometrici senza consenso informato specifico per ogni finalita
- Data minimization: Conservare solo gli embedding necessari, non le immagini originali
- Diritto alla cancellazione: Implementare un endpoint per eliminare tutti i dati di una persona
- Finalita limitata: I dati raccolti per un sistema di accesso non possono essere usati per analytics di marketing
- Bias testing obbligatorio: Prima del deploy, verificare le metriche per diversi gruppi demografici (EER per genere, eta, etnia)
- No sorveglianza pubblica: Il Regolamento AI Act UE 2024 vieta quasi completamente il riconoscimento facciale in spazi pubblici
Algoritmo di Bias Testing
Prima di ogni deployment, esegui sempre la valutazione per sottogruppi demografici. Un sistema con EER aggregato del 2% potrebbe avere EER del 5% per un sottogruppo specifico - il che e eticamente e legalmente inaccettabile.
9. Best Practices
Checklist per Sistemi di Face Recognition Production-Ready
- Usa MediaPipe per real-time, MTCNN per alta precisione: non sono in competizione - scegli in base al contesto
- Minimum 5-10 immagini per persona: in condizioni diverse (luce, angolo, espressione). Con 1 sola immagine, il sistema e fragile
- Normalizza SEMPRE gli embedding:
emb = emb / np.linalg.norm(emb). Senza normalizzazione, la distanza coseno non funziona correttamente - Calibra la soglia su dati reali: non usare 0.5 come soglia default senza averla validata sul tuo dataset. Calcola EER sul tuo scenario specifico
- Anti-spoofing: i sistemi senza liveness detection sono vulnerabili a foto e video. Integra un modello di liveness detection (MobileNetV2 fine-tuned su dataset di spoofing)
- Aggiorna gli embedding nel tempo: le persone cambiano aspetto. Pianifica una re-enrollment periodica o un online update degli embedding
- Logging con privacy: logga solo gli embedding (non le immagini), con hashing dell'identity per il debugging senza esporre dati personali
Conclusioni
La pipeline di face recognition moderna e robusta, modulare e accessibile. Abbiamo coperto ogni layer di un sistema production-ready:
- MediaPipe: detection ultra-veloce su CPU, ottimo per real-time con vincoli di risorse. 200+ FPS su laptop moderno.
- MTCNN + Face Alignment: base solida per sistemi di riconoscimento precisi. I 5 landmark sono fondamentali per l'allineamento canonico 112x112.
- InsightFace/ArcFace: embedding 512D con 99.83% di accuratezza su LFW - stato dell'arte accessibile via pip.
- Soglia calibrata con ROC/EER: la differenza tra un sistema robusto e uno inaffidabile. Non usare 0.5 di default senza validazione.
- Anti-Spoofing + Liveness Detection: fondamentale per sistemi di sicurezza. Texture CNN + micro-motion analysis per resistenza a print/replay attacks.
- FAISS per scaling: da KNN scikit-learn (10K embedding) a FAISS HNSW (1M+ embedding) con speedup di 100-1000x.
- Etica e GDPR: non un optional ma un requisito fondamentale. EU AI Act 2024 vieta il riconoscimento facciale in spazi pubblici.
Navigazione Serie
- Precedente: Computer Vision su Edge: Raspberry Pi e Jetson
- Successivo: Case Study: Rilevamento Anomalie Industriali
Risorse Cross-Serie
- MLOps: Model Serving in Produzione - deploy di modelli su API REST
- Deep Learning Avanzato: Vision Transformers







