Model Drift Detection e Retraining Automatico in Produzione
Hai finalmente deployato il tuo modello in produzione. Le metriche sono eccellenti, il team e soddisfatto e gli stakeholder applaudono. Poi, settimane dopo, qualcuno nota che le predizioni sembrano meno accurate. Un mese dopo, il modello e chiaramente degradato. Benvenuto nel problema più subdolo del machine learning in produzione: il model drift.
Secondo una ricerca di Gartner, oltre il 65% dei modelli ML in produzione degrada in modo significativo entro 12 mesi dal deployment, spesso senza che i team se ne accorgano in tempo. Il dato e ancora più preoccupante nel retail e finanza, dove le distribuzioni dei dati cambiano rapidamente in risposta a trend di mercato, stagionalita e comportamenti degli utenti.
In questa guida costruiremo un sistema completo di drift detection e retraining automatico: capiremo i diversi tipi di drift, implementeremo detector con Evidently AI, NannyML e Alibi Detect, configureremo test statistici (KS, PSI, Chi-Square), integreremo Prometheus e Grafana per il monitoring continuo e creeremo pipeline di retraining automatico triggerate da alert.
Cosa Imparerai
- Differenza tra data drift, concept drift, feature drift e label drift
- Test statistici per rilevare drift: KS test, PSI, Chi-Square, MMD
- Implementazione pratica con Evidently AI, NannyML e Alibi Detect
- Dashboard di monitoring con Prometheus e Grafana
- Pipeline di alerting e retraining automatico con MLflow
- Best practices per MLOps production-grade con budget ridotto
perchè il Drift e un Problema Critico
Il mondo reale non e statico. I dati che il tuo modello vedeva in fase di training riflettevano una distribuzione statistica specifica, un "snapshot" del mondo in quel momento. Ma il mondo continua a cambiare: le abitudini degli utenti evolvono, i mercati oscillano, i sistemi upstream modificano il formato dei dati, si verificano eventi inattesi come pandemie o crisi economiche.
Il problema fondamentale del drift e la silent degradation: il modello smette di essere accurato ma continua a produrre predizioni senza errori tecnici. Il servizio risponde con HTTP 200, i log non mostrano eccezioni, ma le decisioni basate su quelle predizioni stanno diventando sempre più sbagliate. Senza un sistema di monitoring attivo, questo degrado può passare inosservato per mesi.
Impatto Economico del Drift Non Rilevato
Un modello di fraud detection degradato può far passare transazioni fraudolente non rilevate. Un sistema di pricing che drifta può costare milioni in pricing non competitivo. Un modello di churn prediction degradato porta a campagne retention sprecate sui clienti sbagliati. Il costo del monitoring e sempre inferiore al costo del drift non rilevato.
Tassonomia del Drift: Quattro Tipi Fondamentali
Prima di implementare soluzioni, e fondamentale capire cosa sta driftando. Esistono quattro categorie principali di drift, ognuna con cause diverse e strategie di detection diverse.
1. Data Drift (Covariate Shift)
Il data drift, noto anche come covariate shift, avviene quando la distribuzione delle feature di input P(X) cambia rispetto al training, ma la relazione tra feature e label P(Y|X) rimane stabile. Esempio classico: il modello e stato addestrato su utenti di una certa fascia d'eta, ma il prodotto viene adottato da una nuova fascia demografica.
Il data drift e il tipo più comune e più facile da rilevare perchè richiede solo di monitorare le distribuzioni delle feature in input, senza necessità di label. Si può rilevare anche in real-time, prima che i risultati impattino le predizioni.
2. Concept Drift
Il concept drift e più insidioso: la relazione P(Y|X) tra feature e label cambia, anche se la distribuzione delle feature X rimane stabile. Esempio: un modello di sentiment analysis addestrato su tweet del 2022 non capisce il linguaggio gergale del 2025. La semantica delle parole (X) e cambiata, quindi il mapping X → Y e diverso.
Il concept drift richiede ground truth per essere rilevato direttamente: bisogna confrontare le predizioni con le etichette reali. Quando queste tardano ad arrivare (come in scenari di churn prediction con finestre di osservazione di 90 giorni), si usano proxy metrics come il prediction drift o le distribuzioni degli score di probabilità.
3. Feature Drift
Il feature drift e un sottoinsieme del data drift che riguarda specifiche feature critiche per il modello. Non tutte le feature hanno lo stesso impatto: una feature con alta importanza che drifta e molto più critica di una feature poco rilevante. I tool di feature importance (SHAP, permutation importance) aiutano a prioritizzare il monitoring.
4. Label Drift (Prior Probability Shift)
Il label drift avviene quando la distribuzione delle etichette target P(Y) cambia. In un modello di classificazione binaria (spam/non-spam), se improvvisamente il 90% dei messaggi e spam invece del solito 10%, il modello e calibrato per una distribuzione diversa e le predizioni saranno distorte. Questo tipo di drift e comune in scenari con class imbalance variabile nel tempo.
Riepilogo Tipi di Drift
- Data Drift: P(X) cambia, P(Y|X) stabile. Rilevabile senza label.
- Concept Drift: P(Y|X) cambia. Richiede label o proxy metrics.
- Feature Drift: Specifiche feature cambiano. Priorità basata su importanza.
- Label Drift: P(Y) cambia. Monitora distribuzione delle predizioni.
Test Statistici per la Drift Detection
La rilevazione statistica del drift si basa sul confronto tra due distribuzioni: la distribuzione di riferimento (training o un periodo di produzione stabile) e la distribuzione corrente (la finestra di monitoraggio). Diversi test statistici hanno caratteristiche diverse in termini di sensibilita, interpretabilita e costo computazionale.
Kolmogorov-Smirnov Test (KS)
Il test KS e il più usato per feature continue. Misura la distanza massima tra le funzioni di distribuzione cumulativa (CDF) delle due distribuzioni. Il p-value ottenuto indica la probabilità che le due campioni vengano dalla stessa distribuzione: p-value basso (tipicamente < 0.05) segnala drift statisticamente significativo.
Vantaggi: non assume una distribuzione specifica (non-parametrico), robusto, facile da interpretare visivamente. Limitazioni: sensibile alle code delle distribuzioni, meno potente con campioni piccoli, può dare falsi positivi con grandi dataset.
Population Stability Index (PSI)
Il PSI e nato nel settore bancario per monitorare la stabilità della distribuzione degli score di rischio. Divide entrambe le distribuzioni in bucket e calcola la somma delle differenze pesate tra le proporzioni. L'interpretazione standard e:
- PSI < 0.1: nessun cambiamento significativo
- PSI 0.1 - 0.2: leggero cambiamento, monitorare
- PSI > 0.2: cambiamento significativo, azione richiesta
Il PSI e molto intuitivo per business stakeholders e si applica sia a feature continue (con discretizzazione in decili) che categoriche. E particolarmente popolare nei modelli di credit scoring e fraud detection.
Chi-Square Test
Il Chi-Square test e il test di riferimento per feature categoriche. Confronta le frequenze osservate con quelle attese e produce un p-value. E appropriato quando le feature hanno un numero limitato di categorie e i campioni sono sufficientemente grandi (frequenza attesa > 5 per ogni categoria). Per feature con alta cardinalita, si raccomanda di raggruppare le categorie rare.
Maximum Mean Discrepancy (MMD)
L'MMD e un test basato su kernel che misura la distanza tra due distribuzioni in uno spazio di Hilbert. E particolarmente potente per rilevare differenze nelle strutture multivariata e viene usato da Alibi Detect per il drift di dati tabulari, immagini e testo. Il vantaggio e che non richiede la scelta di bucket o parametri di discretizzazione.
Implementazione con Evidently AI
Evidently AI e diventata la libreria open-source standard per il monitoring di modelli ML in Python, con oltre 20 milioni di download. Offre preset predefiniti per i casi d'uso più comuni e si integra con qualsiasi orchestratore di workflow.
# Installazione
pip install evidently
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, ClassificationPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
ColumnSummaryMetric
)
# --- Setup dati di riferimento e produzione ---
# Carica training data (reference)
reference_data = pd.read_parquet("data/training_features.parquet")
# Carica batch produzione ultimo mese
current_data = pd.read_parquet("data/production_batch_2025_02.parquet")
# Feature columns
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type", "payment_method"
]
# --- Report Data Drift ---
drift_report = Report(metrics=[
DatasetDriftMetric(), # overall drift summary
DataDriftTable(), # per-feature drift table
ColumnDriftMetric(column_name="monthly_charges"),
ColumnDriftMetric(column_name="contract_type"),
ColumnSummaryMetric(column_name="monthly_charges"),
])
drift_report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Salva report HTML interattivo
drift_report.save_html("reports/drift_report_2025_02.html")
# Estrai metriche programmaticamente
report_dict = drift_report.as_dict()
dataset_drift = report_dict["metrics"][0]["result"]
print(f"Dataset drift detected: {dataset_drift['dataset_drift']}")
print(f"Features drifted: {dataset_drift['number_of_drifted_columns']}/{dataset_drift['number_of_columns']}")
print(f"Share of drifted features: {dataset_drift['share_of_drifted_columns']:.1%}")
Evidently genera report HTML interattivi con visualizzazioni delle distribuzioni, istogrammi sovrapposti e tabelle di riepilogo. Per ogni feature vengono riportati il test statistico usato (scelto automaticamente in base al tipo di dato), il p-value o la statistica di test, e un flag drift/no-drift.
Test Suite con Soglie Personalizzate
Per integrare Evidently in una pipeline CI/CD o in un workflow Airflow/Prefect, la Test Suite di Evidently e lo strumento giusto: permette di definire soglie precise e restituisce pass/fail in modo programmatico.
from evidently.test_suite import TestSuite
from evidently.tests import (
TestNumberOfDriftedColumns,
TestShareOfDriftedColumns,
TestColumnDrift,
TestDatasetDrift
)
# --- Test Suite con soglie personalizzate ---
drift_test_suite = TestSuite(tests=[
# Non più del 20% delle feature deve driftare
TestShareOfDriftedColumns(lt=0.2),
# Feature critiche: test individuali con soglie aggressive
TestColumnDrift(
column_name="monthly_charges",
stattest="ks",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="contract_type",
stattest="chi2",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="num_support_tickets",
stattest="psi",
stattest_threshold=0.1 # PSI < 0.1 = no drift
),
# Dataset-level drift test
TestDatasetDrift(stattest_threshold=0.05),
])
drift_test_suite.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Risultato pass/fail per la pipeline
test_result = drift_test_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in test_result["tests"]
)
if not all_passed:
print("DRIFT DETECTED - Pipeline triggering retraining...")
for test in test_result["tests"]:
if test["status"] != "SUCCESS":
print(f" FAILED: {test['name']} - {test['description']}")
# Trigger retraining (vedi sezione retraining)
trigger_retraining_pipeline()
else:
print("All drift tests passed - Model healthy")
Monitoring con NannyML: Performance Senza Label
NannyML risolve uno dei problemi più difficili del model monitoring: stimare le performance del modello quando le etichette reali non sono ancora disponibili. In un modello di churn prediction, le etichette (se il cliente ha effettivamente churned) potrebbero arrivare solo 90 giorni dopo la predizione. NannyML usa il metodo Confidence-Based Performance Estimation (CBPE) per stimare accuracy, F1 e AUC in real-time usando solo le distribuzioni degli score.
pip install nannyml
import nannyml as nml
import pandas as pd
# Carica i dati
reference_df = pd.read_parquet("data/reference_with_targets.parquet")
analysis_df = pd.read_parquet("data/production_last_30_days.parquet")
# --- CBPE: Stima delle performance senza label ---
estimator = nml.CBPE(
y_pred_proba="churn_probability",
y_pred="churn_predicted",
y_true="churned", # presente solo nel reference
timestamp_column_name="prediction_date",
problem_type="binary_classification",
metrics=["roc_auc", "f1", "precision", "recall"],
chunk_size=500 # 500 predizioni per chunk temporale
)
estimator.fit(reference_df)
results = estimator.estimate(analysis_df)
# Visualizza risultati con alert automatici
figure = results.plot()
figure.show()
# Estrai metriche per alerting
estimated_metrics = results.to_df()
latest_chunk = estimated_metrics.tail(1)
auc_lower = latest_chunk["estimated_roc_auc_lower_confidence_boundary"].values[0]
if auc_lower < 0.70:
print(f"ALERT: AUC stimato < 0.70 (lower bound: {auc_lower:.3f})")
trigger_retraining_pipeline()
# --- Univariate Drift Detection ---
univariate_calc = nml.UnivariateDriftCalculator(
column_names=["monthly_charges", "tenure_months", "num_tickets"],
timestamp_column_name="prediction_date",
continuous_methods=["kolmogorov_smirnov", "jensen_shannon"],
categorical_methods=["chi2", "jensen_shannon"],
chunk_size=500
)
univariate_calc.fit(reference_df)
drift_results = univariate_calc.calculate(analysis_df)
# Plotta il drift nel tempo per ogni feature
drift_figure = drift_results.filter(period="analysis").plot()
drift_figure.show()
NannyML produce grafici temporali che mostrano l'evoluzione del drift nel tempo, con bande di confidenza e alert visivi. Questo e particolarmente utile per capire quando il drift e iniziato e se sta peggiorando o stabilizzandosi.
Alibi Detect: Drift Detection Avanzata con MMD e LSDD
Alibi Detect (by Seldon) e la libreria di riferimento per detection avanzata che va oltre le statistiche univariate. Supporta MMD (Maximum Mean Discrepancy) per dati tabulari e immagini, LSDD (Least-Squares Density Difference) e rilevazione di outlier. E ideale quando si ha bisogno di rilevare drift multivariato complesso.
pip install alibi-detect
import numpy as np
from alibi_detect.cd import MMDDrift, KSDrift, TabularDrift
from alibi_detect.saving import save_detector, load_detector
# Carica dati di riferimento (numpy array)
X_ref = reference_data[feature_columns].values.astype(np.float32)
X_current = current_data[feature_columns].values.astype(np.float32)
# --- KS Drift per feature continue ---
ks_detector = KSDrift(
x_ref=X_ref,
p_val=0.05, # soglia p-value
alternative="two-sided"
)
ks_preds = ks_detector.predict(
X_current,
drift_type="batch",
return_p_val=True,
return_distance=True
)
print("KS Drift Results:")
print(f" Drift detected: {ks_preds['data']['is_drift']}")
print(f" p-values per feature: {ks_preds['data']['p_val']}")
print(f" Features drifted: {ks_preds['data']['is_drift'].sum()}")
# --- MMD Drift per rilevazione multivariata ---
# Più potente per distribuzioni complesse
mmd_detector = MMDDrift(
x_ref=X_ref,
backend="pytorch", # o "tensorflow"
p_val=0.05,
n_permutations=200 # più alto = più preciso ma più lento
)
mmd_preds = mmd_detector.predict(
X_current,
return_p_val=True,
return_distance=True
)
print(f"\nMMD Drift (multivariato):")
print(f" Drift detected: {mmd_preds['data']['is_drift']}")
print(f" p-value: {mmd_preds['data']['p_val']:.4f}")
print(f" MMD^2 statistic: {mmd_preds['data']['distance']:.6f}")
# --- TabularDrift: test ottimizzato per dati tabulari misti ---
tabular_detector = TabularDrift(
x_ref=X_ref,
p_val=0.05,
categories_per_feature={
4: None, # feature index 4 = contract_type (categorica)
6: None # feature index 6 = payment_method (categorica)
},
)
# Salva detector per riutilizzo
save_detector(tabular_detector, "models/drift_detector/")
# Successivamente carica e usa
# loaded_detector = load_detector("models/drift_detector/")
Architettura del Sistema di Monitoring
Un sistema di monitoring production-grade richiede più componenti integrati: un layer di raccolta metriche, uno storage time-series, un sistema di visualizzazione e un motore di alerting. La combinazione Prometheus + Grafana e lo standard open-source per questo use case, con ampia integrazione nell'ecosistema Kubernetes.
# monitoring_service.py
# Servizio FastAPI che espone metriche di drift per Prometheus
from fastapi import FastAPI, BackgroundTasks
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import pandas as pd
import schedule
import threading
import time
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="ML Monitoring Service")
# --- Prometheus Metrics ---
DRIFT_GAUGE = Gauge(
"ml_feature_drift_psi",
"Population Stability Index per feature",
labelnames=["feature_name", "model_name", "model_version"]
)
DATASET_DRIFT_GAUGE = Gauge(
"ml_dataset_drift_detected",
"1 se drift rilevato a livello dataset, 0 altrimenti",
labelnames=["model_name", "model_version"]
)
DRIFT_FEATURES_COUNT = Gauge(
"ml_drifted_features_count",
"Numero di feature che mostrano drift",
labelnames=["model_name"]
)
ESTIMATED_AUC = Gauge(
"ml_estimated_auc",
"AUC stimato via CBPE (NannyML)",
labelnames=["model_name", "model_version"]
)
PREDICTION_COUNT = Counter(
"ml_predictions_total",
"Numero totale di predizioni",
labelnames=["model_name", "outcome"]
)
INFERENCE_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Latenza inference in secondi",
labelnames=["model_name"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]
)
# --- Funzione di calcolo drift ---
def calculate_and_update_drift_metrics(
model_name: str,
model_version: str,
reference_data: pd.DataFrame,
current_data: pd.DataFrame,
feature_columns: list
):
"""Calcola PSI per ogni feature e aggiorna gauge Prometheus."""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
report = Report(metrics=[
DatasetDriftMetric(stattest="psi"),
DataDriftTable(stattest="psi"),
])
report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
result = report.as_dict()
# Dataset-level drift
dataset_result = result["metrics"][0]["result"]
drift_detected = 1 if dataset_result["dataset_drift"] else 0
DATASET_DRIFT_GAUGE.labels(
model_name=model_name,
model_version=model_version
).set(drift_detected)
DRIFT_FEATURES_COUNT.labels(
model_name=model_name
).set(dataset_result["number_of_drifted_columns"])
# Per-feature PSI
feature_results = result["metrics"][1]["result"]["drift_by_columns"]
for feature_name, feature_data in feature_results.items():
psi_value = feature_data.get("stattest_threshold", 0)
actual_stat = feature_data.get("drift_score", 0)
DRIFT_GAUGE.labels(
feature_name=feature_name,
model_name=model_name,
model_version=model_version
).set(actual_stat)
logger.info(f"Drift metrics updated for {model_name} v{model_version}")
return drift_detected
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/drift/check")
async def trigger_drift_check(background_tasks: BackgroundTasks):
"""Trigger manuale del drift check."""
background_tasks.add_task(run_drift_check_job)
return {"status": "drift check started"}
@app.get("/health")
async def health():
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
Configurazione Prometheus e Grafana
La configurazione di Prometheus per scraping delle metriche ML e semplice: aggiungi il monitoring service come target nel file di configurazione.
# prometheus.yml
global:
scrape_interval: 60s
evaluation_interval: 60s
rule_files:
- "ml_drift_alerts.yml"
alerting:
alertmanagers:
- static_configs:
- targets: ["alertmanager:9093"]
scrape_configs:
- job_name: "ml-monitoring"
static_configs:
- targets: ["ml-monitoring-service:8000"]
metrics_path: "/metrics"
scrape_interval: 60s
- job_name: "model-serving"
static_configs:
- targets: ["fastapi-serving:8080"]
metrics_path: "/metrics"
---
# ml_drift_alerts.yml
groups:
- name: ml_drift_alerts
rules:
- alert: HighFeatureDrift
expr: ml_feature_drift_psi{} > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "High drift detected on feature {{ $labels.feature_name }}"
description: "PSI = {{ $value | humanize }} for feature {{ $labels.feature_name }}"
- alert: DatasetDriftDetected
expr: ml_dataset_drift_detected == 1
for: 10m
labels:
severity: critical
annotations:
summary: "Dataset-level drift detected for model {{ $labels.model_name }}"
description: "Model performance may be degraded. Consider retraining."
- alert: LowEstimatedAUC
expr: ml_estimated_auc < 0.70
for: 15m
labels:
severity: critical
annotations:
summary: "Estimated AUC dropped below threshold"
description: "Estimated AUC = {{ $value | humanize }} for model {{ $labels.model_name }}"
Dashboard Grafana: Metriche Chiave da Monitorare
- PSI per feature: heatmap con soglie 0.1/0.2 colorate (verde/giallo/rosso)
- Drift score nel tempo: grafico a linee per feature critiche
- AUC stimata (CBPE): time series con bande di confidenza
- Numero di feature driftate: gauge con soglia di alert
- Distribuzione predizioni: istogramma score di probabilità
- Latenza e throughput: panel standard per SLA monitoring
Pipeline di Retraining Automatico
Rilevare il drift e necessario ma non sufficiente: bisogna anche reagire automaticamente. Una pipeline di retraining automatico deve essere attivata da alert di drift, validare il nuovo modello prima di sostituire quello in produzione e garantire rollback in caso di regressione delle performance.
# retraining_pipeline.py
# Pipeline di retraining automatico con MLflow
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from datetime import datetime
import logging
import requests
logger = logging.getLogger(__name__)
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
MODEL_NAME = "churn-prediction"
MIN_AUC_THRESHOLD = 0.72 # AUC minima per promuovere in produzione
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
def load_fresh_training_data() -> pd.DataFrame:
"""Carica dati aggiornati per il retraining."""
# In produzione: query al feature store o data warehouse
df = pd.read_parquet("data/training_data_fresh.parquet")
logger.info(f"Loaded {len(df)} training samples")
return df
def train_new_model(df: pd.DataFrame) -> tuple:
"""Addestra un nuovo modello con i dati freschi."""
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type_encoded", "payment_method_encoded"
]
target_column = "churned"
X = df[feature_columns]
y = df[target_column]
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=4,
learning_rate=0.05,
subsample=0.8,
random_state=42
)
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_val)[:, 1]
y_pred = model.predict(X_val)
metrics = {
"auc": roc_auc_score(y_val, y_pred_proba),
"f1": f1_score(y_val, y_pred),
"precision": precision_score(y_val, y_pred),
"recall": recall_score(y_val, y_pred),
"val_samples": len(X_val)
}
return model, metrics, feature_columns
def register_and_promote_model(
model,
metrics: dict,
feature_columns: list,
trigger_reason: str
) -> bool:
"""Registra il modello in MLflow e promuovilo in produzione se supera la soglia."""
with mlflow.start_run(run_name=f"retrain_{datetime.utcnow().strftime('%Y%m%d_%H%M')}") as run:
# Log params
mlflow.log_param("trigger_reason", trigger_reason)
mlflow.log_param("training_timestamp", datetime.utcnow().isoformat())
mlflow.log_param("features", feature_columns)
# Log metrics
for metric_name, metric_value in metrics.items():
if isinstance(metric_value, (int, float)):
mlflow.log_metric(metric_name, metric_value)
# Log model
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name=MODEL_NAME
)
run_id = run.info.run_id
logger.info(f"Model registered with run_id={run_id}, AUC={metrics['auc']:.4f}")
# Promuovi in produzione se supera la soglia
if metrics["auc"] >= MIN_AUC_THRESHOLD:
client = mlflow.tracking.MlflowClient()
latest_version = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
client.transition_model_version_stage(
name=MODEL_NAME,
version=latest_version.version,
stage="Production",
archive_existing_versions=True
)
logger.info(f"Model v{latest_version.version} promoted to Production")
send_slack_notification(f"Model retrained and promoted. AUC={metrics['auc']:.4f}")
return True
else:
logger.warning(f"Model AUC {metrics['auc']:.4f} below threshold {MIN_AUC_THRESHOLD}. Not promoting.")
send_slack_notification(
f"Retraining completed but model below threshold. AUC={metrics['auc']:.4f}. Manual review needed.",
level="warning"
)
return False
def send_slack_notification(message: str, level: str = "info"):
"""Invia notifica Slack (o webhook generico)."""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
color = "#36a64f" if level == "info" else "#ff0000"
payload = {
"attachments": [{
"color": color,
"title": "MLOps Retraining Alert",
"text": message,
"footer": f"ML Platform | {datetime.utcnow().isoformat()}"
}]
}
try:
requests.post(webhook_url, json=payload, timeout=5)
except Exception as e:
logger.error(f"Failed to send Slack notification: {e}")
def run_retraining_pipeline(trigger_reason: str = "drift_detected"):
"""Entry point della pipeline di retraining."""
logger.info(f"Starting retraining pipeline. Trigger: {trigger_reason}")
df = load_fresh_training_data()
model, metrics, feature_columns = train_new_model(df)
promoted = register_and_promote_model(model, metrics, feature_columns, trigger_reason)
logger.info(f"Retraining pipeline completed. Promoted: {promoted}")
return promoted
if __name__ == "__main__":
run_retraining_pipeline(trigger_reason="manual_trigger")
Strategie di Trigger per il Retraining
Definire quando fare retraining e tanto importante quanto come farlo. Esistono tre strategie principali, ognuna con vantaggi e limitazioni:
Strategie di Retraining a Confronto
- Schedule-based (calendario): Retraining periodico fisso (settimanale, mensile). Semplice da implementare ma inefficiente: fa retraining anche quando non serve e potrebbe non fare retraining abbastanza spesso durante periodi di drift rapido.
- Performance-based: Retraining quando le metriche di performance scendono sotto una soglia. Richiede ground truth disponibile rapidamente. Ideale per modelli con feedback loop veloce (es. click-through rate, conversion).
- Drift-based: Retraining quando viene rilevato drift statisticamente significativo nelle feature o nelle predizioni. Non richiede label. Approccio proattivo che previene il degrado prima che impatti le performance. Rischio di falsi positivi.
- Ibrido (raccomandato): Combina drift detection come trigger primario con validazione delle performance come gate di qualità prima della promozione in produzione. Aggiunge anche un retraining periodico di fallback.
Configurazione Completa con Docker Compose
Per ambienti di sviluppo e staging, Docker Compose permette di avviare l'intero stack di monitoring in modo rapido e riproducibile.
# docker-compose.monitoring.yml
version: "3.8"
services:
# ML Monitoring Service (FastAPI + Evidently)
ml-monitoring:
build: ./monitoring_service
ports:
- "8001:8000"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- REFERENCE_DATA_PATH=/data/reference.parquet
volumes:
- ./data:/data
- ./reports:/reports
depends_on:
- mlflow
# MLflow Tracking Server
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: >
mlflow server
--host 0.0.0.0
--port 5000
--backend-store-uri postgresql://mlflow:mlflow@postgres/mlflow
--default-artifact-root s3://mlflow-artifacts/
depends_on:
- postgres
# PostgreSQL per MLflow
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=mlflow
- POSTGRES_PASSWORD=mlflow
- POSTGRES_DB=mlflow
volumes:
- postgres_data:/var/lib/postgresql/data
# Prometheus
prometheus:
image: prom/prometheus:v2.50.1
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- ./monitoring/alerts.yml:/etc/prometheus/alerts.yml
- prometheus_data:/prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
# Grafana
grafana:
image: grafana/grafana:10.3.3
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
- grafana_data:/var/lib/grafana
depends_on:
- prometheus
# Alertmanager
alertmanager:
image: prom/alertmanager:v0.27.0
ports:
- "9093:9093"
volumes:
- ./monitoring/alertmanager.yml:/etc/alertmanager/alertmanager.yml
volumes:
postgres_data:
prometheus_data:
grafana_data:
Budget <5K EUR/Anno per PMI
Un sistema completo di drift detection non richiede budget enterprise. Con l'approccio open-source e cloud-native, e possibile mantenere un sistema robusto con costi minimi:
- Evidently AI + NannyML: Open-source, gratuito
- MLflow (self-hosted): Open-source, costi solo infrastruttura
- Prometheus + Grafana: Open-source, gratuito
- Compute (VPS/cloud): ~50-100 EUR/mese per VM media (600-1200 EUR/anno)
- Storage S3-compatible: ~20 EUR/mese per 500GB (240 EUR/anno)
- Totale stimato: ~1000-2000 EUR/anno per stack completo
Best Practices per la Drift Detection in Produzione
Checklist di Produzione
- Definisci baseline statistiche prima del deploy: Esegui drift detection contro se stessa sulla validation set per calibrare le soglie. Un PSI > 0 su dati stazionari indica overfitting della soglia.
- Usa finestre temporali appropriate: Non confrontare tutto il traffico storico con oggi. Usa finestre scorrevoli (7/14/30 giorni) per catturare drift recente.
- Prioritizza le feature per importanza: Monitora più aggressivamente le feature ad alto impatto SHAP. Non tutti i drift sono ugualmente critici.
- Distingui drift tecnico da drift semantico: Un cambio nel formato di un campo (es. da stringa a numero) e un bug di engineering, non drift ML. Aggiungi data quality checks separati.
- Evita alert fatigue: Imposta soglie conservative inizialmente e affina nel tempo. Troppi alert porta a ignorarli tutti.
- Loggare le decisioni di retraining: Ogni retraining deve essere tracciato con MLflow, incluso il motivo del trigger, le metriche pre/post e la versione del modello promossa.
- Test del detector stesso: Verifica periodicamente che il sistema di detection funzioni correttamente con data injection testing (inietta drift sintetico e verifica che venga rilevato).
Anti-Patterns da Evitare
- Retraining automatico senza gate di qualità: Non promuovere in produzione un modello appena addestrato senza validazione delle performance. Un retraining su dati contaminati può peggiorare il modello.
- Monitoring solo dell'output: Monitorare solo le predizioni senza le feature di input rende impossibile diagnosticare la causa del drift.
- Soglie fisse per tutti i modelli: Ogni modello ha sensibilita diverse al drift. PSI > 0.2 può essere catastrofico per un modello critico e irrilevante per un modello di bassa priorità.
- Ignorare il concept drift: Se non si raccolgono label di feedback dal modello in produzione, e impossibile rilevare il concept drift direttamente. Investi nell'infrastruttura di feedback loop.
Conclusioni e Prossimi Passi
Un sistema di drift detection e retraining automatico e il cuore di ogni MLOps maturo. Senza monitoring attivo, i modelli ML in produzione degradano silenziosamente, generando decisioni sbagliate che possono costare molto più del costo del sistema di monitoring stesso.
In questa guida abbiamo costruito un sistema completo: dalla comprensione teorica dei quattro tipi di drift, all'implementazione pratica con Evidently AI per i report interattivi, NannyML per la stima delle performance senza label e Alibi Detect per il rilevamento multivariato avanzato. Abbiamo integrato il tutto con Prometheus, Grafana e una pipeline di retraining automatico con MLflow.
Il passo successivo e integrare questo sistema con il serving FastAPI che abbiamo visto nell'articolo precedente e con lo scaling Kubernetes che vedremo nel prossimo. Con questi componenti, avrai un sistema MLOps completo, production-grade e mantenibile.
Continua la Serie MLOps
- Articolo precedente: Experiment Tracking con MLflow: Guida Completa - registrare esperimenti e confrontare modelli
- Articolo successivo: Serving Modelli: FastAPI + Uvicorn in Produzione - costruire inference API scalabili
- Approfondimento: Scaling ML su Kubernetes - orchestrare il deployment con KubeFlow e Seldon
- Serie correlata: Deep Learning Avanzato - monitoring per modelli neurali complessi







