Monitoring NLP in Produzione: Drift Detection e Retraining
Un modello NLP eccellente al momento del deployment può diventare rapidamente obsoleto. Il linguaggio evolve, i pattern cambiano, i dati reali si discostano dal training set. Questo fenomeno si chiama data drift e, se non monitorato, causa un degrado silenzioso delle prestazioni che spesso viene scoperto troppo tardi — quando i clienti si lamentano o i KPI aziendali crollano.
In questo articolo costruiremo un sistema di monitoring completo per modelli NLP in produzione: dal tracciamento delle predizioni in tempo reale al rilevamento automatico del drift, dall'alerting al retraining automatico con MLflow e Airflow. Questo articolo chiude la serie NLP Moderno: da BERT ai LLM con un focus avanzato sull'operativita.
Cosa Imparerai
- Tipi di drift: data drift, concept drift, label drift, feature drift
- Metriche da monitorare per modelli NLP in produzione
- Text drift detection: embedding drift con Population Stability Index (PSI)
- Monitoring della qualità delle predizioni senza label (proxy metrics)
- Sistema di alerting con soglie e notifiche
- Logging strutturato delle predizioni NLP
- Pipeline di retraining automatico con trigger basati su drift
- A/B testing per nuove versioni del modello
- Dashboard di monitoring con Grafana e Prometheus
- Shadow deployment per validare nuovi modelli senza impatto
1. Tipi di Drift nei Modelli NLP
Il "drift" in un modello NLP può manifestarsi in modi diversi, ognuno con cause e soluzioni differenti.
Tassonomia del Drift
| Tipo | Definizione | Esempio NLP | Soluzione |
|---|---|---|---|
| Data Drift | Distribuzione input cambia | Nuovo gergo su Twitter | Retraining con nuovi dati |
| Concept Drift | Relazione input-output cambia | "Trump" = policy vs persona | Retraining frequente |
| Label Drift | Distribuzione output cambia | Più predizioni negative in crisi | Monitoraggio output distribution |
| Feature Drift | Statistiche delle feature cambiano | Lunghezza media testi aumenta | Feature monitoring + alert |
2. Sistema di Logging delle Predizioni
La base di qualsiasi sistema di monitoring e il logging strutturato di ogni predizione. Dobbiamo catturare abbastanza informazioni per analizzare il comportamento del modello nel tempo.
import json
import time
import hashlib
import logging
from dataclasses import dataclass, asdict, field
from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid
@dataclass
class NLPPredictionLog:
"""Schema di logging per predizioni NLP."""
prediction_id: str
timestamp: str
model_version: str
input_text: str
input_hash: str # hash del testo (non il testo per privacy)
input_length_chars: int
input_length_tokens: int
predicted_label: str
predicted_label_id: int
confidence_score: float
all_class_scores: Dict[str, float]
inference_latency_ms: float
true_label: Optional[str] = None # None se non disponibile
feedback: Optional[str] = None # feedback utente se disponibile
metadata: Dict[str, Any] = field(default_factory=dict)
class NLPPredictionLogger:
"""Logger strutturato per predizioni NLP."""
def __init__(self, model_version: str, log_path: str = "./prediction_logs"):
self.model_version = model_version
self.log_path = log_path
self.logger = logging.getLogger("nlp_predictions")
# Handler per file JSON Lines (JSONL)
handler = logging.FileHandler(f"{log_path}/predictions.jsonl")
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_prediction(self,
text: str,
predicted_label: str,
predicted_label_id: int,
confidence: float,
all_scores: Dict[str, float],
latency_ms: float,
num_tokens: int,
true_label: Optional[str] = None,
metadata: Optional[dict] = None) -> str:
"""Logga una singola predizione. Restituisce prediction_id."""
# Hash dell'input (non salvare il testo originale per GDPR)
input_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
prediction_id = str(uuid.uuid4())
log_entry = NLPPredictionLog(
prediction_id=prediction_id,
timestamp=datetime.utcnow().isoformat(),
model_version=self.model_version,
input_text=text[:500], # troncato per storage
input_hash=input_hash,
input_length_chars=len(text),
input_length_tokens=num_tokens,
predicted_label=predicted_label,
predicted_label_id=predicted_label_id,
confidence_score=confidence,
all_class_scores=all_scores,
inference_latency_ms=latency_ms,
true_label=true_label,
metadata=metadata or {}
)
self.logger.info(json.dumps(asdict(log_entry)))
return prediction_id
# Uso nella pipeline di inferenza
class MonitoredSentimentClassifier:
def __init__(self, model_path: str, model_version: str):
from transformers import pipeline, AutoTokenizer
self.pipeline = pipeline("text-classification", model=model_path)
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.logger = NLPPredictionLogger(model_version)
self.model_version = model_version
def predict(self, text: str, metadata: dict = None) -> dict:
start = time.time()
# Inferenza
result = self.pipeline(text)[0]
# Calcola numero di token
num_tokens = len(self.tokenizer.tokenize(text)[:128])
latency_ms = (time.time() - start) * 1000
# Log
pred_id = self.logger.log_prediction(
text=text,
predicted_label=result['label'],
predicted_label_id=0 if result['label'] == 'NEGATIVE' else 1,
confidence=result['score'],
all_scores={result['label']: result['score']},
latency_ms=latency_ms,
num_tokens=num_tokens,
metadata=metadata or {}
)
return {
"prediction_id": pred_id,
"label": result['label'],
"confidence": result['score'],
"latency_ms": latency_ms
}
3. Drift Detection: Text Embedding Approach
Il metodo più robusto per rilevare il data drift nel testo e confrontare la distribuzione degli embedding delle frasi nel training set con quelli in produzione.
import numpy as np
from sentence_transformers import SentenceTransformer
from scipy.stats import ks_2samp
from scipy.spatial.distance import jensenshannon
import warnings
class EmbeddingDriftDetector:
"""
Rileva data drift confrontando la distribuzione degli embedding.
Usa il test di Kolmogorov-Smirnov (KS) per ogni dimensione dell'embedding.
"""
def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2',
ks_threshold: float = 0.1,
psi_threshold: float = 0.2):
self.model = SentenceTransformer(embedding_model)
self.ks_threshold = ks_threshold # soglia test KS
self.psi_threshold = psi_threshold # soglia PSI
self.reference_embeddings = None
self.reference_stats = None
def fit(self, reference_texts: List[str], batch_size: int = 64):
"""Calcola statistiche di riferimento dal training set."""
print(f"Computing reference embeddings for {len(reference_texts)} texts...")
self.reference_embeddings = self.model.encode(
reference_texts, batch_size=batch_size, show_progress_bar=True
)
self.reference_stats = {
'mean': self.reference_embeddings.mean(axis=0),
'std': self.reference_embeddings.std(axis=0),
'n': len(reference_texts)
}
print(f"Reference embeddings computed: shape={self.reference_embeddings.shape}")
def detect_drift(self, production_texts: List[str],
batch_size: int = 64) -> Dict[str, Any]:
"""Rileva drift confrontando produzione con riferimento."""
if self.reference_embeddings is None:
raise ValueError("Call fit() first with reference data")
prod_embeddings = self.model.encode(
production_texts, batch_size=batch_size, show_progress_bar=False
)
# Metodo 1: KS test per ogni dimensione dell'embedding
ks_stats = []
ks_pvalues = []
for dim in range(self.reference_embeddings.shape[1]):
stat, pvalue = ks_2samp(
self.reference_embeddings[:, dim],
prod_embeddings[:, dim]
)
ks_stats.append(stat)
ks_pvalues.append(pvalue)
avg_ks = np.mean(ks_stats)
max_ks = np.max(ks_stats)
# Metodo 2: Cosine distance media tra centroidi
ref_centroid = self.reference_embeddings.mean(axis=0)
prod_centroid = prod_embeddings.mean(axis=0)
centroid_distance = 1 - np.dot(ref_centroid, prod_centroid) / (
np.linalg.norm(ref_centroid) * np.linalg.norm(prod_centroid)
)
# Metodo 3: PSI (Population Stability Index)
psi = self._compute_psi(
self.reference_embeddings[:, :10], # prime 10 dim per PSI
prod_embeddings[:, :10]
)
drift_detected = (avg_ks > self.ks_threshold or
centroid_distance > 0.05)
return {
"drift_detected": drift_detected,
"avg_ks_statistic": float(avg_ks),
"max_ks_statistic": float(max_ks),
"centroid_cosine_distance": float(centroid_distance),
"psi": float(psi),
"n_production": len(production_texts),
"alert_level": "HIGH" if avg_ks > self.ks_threshold * 2
else "MEDIUM" if drift_detected
else "LOW"
}
def _compute_psi(self, reference: np.ndarray, production: np.ndarray,
n_bins: int = 10) -> float:
"""Population Stability Index: misura lo shift della distribuzione."""
psi_values = []
for dim in range(reference.shape[1]):
ref = reference[:, dim]
prod = production[:, dim]
bins = np.percentile(ref, np.linspace(0, 100, n_bins + 1))
bins[0] -= 0.001
bins[-1] += 0.001
ref_counts, _ = np.histogram(ref, bins=bins)
prod_counts, _ = np.histogram(prod, bins=bins)
ref_pct = (ref_counts / ref_counts.sum()) + 1e-10
prod_pct = (prod_counts / prod_counts.sum()) + 1e-10
psi = np.sum((prod_pct - ref_pct) * np.log(prod_pct / ref_pct))
psi_values.append(psi)
return float(np.mean(psi_values))
4. Proxy Metrics: Monitorare Senza Label
In produzione, spesso non abbiamo le label vere per calcolare l'accuracy. Usiamo proxy metrics che correlano con la qualità del modello.
import numpy as np
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
class NLPProxyMetricsMonitor:
"""
Monitora metriche proxy per modelli NLP senza label.
"""
def __init__(self, window_hours: int = 24):
self.window_hours = window_hours
self.predictions = []
def add_prediction(self, prediction: dict):
"""Aggiunge una predizione al log."""
prediction['timestamp'] = datetime.utcnow()
self.predictions.append(prediction)
def compute_proxy_metrics(self) -> dict:
"""Calcola metriche proxy dalla finestra temporale corrente."""
cutoff = datetime.utcnow() - timedelta(hours=self.window_hours)
recent = [p for p in self.predictions if p['timestamp'] > cutoff]
if not recent:
return {"error": "Nessuna predizione nella finestra temporale"}
confidences = [p['confidence'] for p in recent]
latencies = [p['latency_ms'] for p in recent]
labels = [p['predicted_label'] for p in recent]
# 1. Confidence distribution (bassa confidenza = modello incerto)
low_conf_pct = sum(1 for c in confidences if c < 0.7) / len(confidences)
avg_confidence = np.mean(confidences)
confidence_entropy = -np.sum(
[(c * np.log(c) + (1-c) * np.log(1-c + 1e-10)) for c in confidences]
) / len(confidences)
# 2. Label distribution (drift nelle predizioni)
label_counts = defaultdict(int)
for l in labels:
label_counts[l] += 1
label_distribution = {k: v/len(labels) for k, v in label_counts.items()}
# 3. Latency percentiles
p50 = np.percentile(latencies, 50)
p95 = np.percentile(latencies, 95)
p99 = np.percentile(latencies, 99)
# 4. Text length statistics
lengths = [p.get('input_length_chars', 0) for p in recent]
# 5. Refusal rate (se il modello ritorna "UNCERTAIN")
uncertain_pct = sum(1 for l in labels if l == 'UNCERTAIN') / len(labels)
return {
"window_hours": self.window_hours,
"n_predictions": len(recent),
"avg_confidence": round(avg_confidence, 4),
"low_confidence_pct": round(low_conf_pct, 4),
"confidence_entropy": round(float(confidence_entropy), 4),
"label_distribution": label_distribution,
"latency_p50_ms": round(p50, 1),
"latency_p95_ms": round(p95, 1),
"latency_p99_ms": round(p99, 1),
"avg_input_length": round(np.mean(lengths), 1),
"uncertain_pct": round(uncertain_pct, 4)
}
def check_alerts(self, thresholds: dict) -> list:
"""Verifica se le metriche proxy superano le soglie di alert."""
metrics = self.compute_proxy_metrics()
alerts = []
checks = {
"avg_confidence": ("<", thresholds.get("min_confidence", 0.75)),
"low_confidence_pct": (">", thresholds.get("max_low_conf_pct", 0.20)),
"latency_p95_ms": (">", thresholds.get("max_p95_latency_ms", 500)),
"uncertain_pct": (">", thresholds.get("max_uncertain_pct", 0.10)),
}
for metric_name, (op, threshold) in checks.items():
value = metrics.get(metric_name)
if value is None:
continue
triggered = (value < threshold if op == "<" else value > threshold)
if triggered:
alerts.append({
"metric": metric_name,
"value": value,
"threshold": threshold,
"severity": "HIGH" if abs(value - threshold) / threshold > 0.5 else "MEDIUM"
})
return alerts
5. Pipeline di Retraining Automatico
import subprocess
from pathlib import Path
import json
from datetime import datetime
class AutoRetrainingPipeline:
"""
Pipeline di retraining automatico triggered dal drift detection.
"""
def __init__(self,
drift_detector: EmbeddingDriftDetector,
proxy_monitor: NLPProxyMetricsMonitor,
base_model_path: str,
data_path: str,
output_path: str):
self.drift_detector = drift_detector
self.proxy_monitor = proxy_monitor
self.base_model_path = base_model_path
self.data_path = data_path
self.output_path = output_path
self.retraining_history = []
def should_retrain(self,
production_texts: list,
drift_threshold: float = 0.1,
confidence_threshold: float = 0.75) -> dict:
"""
Determina se e necessario il retraining.
Ritorna {should_retrain: bool, reason: str, severity: str}
"""
# Check 1: Embedding drift
drift_report = self.drift_detector.detect_drift(production_texts)
if drift_report['drift_detected']:
return {
"should_retrain": True,
"reason": f"Embedding drift rilevato: KS={drift_report['avg_ks_statistic']:.4f}",
"severity": drift_report['alert_level'],
"drift_report": drift_report
}
# Check 2: Proxy metrics
metrics = self.proxy_monitor.compute_proxy_metrics()
alerts = self.proxy_monitor.check_alerts({
"min_confidence": confidence_threshold,
"max_low_conf_pct": 0.25
})
if any(a['severity'] == 'HIGH' for a in alerts):
return {
"should_retrain": True,
"reason": f"Metriche proxy critiche: {alerts}",
"severity": "HIGH",
"alerts": alerts
}
return {
"should_retrain": False,
"reason": "Tutte le metriche nella norma",
"severity": "LOW"
}
def trigger_retraining(self, trigger_reason: str, new_data_path: str):
"""Avvia il retraining con i nuovi dati."""
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
new_model_path = f"{self.output_path}/model_v{timestamp}"
print(f"Avvio retraining: {trigger_reason}")
print(f"Nuovo modello: {new_model_path}")
# Log del retraining
self.retraining_history.append({
"timestamp": timestamp,
"trigger_reason": trigger_reason,
"base_model": self.base_model_path,
"new_data": new_data_path,
"output_model": new_model_path,
"status": "started"
})
# In produzione: trigghera una pipeline CI/CD (Airflow, GitHub Actions, Kubeflow)
# Esempio con subprocess:
# subprocess.Popen([
# "python", "train.py",
# "--base-model", self.base_model_path,
# "--train-data", new_data_path,
# "--output", new_model_path,
# ])
return {
"retraining_id": timestamp,
"new_model_path": new_model_path,
"status": "triggered"
}
6. A/B Testing per Nuove Versioni del Modello
import random
from typing import Callable
class ABTestingRouter:
"""
Router per A/B testing tra versioni del modello.
Splitta il traffico tra il modello corrente (A) e il nuovo (B).
"""
def __init__(self,
model_a: Callable,
model_b: Callable,
traffic_split_b: float = 0.1,
experiment_id: str = "exp_001"):
self.model_a = model_a
self.model_b = model_b
self.traffic_split_b = traffic_split_b
self.experiment_id = experiment_id
self.results = {"a": [], "b": []}
def predict(self, text: str, user_id: str = None) -> dict:
"""Instrada la richiesta al modello A o B in base al traffic split."""
# Instradamento deterministico basato su user_id (per coerenza)
if user_id:
use_b = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100 < (self.traffic_split_b * 100)
else:
use_b = random.random() < self.traffic_split_b
model_variant = "b" if use_b else "a"
model_fn = self.model_b if use_b else self.model_a
result = model_fn(text)
result["model_variant"] = model_variant
result["experiment_id"] = self.experiment_id
self.results[model_variant].append({
"confidence": result.get("confidence", 0),
"latency_ms": result.get("latency_ms", 0),
})
return result
def get_experiment_stats(self) -> dict:
"""Calcola statistiche dell'esperimento A/B."""
stats = {}
for variant in ["a", "b"]:
if self.results[variant]:
confs = [r["confidence"] for r in self.results[variant]]
lats = [r["latency_ms"] for r in self.results[variant]]
stats[variant] = {
"n_requests": len(self.results[variant]),
"avg_confidence": round(np.mean(confs), 4),
"avg_latency_ms": round(np.mean(lats), 1),
}
return {"experiment_id": self.experiment_id, "variants": stats}
7. Dashboard con Prometheus e Grafana
# monitoring_api.py
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import time
app = FastAPI()
# Metriche Prometheus per modelli NLP
PREDICTIONS_TOTAL = Counter(
"nlp_predictions_total",
"Numero totale di predizioni NLP",
["model_version", "predicted_label"]
)
CONFIDENCE_HISTOGRAM = Histogram(
"nlp_prediction_confidence",
"Distribuzione del confidence score",
["model_version"],
buckets=[0.5, 0.6, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.99, 1.0]
)
LATENCY_HISTOGRAM = Histogram(
"nlp_inference_latency_seconds",
"Latenza dell'inferenza NLP",
["model_version"],
buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
)
DRIFT_SCORE = Gauge(
"nlp_embedding_drift_score",
"Score del drift degli embedding (0=no drift, 1=drift massimo)",
["model_version"]
)
INPUT_LENGTH_HISTOGRAM = Histogram(
"nlp_input_length_chars",
"Lunghezza dell'input in caratteri",
["model_version"],
buckets=[50, 100, 200, 500, 1000, 2000, 5000]
)
MODEL_VERSION = "v2.1.0"
@app.post("/predict")
def predict_with_monitoring(request: dict):
text = request["text"]
start = time.time()
# ... Inferenza ...
result = {"label": "POSITIVE", "score": 0.92}
latency = time.time() - start
# Aggiorna metriche Prometheus
PREDICTIONS_TOTAL.labels(
model_version=MODEL_VERSION,
predicted_label=result["label"]
).inc()
CONFIDENCE_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(result["score"])
LATENCY_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(latency)
INPUT_LENGTH_HISTOGRAM.labels(model_version=MODEL_VERSION).observe(len(text))
return {**result, "latency_ms": latency * 1000}
@app.get("/metrics")
def metrics():
"""Endpoint Prometheus per il scraping delle metriche."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
# docker-compose.yml per Prometheus + Grafana:
# services:
# prometheus:
# image: prom/prometheus
# volumes:
# - ./prometheus.yml:/etc/prometheus/prometheus.yml
# grafana:
# image: grafana/grafana
# ports:
# - "3000:3000"
8. Shadow Deployment e Rollout Graduale
Prima di esporre un nuovo modello a traffico reale, il shadow deployment permette di validare latenza e comportamento senza alcun rischio per gli utenti. Il modello shadow riceve le stesse richieste del modello in produzione, ma le sue predizioni vengono scartate — servono solo per il monitoring.
import asyncio
import time
from typing import Callable, Dict, Any
class ShadowDeploymentManager:
"""
Gestisce il shadow deployment di un nuovo modello NLP.
Il modello shadow riceve tutto il traffico ma non risponde agli utenti.
"""
def __init__(self,
production_model: Callable,
shadow_model: Callable,
shadow_name: str = "shadow_v2"):
self.production_model = production_model
self.shadow_model = shadow_model
self.shadow_name = shadow_name
self.comparison_log: list = []
def predict(self, text: str, user_id: str = None) -> Dict[str, Any]:
"""
Esegue la predizione in produzione e in background quella shadow.
Restituisce solo il risultato del modello di produzione.
"""
# Predizione produzione (sincrona)
prod_start = time.time()
prod_result = self.production_model(text)
prod_latency = (time.time() - prod_start) * 1000
# Predizione shadow (asincrona, non blocca la risposta)
shadow_start = time.time()
try:
shadow_result = self.shadow_model(text)
shadow_latency = (time.time() - shadow_start) * 1000
shadow_error = None
except Exception as e:
shadow_result = None
shadow_latency = None
shadow_error = str(e)
# Log del confronto
self.comparison_log.append({
"text_hash": hash(text),
"prod_label": prod_result.get("label"),
"prod_confidence": prod_result.get("confidence"),
"prod_latency_ms": prod_latency,
"shadow_label": shadow_result.get("label") if shadow_result else None,
"shadow_confidence": shadow_result.get("confidence") if shadow_result else None,
"shadow_latency_ms": shadow_latency,
"shadow_error": shadow_error,
"agreement": prod_result.get("label") == (shadow_result or {}).get("label")
})
# Restituisce SOLO il risultato del modello di produzione
return prod_result
def get_shadow_stats(self) -> Dict[str, Any]:
"""Calcola statistiche di confronto tra produzione e shadow."""
if not self.comparison_log:
return {"error": "Nessun dato di confronto disponibile"}
agreement_rate = sum(1 for r in self.comparison_log if r["agreement"]) / len(self.comparison_log)
prod_latencies = [r["prod_latency_ms"] for r in self.comparison_log if r["prod_latency_ms"]]
shadow_latencies = [r["shadow_latency_ms"] for r in self.comparison_log if r["shadow_latency_ms"]]
error_rate = sum(1 for r in self.comparison_log if r["shadow_error"]) / len(self.comparison_log)
import numpy as np
return {
"n_requests": len(self.comparison_log),
"agreement_rate": round(agreement_rate, 4),
"prod_p95_latency_ms": round(np.percentile(prod_latencies, 95), 1) if prod_latencies else None,
"shadow_p95_latency_ms": round(np.percentile(shadow_latencies, 95), 1) if shadow_latencies else None,
"shadow_error_rate": round(error_rate, 4),
"ready_for_promotion": agreement_rate >= 0.95 and error_rate < 0.01
}
# Strategia di rollout graduale: 1% → 10% → 50% → 100%
ROLLOUT_STAGES = [
{"traffic_pct": 0.01, "min_requests": 500, "min_agreement": 0.95},
{"traffic_pct": 0.10, "min_requests": 2000, "min_agreement": 0.96},
{"traffic_pct": 0.50, "min_requests": 5000, "min_agreement": 0.97},
{"traffic_pct": 1.00, "min_requests": None, "min_agreement": None}, # full rollout
]
9. Anti-Pattern Comuni nel Monitoring NLP
Anti-Pattern da Evitare
- Monitorare solo la latenza: la latenza e una metrica di infrastruttura, non di qualità del modello. Un modello veloce ma sbagliato e peggio di uno lento corretto.
- Nessuna distribuzione di riferimento: il drift detection e privo di significato senza una solida distribuzione di riferimento calcolata su dati di training/validation.
- Alert fatigue: soglie troppo sensibili allagano il team on-call di falsi positivi. Inizia con soglie conservative e calibra in base ai pattern osservati.
- Decisioni su un unico segnale: non trigghare mai il retraining basandosi su un solo indicatore. Richiedi almeno due segnali indipendenti concordanti.
- Ignorare la qualità dei dati upstream: monitorare il modello senza monitorare la pipeline dei dati e incompleto. Valida schema e freshness dei dati di input.
- Retraining senza validazione offline: un modello retrainato automaticamente deve passare un test set offline prima di essere deployato, anche se il trigger era automatico.
10. Checklist Completa per il Monitoring NLP
Checklist Monitoring NLP in Produzione
- Logging: logga ogni predizione con testo (o hash), confidence, latenza, versione modello e timestamp in formato JSONL
- Drift Detection: controlla embedding drift settimanalmente su finestre da 1.000 campioni; alert immediato se KS > 0.15 o PSI > 0.2
- Proxy Metrics: monitora confidence distribution, label distribution e latenza in real-time tramite Prometheus
- Ground Truth Collection: raccogli label vere tramite feedback utente, annotation team, o campionamento casuale (1-5% del traffico)
- Retraining Trigger: definisci soglie chiare per il retraining automatico (es. drift score > 0.2 o accuracy stimata < 0.85); richiedi 2+ segnali concordanti
- Shadow Deployment: prima dell'A/B test, valida il nuovo modello in shadow mode per almeno 24 ore
- A/B Testing: valida ogni nuova versione del modello con il 10% del traffico per almeno 48 ore prima del full rollout
- Alerting: configura notifiche (Slack, PagerDuty) per alert ad alta severita con link ai runbook di risposta
- Data Retention: conserva i log almeno 90 giorni per l'analisi storica
- GDPR: anonimizza o hasha i testi degli utenti nei log di produzione; mai conservare PII senza consenso esplicito e cifratura
Conclusioni: Fine della Serie
Con questo articolo concludiamo la serie NLP Moderno: da BERT ai LLM. Abbiamo percorso un viaggio completo: dai fondamenti della tokenizzazione e degli embedding, all'architettura BERT, dalla sentiment analysis per l'italiano al fine-tuning di LLM locali, dalla semantic similarity al monitoring in produzione.
Riepilogo della Serie
| # | Articolo | Concetti Chiave |
|---|---|---|
| 1 | Fondamenti NLP | Tokenizzazione, Word2Vec, GloVe, pipeline |
| 2 | BERT e Transformers | Architettura, MLM, NSP, fine-tuning |
| 3 | Sentiment Analysis | VADER, BERT, produzione, FastAPI |
| 4 | NLP Italiano | feel-it, AlBERTo, spaCy, dialetti |
| 5 | Named Entity Recognition | BIO format, spaCy, BERT NER, seqeval |
| 6 | Text Classification | Multi-label, zero-shot, SetFit |
| 7 | HuggingFace Transformers | AutoClass, Trainer, PEFT, Accelerate |
| 8 | Fine-tuning Locale | LoRA, QLoRA, DAPT, catastrophic forgetting |
| 9 | Semantic Similarity | SBERT, FAISS, bi-encoder, cross-encoder |
| 10 | Monitoring NLP | Drift detection, proxy metrics, retraining |
Serie Correlate da Esplorare
- AI Engineering / RAG: costruisci sistemi RAG completi con gli embeddings e le tecniche di ricerca semantica di questa serie
- Deep Learning Avanzato: approfondisci quantizzazione, pruning e tecniche di ottimizzazione per modelli grandi
- MLOps: automatizza il monitoring e il retraining con MLflow, DVC e pipeline CI/CD per modelli ML
- Computer Vision: molte delle tecniche di questa serie (BERT-like architectures, ViT, fine-tuning) si applicano anche al CV







