Detectarea și recunoașterea feței: MediaPipe, MTCNN și FaceNet
Recunoașterea facială este una dintre cele mai mature aplicații de viziune computerizată și răspândit: de la sisteme de securitate la smartphone-uri, de la controlul accesului la analiză demografie în comerțul cu amănuntul. Cu toate acestea, implementați-o corect - cu atenție la acuratețe, viteză și mai ales etică – necesită o înțelegere profunzimea tehnicilor implicate.
În acest articol vom explora întregul teanc: detectarea feței (găsirea fețelor în o imagine), alinierea feței (normalizare geometrică), încorporarea feței (reprezentare vector) și verificarea/identificarea feței. Vom folosi MediaPipe în timp real, MTCNN pentru acuratețe și FaceNet/ArcFace pentru recunoaștere.
Ce vei învăța
- Detectarea feței vs pipeline de recunoaștere a feței: diferențe și cazuri de utilizare
- MediaPipe Face Detection: rapid, ușor, multiplatform
- MediaPipe Face Mesh: 468 de repere faciale în timp real
- MTCNN: CNN în cascadă cu mai multe sarcini pentru o detectare precisă
- Alinierea feței: normalizare geometrică cu repere
- Încorporarea feței: FaceNet și ArcFace pentru reprezentări compacte
- Verificarea feței (1:1) și identificare (1:N)
- Construirea unui sistem de recunoaștere de la zero cu o bază de date cu fețe
- Considerații etice și juridice: GDPR, părtinire, consimțământ
1. Detectarea feței vs Recunoașterea feței: conducta completă
Termenul „recunoaștere facială” grupează adesea două sarcini distincte cu cerințe tehnicieni foarte diferiti:
Componentele conductei faciale
| Fază | Sarcini | Ieșiri | Model tipic |
|---|---|---|---|
| Detectare | Găsiți poziția fețelor | Cutii de delimitare | MediaPipe, MTCNN, RetinaFace |
| Aliniere | Normalizează geometria | Imagine normalizată 112x112 | Asemănare similară cu reper |
| Încorporarea | Extrageți descriptori de caracteristici | Transportator 128-512D | FaceNet, ArcFace, AdaFace |
| Verificare | Aceeași persoană? (1:1) | Scorul de similaritate, boolean | Distanța cosinus între înglobări |
| Identificare | Cine este? (1:N) | Identitate + încredere | KNN privind încorporarea bazei de date |
2. MediaPipe: Face Detection și Face Mesh
MediaPipe de Google și cel mai practic cadru pentru detectarea feței în timp real pe CPU. Modelul BlazeFace este optimizat special pentru viteza pe dispozitive mobile și încorporate, ajungând la peste 200 de FPS pe un laptop modern.
2.1 Detectarea feței cu MediaPipe
import mediapipe as mp
import cv2
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class FaceDetection:
"""Risultato di detection per un singolo volto."""
bbox: tuple[int, int, int, int] # x1, y1, x2, y2
confidence: float
keypoints: dict[str, tuple[int, int]] # nome -> (x, y) in pixel
class MediaPipeFaceDetector:
"""
Face detector basato su MediaPipe BlazeFace.
Velocissimo su CPU: 200+ FPS su immagini 640x480.
Ottimo per real-time, non per immagini ad alta densita di volti.
"""
KEYPOINT_NAMES = [
'right_eye', 'left_eye', 'nose_tip',
'mouth_center', 'right_ear_tragion', 'left_ear_tragion'
]
def __init__(self, min_confidence: float = 0.5,
model_selection: int = 0):
"""
model_selection:
0 = short range (entro 2m, più veloce)
1 = full range (fino a 5m, più accurato)
"""
self.mp_face = mp.solutions.face_detection
self.detector = self.mp_face.FaceDetection(
model_selection=model_selection,
min_detection_confidence=min_confidence
)
self.mp_draw = mp.solutions.drawing_utils
def detect(self, img_bgr: np.ndarray) -> list[FaceDetection]:
"""Rileva volti in un'immagine BGR."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.detector.process(img_rgb)
faces = []
if not results.detections:
return faces
for detection in results.detections:
score = detection.score[0]
bbox_rel = detection.location_data.relative_bounding_box
# Coordinate relative -> pixel
x1 = max(0, int(bbox_rel.xmin * w))
y1 = max(0, int(bbox_rel.ymin * h))
x2 = min(w, int((bbox_rel.xmin + bbox_rel.width) * w))
y2 = min(h, int((bbox_rel.ymin + bbox_rel.height) * h))
# Keypoints (occhi, naso, bocca, orecchie)
keypoints = {}
for idx, name in enumerate(self.KEYPOINT_NAMES):
kp = detection.location_data.relative_keypoints[idx]
keypoints[name] = (int(kp.x * w), int(kp.y * h))
faces.append(FaceDetection(
bbox=(x1, y1, x2, y2),
confidence=float(score),
keypoints=keypoints
))
return faces
def draw(self, img_bgr: np.ndarray,
faces: list[FaceDetection]) -> np.ndarray:
"""Annota immagine con i risultati della detection."""
annotated = img_bgr.copy()
for face in faces:
x1, y1, x2, y2 = face.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(annotated, f"{face.confidence:.2f}",
(x1, y1 - 8), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
# Disegna keypoints
for name, (kx, ky) in face.keypoints.items():
color = (0, 0, 255) if 'eye' in name else (255, 0, 0)
cv2.circle(annotated, (kx, ky), 4, color, -1)
return annotated
# Utilizzo: detection su webcam in real-time
def run_face_detection_webcam() -> None:
detector = MediaPipeFaceDetector(min_confidence=0.5)
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
break
faces = detector.detect(frame)
annotated = detector.draw(frame, faces)
cv2.putText(annotated, f"Faces: {len(faces)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('MediaPipe Face Detection', annotated)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
2.2 Face Mesh: 468 de repere în timp real
Modelul FaceMesh din MediaPipe extrage 468 de repere 3D (x, y, z) a feței. Util pentru alinierea feței, estimarea emoțiilor, filtre AR, privirea ochilor și detectarea somnolenței (raport aspectul ochilor).
import mediapipe as mp
import cv2
import numpy as np
class FaceMeshAnalyzer:
"""
MediaPipe Face Mesh: 468 landmark 3D in real-time.
Utilita incluse: eye aspect ratio (sonnolenza), head pose, ecc.
"""
# Indici dei landmark MediaPipe per occhi
LEFT_EYE_IDX = [362, 385, 387, 263, 373, 380]
RIGHT_EYE_IDX = [33, 160, 158, 133, 153, 144]
def __init__(self, max_faces: int = 1,
refine_landmarks: bool = True):
"""
refine_landmarks=True: aggiunge landmark attorno agli occhi
e alle iridi (468 -> 478 punti totali).
"""
self.mp_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_mesh.FaceMesh(
max_num_faces=max_faces,
refine_landmarks=refine_landmarks,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_draw = mp.solutions.drawing_utils
self.mp_styles = mp.solutions.drawing_styles
def process(self, img_bgr: np.ndarray) -> Optional[list]:
"""Processa immagine e restituisce lista di landmark per ogni volto."""
h, w = img_bgr.shape[:2]
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(img_rgb)
if not results.multi_face_landmarks:
return None
all_faces_lm = []
for face_landmarks in results.multi_face_landmarks:
# Converti da coordinate normalizzate a pixel
lm_pixels = []
for lm in face_landmarks.landmark:
lm_pixels.append((int(lm.x * w), int(lm.y * h), lm.z))
all_faces_lm.append(lm_pixels)
return all_faces_lm
def eye_aspect_ratio(self, landmarks: list,
eye_indices: list) -> float:
"""
Eye Aspect Ratio (EAR) - indicatore di sonnolenza.
EAR < 0.2 per 20+ frame consecutivi = occhio chiuso.
Formula: EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)
"""
pts = [np.array(landmarks[i][:2]) for i in eye_indices]
# Distanze verticali
A = np.linalg.norm(pts[1] - pts[5])
B = np.linalg.norm(pts[2] - pts[4])
# Distanza orizzontale
C = np.linalg.norm(pts[0] - pts[3])
return (A + B) / (2.0 * C) if C > 0 else 0.0
def draw_mesh(self, img_bgr: np.ndarray,
results_raw) -> np.ndarray:
"""Disegna la mesh completa con stili MediaPipe predefiniti."""
annotated = img_bgr.copy()
if results_raw and results_raw.multi_face_landmarks:
for face_lm in results_raw.multi_face_landmarks:
self.mp_draw.draw_landmarks(
image=annotated,
landmark_list=face_lm,
connections=self.mp_mesh.FACEMESH_TESSELATION,
landmark_drawing_spec=None,
connection_drawing_spec=self.mp_styles
.get_default_face_mesh_tesselation_style()
)
return annotated
# Rilevamento sonnolenza con Eye Aspect Ratio
def drowsiness_detector(threshold: float = 0.22,
consec_frames: int = 20) -> None:
"""Sistema di alert sonnolenza basato su EAR."""
analyzer = FaceMeshAnalyzer(max_faces=1)
cap = cv2.VideoCapture(0)
ear_counter = 0
alert_active = False
while True:
ret, frame = cap.read()
if not ret:
break
landmarks_list = analyzer.process(frame)
if landmarks_list:
lms = landmarks_list[0] # primo volto
ear_l = analyzer.eye_aspect_ratio(lms, analyzer.LEFT_EYE_IDX)
ear_r = analyzer.eye_aspect_ratio(lms, analyzer.RIGHT_EYE_IDX)
avg_ear = (ear_l + ear_r) / 2.0
if avg_ear < threshold:
ear_counter += 1
if ear_counter >= consec_frames:
alert_active = True
cv2.putText(frame, "ALERT: SONNOLENZA!",
(50, 200), cv2.FONT_HERSHEY_SIMPLEX,
1.5, (0, 0, 255), 3)
else:
ear_counter = 0
alert_active = False
cv2.putText(frame, f"EAR: {avg_ear:.3f}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.8, (0, 255, 0), 2)
cv2.imshow('Drowsiness Detector', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
3. MTCNN: CNN în cascadă cu sarcini multiple
MTCNN și un detector cu trei trepte (P-Net, R-Net, O-Net) care echilibrează viteza si precizia. Este standardul de aur pentru detectarea precisă în sisteme recunoaștere: identifică fețele cu 5 repere necesare (ochi, nas, colțuri ale gurii). pentru alinierea feței. Mai lent decât MediaPipe, dar mai robust în condiții dure.
from mtcnn import MTCNN
import cv2
import numpy as np
from PIL import Image
class MTCNNFaceProcessor:
"""
MTCNN per detection precisa + face alignment.
Produce immagini 112x112 normalizzate, ottimali per FaceNet/ArcFace.
"""
def __init__(self, min_face_size: int = 40,
thresholds: list = None,
scale_factor: float = 0.709):
self.detector = MTCNN(
min_face_size=min_face_size,
thresholds=thresholds or [0.6, 0.7, 0.7],
scale_factor=scale_factor
)
def detect_and_align(self, img_bgr: np.ndarray,
output_size: int = 112) -> list[np.ndarray]:
"""
Rileva volti e li restituisce allineati (112x112 default).
L'allineamento usa una trasformazione affine sui 5 landmark
per portare gli occhi in posizione canonica.
Returns: lista di immagini volto allineate (BGR, float32 [0,1])
"""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
aligned_faces = []
for det in detections:
if det['confidence'] < 0.90:
continue
keypoints = det['keypoints']
src_pts = np.array([
keypoints['left_eye'],
keypoints['right_eye'],
keypoints['nose'],
keypoints['mouth_left'],
keypoints['mouth_right']
], dtype=np.float32)
# Punti di destinazione canonici per 112x112
dst_pts = np.array([
[38.2946, 51.6963],
[73.5318, 51.6963],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.3655]
], dtype=np.float32)
# Scale per output_size diversi da 112
scale = output_size / 112.0
dst_pts *= scale
# Trasformazione affine -> immagine allineata
M = cv2.estimateAffinePartial2D(src_pts, dst_pts)[0]
aligned = cv2.warpAffine(img_bgr, M, (output_size, output_size))
aligned_faces.append(aligned.astype(np.float32) / 255.0)
return aligned_faces
def detect_with_info(self, img_bgr: np.ndarray) -> list[dict]:
"""Rileva volti con tutte le informazioni MTCNN."""
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
detections = self.detector.detect_faces(img_rgb)
results = []
h, w = img_bgr.shape[:2]
for det in detections:
x, y, bw, bh = det['box']
x1 = max(0, x)
y1 = max(0, y)
x2 = min(w, x + bw)
y2 = min(h, y + bh)
results.append({
'bbox': (x1, y1, x2, y2),
'confidence': det['confidence'],
'keypoints': det['keypoints']
})
return results
4. Recunoaștere facială: FaceNet și ArcFace
După detectare și aliniere, inima sistemului de recunoaștere este model de încorporare a feței: o rețea neuronală care transformă o imagine 112x112 într-un vector de 128-512 dimensiuni. Fețele aceleiași persoane produc vectori din apropiere în spațiu; fețe diferite sunt departe.
Comparația modelelor de încorporare a feței
| Model | Încorporare Dim | Pierderi | LFW Acc. | Dimensiune |
|---|---|---|---|---|
| FaceNet (Google) | 128 | Pierderea tripletului | 99,63% | 90 MB |
| ArcFace (InsightFace) | 512 | ArcFaceLoss | 99,83% | 249 MB |
| AdaFace | 512 | Pierderea AdaFace | 99,82% | 249 MB |
| MobileFaceNet (margine) | 128 | ArcFaceLoss | 99,55% | 4 MB |
import insightface
from insightface.app import FaceAnalysis
import numpy as np
import cv2
import pickle
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
class FaceRecognitionSystem:
"""
Sistema completo di face recognition basato su InsightFace (ArcFace).
Supporta registrazione di nuove identità e riconoscimento real-time.
Installazione: pip install insightface onnxruntime scikit-learn
"""
def __init__(self, db_path: str = 'face_db.pkl',
recognition_threshold: float = 0.5):
"""
recognition_threshold: soglia coseno per considerare un match
(0.5 e un buon default per ArcFace 512D)
"""
# Inizializza FaceAnalysis (detection + embedding in un'unica API)
self.app = FaceAnalysis(
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.app.prepare(ctx_id=0, det_size=(640, 640))
self.db_path = Path(db_path)
self.threshold = recognition_threshold
self.database: dict[str, list[np.ndarray]] = {}
self.knn: Optional[KNeighborsClassifier] = None
if self.db_path.exists():
self._load_database()
def register_person(self, name: str,
images: list[np.ndarray],
max_faces_per_image: int = 1) -> int:
"""
Registra una nuova persona nel database.
name: identificatore della persona
images: lista di immagini BGR (almeno 5 per robustezza)
Returns: numero di embedding registrati con successo
"""
embeddings = []
for img in images:
faces = self.app.get(img)
if not faces:
continue
# Prendi il volto più grande (per immagini con una persona)
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) *
(f.bbox[3]-f.bbox[1]))
emb = normalize(face.embedding.reshape(1, -1))[0]
embeddings.append(emb)
if len(embeddings) >= max_faces_per_image * len(images):
break
if not embeddings:
print(f"[WARN] Nessun volto rilevato per {name}")
return 0
if name not in self.database:
self.database[name] = []
self.database[name].extend(embeddings)
self._rebuild_knn()
self._save_database()
print(f"Registrato {name}: {len(embeddings)} embedding")
return len(embeddings)
def recognize(self, img_bgr: np.ndarray) -> list[dict]:
"""
Riconosce tutti i volti in un'immagine.
Returns: lista di dict con bbox, identity, confidence per ogni volto
"""
faces = self.app.get(img_bgr)
results = []
for face in faces:
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = self._match_embedding(emb)
x1, y1, x2, y2 = face.bbox.astype(int)
results.append({
'bbox': (x1, y1, x2, y2),
'identity': identity,
'confidence': confidence,
'is_known': confidence >= self.threshold
})
return results
def _match_embedding(self, emb: np.ndarray) -> tuple[str, float]:
"""Trova la corrispondenza migliore nel database."""
if not self.database or self.knn is None:
return ('unknown', 0.0)
# Usa KNN con metrica coseno (1 - cosine_similarity = cosine_distance)
dist, idx = self.knn.kneighbors([emb], n_neighbors=1)
labels = [name for name, embs in self.database.items()
for _ in embs]
best_name = labels[idx[0][0]]
similarity = 1.0 - dist[0][0] # da distanza coseno a similarità
return (best_name, float(similarity))
def _rebuild_knn(self) -> None:
"""Ricostruisce il classificatore KNN dopo aggiornamenti al DB."""
all_embs = []
all_labels = []
for name, embs in self.database.items():
all_embs.extend(embs)
all_labels.extend([name] * len(embs))
if len(all_embs) < 2:
return
self.knn = KNeighborsClassifier(
n_neighbors=min(3, len(all_embs)),
metric='cosine',
algorithm='brute'
)
self.knn.fit(np.array(all_embs), all_labels)
def _save_database(self) -> None:
with open(self.db_path, 'wb') as f:
pickle.dump(self.database, f)
def _load_database(self) -> None:
with open(self.db_path, 'rb') as f:
self.database = pickle.load(f)
self._rebuild_knn()
print(f"Database caricato: {len(self.database)} identità")
def annotate(self, img_bgr: np.ndarray,
results: list[dict]) -> np.ndarray:
"""Annota l'immagine con i risultati del riconoscimento."""
annotated = img_bgr.copy()
for r in results:
x1, y1, x2, y2 = r['bbox']
color = (0, 255, 0) if r['is_known'] else (0, 0, 255)
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
label = (f"{r['identity']} ({r['confidence']:.2f})"
if r['is_known'] else "Unknown")
cv2.putText(annotated, label, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return annotated
5. Verificare feței: prag și curbă ROC
La verificarea feței răspunde la întrebarea: „aceste două fotografii arată aceeași persoană?”. Este o problemă de potrivire 1:1, diferită de identificare (1:N). Cheia este să alegeți pragul de similaritate corect prin analiza curbei ROC.
import numpy as np
from sklearn.metrics import roc_curve, auc
import matplotlib
matplotlib.use('Agg') # Per ambienti senza display
import matplotlib.pyplot as plt
def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Similarità coseno tra due embedding normalizzati."""
emb1_n = emb1 / (np.linalg.norm(emb1) + 1e-10)
emb2_n = emb2 / (np.linalg.norm(emb2) + 1e-10)
return float(np.dot(emb1_n, emb2_n))
def find_optimal_threshold(same_person_pairs: list[tuple],
diff_person_pairs: list[tuple]) -> dict:
"""
Trova la soglia ottimale analizzando la ROC curve.
same_person_pairs: lista di coppie (emb1, emb2) della stessa persona
diff_person_pairs: lista di coppie (emb1, emb2) di persone diverse
Returns: {threshold, eer, auc, far, frr}
"""
scores = []
labels = []
for emb1, emb2 in same_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(1) # stessa persona
for emb1, emb2 in diff_person_pairs:
scores.append(cosine_similarity(emb1, emb2))
labels.append(0) # persone diverse
scores_arr = np.array(scores)
labels_arr = np.array(labels)
# ROC curve
fpr, tpr, thresholds = roc_curve(labels_arr, scores_arr)
roc_auc = auc(fpr, tpr)
# Equal Error Rate (EER): punto dove FAR = FRR
fnr = 1 - tpr
eer_idx = np.argmin(np.abs(fpr - fnr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2.0
optimal_threshold = thresholds[eer_idx]
# Metriche a soglia ottimale
predictions = (scores_arr >= optimal_threshold).astype(int)
tp = np.sum((predictions == 1) & (labels_arr == 1))
fp = np.sum((predictions == 1) & (labels_arr == 0))
fn = np.sum((predictions == 0) & (labels_arr == 1))
tn = np.sum((predictions == 0) & (labels_arr == 0))
far = fp / (fp + tn) if (fp + tn) > 0 else 0 # False Accept Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0 # False Reject Rate
print(f"=== Face Verification Metrics ===")
print(f"AUC-ROC: {roc_auc:.4f}")
print(f"EER: {eer:.4f} ({eer*100:.2f}%)")
print(f"Soglia ottimale: {optimal_threshold:.4f}")
print(f"FAR @ EER: {far:.4f} ({far*100:.2f}%)")
print(f"FRR @ EER: {frr:.4f} ({frr*100:.2f}%)")
return {
'threshold': float(optimal_threshold),
'eer': float(eer),
'auc': float(roc_auc),
'far': float(far),
'frr': float(frr)
}
6. Anti-Spoofing și Detectare Liveness
Un sistem de recunoaștere a feței fără detectarea vieții si vulnerabil la atacurile de falsificare: este nevoie doar de o fotografie tipărită, un videoclip pe un smartphone sau o mască 3D pentru a păcăli majoritatea detectorilor. Detectarea vieții distinge o față real dintr-un artefact.
Tipuri de atacuri de falsificare
| Tip de atac | Descriere | Dificultatea de apărare | Tehnica de atenuare |
|---|---|---|---|
| Print Attack | Fotografie tipărită pe hârtie/hârtie lucioasă | Scăzut | Analiza texturii, detectarea modelului moaré |
| Replay Attack | Față video pe ecran | Medie | Detectare reflexie pe ecran, adâncime 3D |
| Mască 3D | Mască realistă imprimată 3D | Ridicat | Senzor IR, provocare-răspuns, micromișcare |
| Videoclipuri deepfake | Videoclip sintetic generat de AI | Foarte sus | Detector de deepfake, analiza fluxului sanguin |
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from typing import Optional
class LivenessDetector:
"""
Sistema di liveness detection basato su due segnali complementari:
1. Texture analysis (LBP-based + CNN) - rileva print attacks
2. Micro-motion analysis - rileva replay attacks (video statici non hanno micro-movimenti)
Per deployment serio, considera: SilentFace, FAS-SGTD, CentralDiff-CNN
"""
def __init__(self, model_path: Optional[str] = None,
device: str = 'auto'):
self.device = torch.device(
'cuda' if torch.cuda.is_available() and device == 'auto'
else 'cpu'
)
# Modello CNN per texture analysis (fine-tuned su dataset anti-spoofing)
# Dataset: CelebA-Spoof, OULU-NPU, MSU-MFSD
self.model = self._build_model(model_path)
self.model.eval()
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# Buffer per micro-motion analysis
self.frame_buffer: list[np.ndarray] = []
self.buffer_size = 10 # 10 frame ~= 333ms @ 30FPS
def _build_model(self, model_path: Optional[str]) -> nn.Module:
"""
MobileNetV2 fine-tuned per binary classification: real vs spoof.
MobileNetV2 perchè e leggero (3.4M params) e veloce su CPU/edge.
"""
model = models.mobilenet_v2(pretrained=False)
model.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(model.last_channel, 2) # [spoof, real]
)
if model_path:
state_dict = torch.load(model_path, map_location=self.device)
model.load_state_dict(state_dict)
return model.to(self.device)
def is_live_texture(self, face_roi: np.ndarray,
threshold: float = 0.7) -> tuple[bool, float]:
"""
Analisi texture CNN: classifica il volto come reale o spoof.
face_roi: crop del volto BGR [H, W, 3]
threshold: soglia per considerare il volto reale
Returns: (is_live, confidence_score)
"""
img_rgb = cv2.cvtColor(face_roi, cv2.COLOR_BGR2RGB)
tensor = self.transform(img_rgb).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1)
live_prob = probs[0, 1].item() # indice 1 = "real"
return live_prob >= threshold, live_prob
def compute_lbp_features(self, gray: np.ndarray,
radius: int = 3, n_points: int = 24) -> np.ndarray:
"""
Local Binary Pattern (LBP) texture descriptor.
Le stampe su carta hanno pattern LBP caratteristici (moaré).
Feature complementare alla CNN per robustezza.
"""
h, w = gray.shape
lbp = np.zeros_like(gray, dtype=np.uint8)
for r in range(radius, h - radius):
for c in range(radius, w - radius):
center = int(gray[r, c])
code = 0
for p in range(n_points):
angle = 2 * np.pi * p / n_points
nr = r - int(radius * np.sin(angle))
nc = c + int(radius * np.cos(angle))
nr = np.clip(nr, 0, h - 1)
nc = np.clip(nc, 0, w - 1)
code |= (int(gray[nr, nc]) >= center) << p
lbp[r, c] = code % 256
# Istogramma LBP come feature vector
hist, _ = np.histogram(lbp.ravel(), bins=256, range=(0, 256))
hist = hist.astype(float)
hist /= (hist.sum() + 1e-7)
return hist
def analyze_micro_motion(self, frame_bgr: np.ndarray) -> tuple[bool, float]:
"""
Analisi micro-movimento: rileva movimenti naturali del volto (micro-espressioni,
respiro, battito ciglia) assenti in foto/video statici.
Returns: (has_micro_motion, motion_score)
Un video replay di solito ha motion_score < 0.5
"""
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
self.frame_buffer.append(gray)
if len(self.frame_buffer) > self.buffer_size:
self.frame_buffer.pop(0)
if len(self.frame_buffer) < 3:
return True, 1.0 # Non abbastanza frame, assumi live
# Optical flow su ultimi 3 frame
flow_magnitudes = []
for i in range(len(self.frame_buffer) - 2, len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
flow_magnitudes.append(np.mean(magnitude))
avg_motion = np.mean(flow_magnitudes)
# Calcola varianza del motion (micro-movimenti irregolari = live)
if len(self.frame_buffer) >= self.buffer_size:
all_flows = []
for i in range(len(self.frame_buffer) - 1):
flow = cv2.calcOpticalFlowFarneback(
self.frame_buffer[i], self.frame_buffer[i+1],
None, 0.5, 3, 15, 3, 5, 1.2, 0
)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
all_flows.append(np.mean(magnitude))
motion_variance = np.var(all_flows)
# Alta varianza = movimenti naturali irregolari = live
motion_score = min(1.0, motion_variance * 100)
else:
motion_score = 0.5
return motion_score > 0.3, float(motion_score)
def predict(self, face_roi: np.ndarray,
frame_bgr: np.ndarray) -> dict:
"""
Prediction combinata: texture CNN + micro-motion.
Fusion con regola AND conservativa per sicurezza.
"""
is_live_tex, tex_score = self.is_live_texture(face_roi)
has_motion, motion_score = self.analyze_micro_motion(frame_bgr)
# Logica di fusione: entrambi i segnali devono concordare
combined_score = 0.6 * tex_score + 0.4 * motion_score
is_live = is_live_tex and (motion_score > 0.2)
return {
'is_live': is_live,
'combined_score': combined_score,
'texture_score': tex_score,
'motion_score': motion_score,
'verdict': 'LIVE' if is_live else 'SPOOF'
}
# Pipeline completa: face recognition + liveness check
def secure_recognition_pipeline(recognizer, liveness_detector, frame_bgr):
"""
Pipeline sicura: prima verifica liveness, poi riconosce.
Se il volto non e live, non procedere con il riconoscimento.
"""
# 1. Rileva volti
faces = recognizer.app.get(frame_bgr)
results = []
for face in faces:
x1, y1, x2, y2 = face.bbox.astype(int)
face_roi = frame_bgr[max(0,y1):y2, max(0,x1):x2]
if face_roi.size == 0:
continue
# 2. Liveness check (PRIMA del riconoscimento)
liveness = liveness_detector.predict(face_roi, frame_bgr)
if not liveness['is_live']:
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'SPOOF',
'identity': None,
'liveness': liveness
})
continue
# 3. Face recognition (solo se live)
from sklearn.preprocessing import normalize
emb = normalize(face.embedding.reshape(1, -1))[0]
identity, confidence = recognizer._match_embedding(emb)
results.append({
'bbox': (x1, y1, x2, y2),
'verdict': 'LIVE',
'identity': identity if confidence > 0.5 else 'unknown',
'confidence': confidence,
'liveness': liveness
})
return results
7. Bază de date scalabilă: FAISS pentru milioane de înglobare
Sistemul scikit-learn KNN funcționează bine până la ~10.000 de încorporare. Dincolo de asta prag, căutarea cu forța brută devine blocajul. FAISS (Facebook AI Similarity Search) se ridică la miliarde de vectori cu căutare aproximativă în microsecunde.
import faiss
import numpy as np
import pickle
from pathlib import Path
class FAISSFaceDatabase:
"""
Database di embedding facciali scalabile con FAISS.
Ricerca approssimata (HNSW) per 1M+ embedding in < 1ms.
Installazione: pip install faiss-cpu (o faiss-gpu per GPU)
"""
def __init__(self, embedding_dim: int = 512,
db_path: str = 'faiss_face_db',
index_type: str = 'hnsw'):
"""
index_type:
'flat' - Ricerca esatta, O(n), per < 100K embedding
'hnsw' - Ricerca approssimata HNSW, per 100K - 10M embedding
'ivf' - Inverted File Index, per 10M+ embedding
"""
self.embedding_dim = embedding_dim
self.db_path = Path(db_path)
self.db_path.mkdir(exist_ok=True)
self.index_type = index_type
self.index = self._build_index()
self.id_to_name: dict[int, str] = {} # FAISS ID -> nome persona
self.next_id = 0
# Carica se esiste
if (self.db_path / 'index.faiss').exists():
self._load()
def _build_index(self) -> faiss.Index:
"""Costruisce indice FAISS appropriato."""
if self.index_type == 'flat':
# Ricerca esatta con distanza coseno (IP su vettori normalizzati)
index = faiss.IndexFlatIP(self.embedding_dim)
elif self.index_type == 'hnsw':
# HNSW: Hierarchical Navigable Small World
# M=32: connessioni per nodo (più alto = più accurato ma più RAM)
# ef_construction=200: qualità indice durante build
index = faiss.IndexHNSWFlat(self.embedding_dim, 32)
index.hnsw.efConstruction = 200
index.hnsw.efSearch = 64 # tradeoff accuratezza/velocità a query time
elif self.index_type == 'ivf':
# IVF: divide lo spazio in cluster, cerca solo nei cluster più vicini
n_lists = 100 # numero di cluster (sqrt(N) e una buona regola)
quantizer = faiss.IndexFlatIP(self.embedding_dim)
index = faiss.IndexIVFFlat(quantizer, self.embedding_dim, n_lists,
faiss.METRIC_INNER_PRODUCT)
else:
raise ValueError(f"Tipo indice sconosciuto: {self.index_type}")
return index
def add_embedding(self, name: str,
embedding: np.ndarray) -> int:
"""
Aggiunge un embedding al database.
Normalizza L2 per usare inner product come similarità coseno.
"""
emb_norm = embedding / (np.linalg.norm(embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
# IVF richiede training prima del primo add
if self.index_type == 'ivf' and not self.index.is_trained:
print("Training IVF index... (richiede un batch iniziale)")
# In pratica: train con tutti gli embedding prima di usare
self.index.train(emb_norm)
self.index.add(emb_norm)
self.id_to_name[self.next_id] = name
self.next_id += 1
return self.next_id - 1
def add_person(self, name: str,
embeddings: list[np.ndarray]) -> int:
"""Aggiunge più embedding per la stessa persona."""
for emb in embeddings:
self.add_embedding(name, emb)
return len(embeddings)
def search(self, query_embedding: np.ndarray,
k: int = 1,
min_similarity: float = 0.5) -> list[dict]:
"""
Cerca i k embedding più simili nel database.
Returns: lista di {name, similarity, faiss_id}
Ordinate per similarità decrescente.
"""
if self.next_id == 0:
return []
emb_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
emb_norm = emb_norm.astype(np.float32).reshape(1, -1)
actual_k = min(k, self.next_id)
similarities, indices = self.index.search(emb_norm, actual_k)
results = []
for sim, idx in zip(similarities[0], indices[0]):
if idx == -1 or sim < min_similarity:
continue
results.append({
'name': self.id_to_name.get(int(idx), 'unknown'),
'similarity': float(sim),
'faiss_id': int(idx)
})
return results
def identify(self, query_embedding: np.ndarray,
threshold: float = 0.5) -> tuple[str, float]:
"""Identifica la persona con maggiore similarità."""
results = self.search(query_embedding, k=3)
if not results:
return 'unknown', 0.0
# Voto di maggioranza tra top-3 (robustezza)
from collections import Counter
names = [r['name'] for r in results]
best_name = Counter(names).most_common(1)[0][0]
best_sim = max(r['similarity'] for r in results
if r['name'] == best_name)
if best_sim < threshold:
return 'unknown', best_sim
return best_name, best_sim
def save(self) -> None:
"""Salva indice FAISS e mapping ID->nome su disco."""
faiss.write_index(self.index,
str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'wb') as f:
pickle.dump({'id_to_name': self.id_to_name,
'next_id': self.next_id}, f)
print(f"Database salvato: {self.next_id} embedding")
def _load(self) -> None:
"""Carica indice FAISS e mapping da disco."""
self.index = faiss.read_index(str(self.db_path / 'index.faiss'))
with open(self.db_path / 'id_map.pkl', 'rb') as f:
data = pickle.load(f)
self.id_to_name = data['id_to_name']
self.next_id = data['next_id']
print(f"Database caricato: {self.next_id} embedding, "
f"{len(set(self.id_to_name.values()))} identità")
def stats(self) -> dict:
"""Statistiche del database."""
names = list(self.id_to_name.values())
from collections import Counter
name_counts = Counter(names)
return {
'total_embeddings': self.next_id,
'total_identities': len(name_counts),
'avg_embeddings_per_person': np.mean(list(name_counts.values()))
if name_counts else 0,
'index_type': self.index_type
}
# Benchmark: KNN sklearn vs FAISS per database di varie dimensioni
def benchmark_search_backends(n_identities: int = 10000,
embs_per_person: int = 5) -> None:
"""Confronta tempi di ricerca KNN vs FAISS."""
import time
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
n_total = n_identities * embs_per_person
dim = 512
# Genera embedding sintetici
embeddings = np.random.randn(n_total, dim).astype(np.float32)
embeddings = normalize(embeddings)
labels = np.repeat(np.arange(n_identities), embs_per_person)
query = np.random.randn(1, dim).astype(np.float32)
query = normalize(query)
# KNN sklearn
knn = KNeighborsClassifier(n_neighbors=3, metric='cosine', algorithm='brute')
knn.fit(embeddings, labels)
t0 = time.perf_counter()
for _ in range(100):
knn.predict(query)
knn_time = (time.perf_counter() - t0) / 100 * 1000
# FAISS HNSW
index = faiss.IndexHNSWFlat(dim, 32)
index.add(embeddings)
t0 = time.perf_counter()
for _ in range(100):
index.search(query, 3)
faiss_time = (time.perf_counter() - t0) / 100 * 1000
print(f"\nBenchmark ricerca ({n_total:,} embedding, dim={dim}):")
print(f" KNN sklearn: {knn_time:.2f} ms/query")
print(f" FAISS HNSW: {faiss_time:.3f} ms/query")
print(f" Speedup: {knn_time/faiss_time:.0f}x")
8. Considerații etice și juridice
Atenție: Date biometrice conform GDPR
Datele faciale sunt date biometrice conform GDPR (Art. 9) și prelucrarea acestora și supuse unor restricții severe. În Italia și UE:
- Consimțământ explicit obligatoriu: Datele biometrice nu pot fi colectate fără consimțământul informat specific pentru fiecare scop
- Minimizarea datelor: Păstrați doar înglobările necesare, nu imaginile originale
- Dreptul la ștergere: Implementați un punct final pentru a șterge toate datele unei persoane
- Scop limitat: Datele colectate pentru un sistem de conectare nu pot fi utilizate pentru analize de marketing
- Testarea obligatorie a părtinirii: Înainte de implementare, verificați valorile pentru diferite grupuri demografice (EER în funcție de sex, vârstă, etnie)
- Fără supraveghere publică: Regulamentul AI Act EU 2024 interzice aproape complet recunoașterea facială în spațiile publice
Algoritmul de testare a părtinirii
Înainte de fiecare implementare, efectuați întotdeauna o subevaluare demografică. Un sistem cu EER agregat de 2% poate avea EER de 5% pentru un subgrup specific – ceea ce este inacceptabil din punct de vedere etic și legal.
9. Cele mai bune practici
Lista de verificare pentru sistemele de recunoaștere a feței pregătite pentru producție
- Utilizați MediaPipe în timp real, MTCNN pentru precizie ridicată: nu sunt în competiție – alegeți în funcție de context
- Minim 5-10 imagini de persoană: în diferite condiții (lumină, unghi, expresie). Cu o singură imagine, sistemul este fragil
- Normalizați ÎNTOTDEAUNA înglobările:
emb = emb / np.linalg.norm(emb). Fără normalizare, distanța cosinus nu funcționează corect - Calibrați pragul pe date reale: nu utilizați 0,5 ca prag implicit fără a-l fi validat pe setul de date. Calculați EER pe scenariul dvs. specific
- Anti-spoofing: sistemele fără detectarea vieții sunt vulnerabile la fotografii și videoclipuri. Integrează un model de detectare a vieții (MobileNetV2 reglat fin pe setul de date de falsificare)
- Actualizați înglobările în timp: oamenii își schimbă aspectul. Planificați o reînscriere periodică sau o actualizare online a înglobărilor
- Conectarea cu confidențialitate: înregistrează numai înglobări (nu imagini), cu hashing de identitate pentru depanare fără a expune datele personale
Concluzii
Conducta modernă și robustă de recunoaștere a feței, modulară și accesibilă. Avem a acoperit fiecare strat al unui sistem pregătit pentru producție:
- MediaPipe: Detectare ultra-rapidă pe CPU, excelentă pentru timp real, cu constrângeri de resurse. 200+ FPS pe laptopul modern.
- MTCNN + Alinierea feței: bază solidă pentru sisteme de recunoaștere precise. Cele 5 repere sunt fundamentale pentru alinierea canonică de 112x112.
- InsightFace/ArcFace: încorporare 512D cu o precizie de 99,83% pe LFW - stadiul tehnicii accesibil prin pip.
- Prag calibrat cu ROC/EER: diferența dintre un sistem robust și unul nefiabil. Nu utilizați 0.5 în mod implicit fără validare.
- Anti-Spoofing + Detectare Liveness: esenţial pentru sistemele de securitate. Textura CNN + analiză micro-mișcare pentru rezistență la atacurile de printare/reluare.
- FAISS pentru scalare: de la KNN scikit-learn (încorporare 10K) la FAISS HNSW (încorporare 1M+) cu accelerare de 100-1000x.
- Etica și GDPR: nu este o cerință opțională, ci fundamentală. EU AI Act 2024 interzice recunoașterea facială în spațiile publice.
Navigare în serie
Resurse între serii
- MLOps: Model care servește în producție - implementați modele pe API-ul REST
- Învățare profundă avansată: Vision Transformers







