Detectarea derivării modelului și recalificare automată în producție
În sfârșit, ați implementat modelul dvs. în producție. Valorile sunt excelente, echipa este mulțumită iar părțile interesate aplaudă. Apoi, săptămâni mai târziu, cineva observă că previziunile par mai puțin exacte. O lună mai târziu, modelul este în mod clar degradat. Bine ați venit la cea mai furtunoasă problemă a învățării automate în producție: cel deriva de model.
Conform cercetărilor Gartner, dincolo de 65% dintre modelele ML din producție se degradează semnificativ semnificativă în decurs de 12 luni de la implementare, adesea fără ca echipele să-și dea seama la timp. Datele sunt și mai îngrijorătoare în retail și finanțe, unde distribuțiile datelor se schimbă rapid ca răspuns la tendințele pieței, sezonalitatea și comportamentul utilizatorilor.
În acest ghid vom construi un sistem complet de detectarea derivei și reinstruire automată: vom înțelege diferitele tipuri de drift, vom implementa detectoare cu Evidently AI, NannyML și Alibi Detect, vom configura teste statistice (KS, PSI, Chi-Square), vom integra Prometheus si Grafana pentru monitorizare continuă și vom crea conducte automate de recalificare declanșate de alerte.
Ce vei învăța
- Diferența dintre deriva de date, deriva de concept, deriva de caracteristică și deriva de etichetă
- Teste statistice pentru detectarea derivei: test KS, PSI, Chi-Pătrat, MMD
- Implementare practică cu Evidently AI, NannyML și Alibi Detect
- Monitorizare tablou de bord cu Prometheus și Grafana
- Conductă automată de alertă și reinstruire cu MLflow
- Cele mai bune practici pentru MLO-uri de producție cu buget redus
De ce deriva este o problemă critică
Lumea reală nu este statică. Datele pe care modelul dvs. le-a văzut în timpul antrenamentului s-au reflectat o distribuție statistică specifică, un „instantaneu” al lumii în acel moment. Dar lumea continuă să se schimbe: obiceiurile utilizatorilor evoluează, piețele fluctuează, sistemele din amonte schimba formatul datelor, apar evenimente neașteptate precum pandemii sau crize economice.
Problema fundamentală a derivei este degradare tăcută: modelul se oprește pentru a fi precis, dar totuși să producă predicții fără erori tehnice. Serviciul răspunde cu HTTP 200, jurnalele nu arată excepții, dar deciziile bazate pe acele predicții sunt devenind din ce în ce mai greșit. Fără un sistem activ de monitorizare, această degradare poate trece neobservat luni de zile.
Impactul economic al derivei nedetectate
Un model degradat de detectare a fraudei poate lăsa tranzacțiile frauduloase să rămână nedetectate. Un sistem de prețuri care variază poate costa milioane în prețuri necompetitive. Un model de predicție degradată a pierderii duce la campanii de reținere risipite pe clienții greșiți. Costul monitorizării este întotdeauna mai mic decât costul derivei nedetectate.
Taxonomie de deriva: patru tipuri fundamentale
Înainte de a implementa soluții, este esențial să înțelegeți Ce este în derivă. Ele există patru categorii principale de derive, fiecare cu cauze diferite și strategii de detectare diferite.
1. Derivarea datelor (deplasare covariabilă)
Il deriva de date, cunoscut și ca deplasare covariabilă, se întâmplă când distribuția caracteristicilor de intrare P(X) se modifică în comparație cu antrenament, dar relația dintre caracteristica și eticheta P(Y|X) rămân stabile. Exemplu clasic: modelul a fost antrenat utilizatori dintr-o anumită grupă de vârstă, dar produsul este adoptat de o nouă categorie demografică.
Derivarea datelor este cel mai comun tip și cel mai ușor de detectat, deoarece necesită doar monitorizare distribuțiile caracteristicilor de intrare, fără a fi nevoie de etichete. Poate fi detectat și în în timp real, înainte ca rezultatele să afecteze predicțiile.
2. Conceptul de deriva
Il deriva conceptului și mai insidios: relația P(Y|X) dintre caracteristici și etichete se modifică, chiar dacă distribuția caracteristicii X rămâne stabilă. Exemplu: un model de Analiza de sentimente instruită pe tweet-uri din 2022 nu înțelege jargonul anului 2025. Semantica cuvintelor (X) s-a schimbat, astfel încât maparea X → Y este diferită.
Derivarea conceptului necesită ca adevărul de la sol să fie detectat direct: trebuie comparat predicții cu etichete reale. Când acestea întârzie să sosească (ca în scenarii de predicție de abandon cu ferestre de observare de 90 de zile), sunt utilizate valorile proxy cum ar fi deriva de predicție sau distribuția scorului de probabilitate.
3. Deriva caracteristică
Il deriva caracteristică și un subset de deriva de date care se referă la specificații caracteristici critice pentru model. Nu toate caracteristicile au același impact: o caracteristică cu o importanță ridicată, care derivă și este mult mai critic decât o caracteristică de relevanță scăzută. Instrumentele de importanță caracteristică (SHAP, importanța permutării) ajută la prioritizarea monitorizării.
4. Derivarea etichetei (schimbarea probabilității anterioară)
Il deriva de etichetă apare atunci când distribuția etichetelor țintă P(Y) schimbare. Într-un model de clasificare binar (spam/non-spam), dacă dintr-o dată 90% dintre mesaje sunt spam în loc de 10% obișnuit, modelul este calibrat pentru o singură distribuție diferite și previziunile vor fi distorsionate. Acest tip de derivă este comun în scenariile cu dezechilibru de clasă variabilă în timp.
Rezumatul tipurilor de deriva
- Deriva datei: P(X) se modifică, P(Y|X) stabil. Descoperibil fără etichetă.
- Derivarea conceptului: P(Y|X) se modifică. Necesită valori pentru etichetă sau proxy.
- Caracteristici Drift: Caracteristicile specifice se schimbă. Prioritate bazată pe importanță.
- Derivarea etichetei: P(Y) se modifică. Monitorizați distribuția predicțiilor.
Teste statistice pentru detectarea derivei
Detectarea statistică a derivei se bazează pe comparația între două distribuții: distribuția referință (formare sau o perioadă stabilă de producție) și distribuția curentă (fereastra de monitorizare). Testele statistice diferite au caracteristici diferite în termeni de sensibilitate, interpretabilitate și cost de calcul.
Testul Kolmogorov-Smirnov (KS)
Il Testul KS și cel mai folosit pentru caracteristici continue. Măsurați distanța maximă între funcţiile de distribuţie cumulativă (CDF) ale celor două distribuţii. Valoarea p obţinută indică probabilitatea ca cele două eșantioane să provină din aceeași distribuție: valoare p scăzută (de obicei < 0,05) semnalează o deriva semnificativă statistic.
Avantaje: nu presupune o distributie specifica (neparametrica), robusta, usor de utilizat interpreta vizual. Limitări: sensibil la cozile de distribuție, mai puțin puternic cu mostre mici, poate da rezultate false pozitive cu seturi de date mari.
Indicele de stabilitate a populației (PSI)
Il PSI și născut în sectorul bancar pentru a monitoriza stabilitatea distribuirea scorurilor de risc. Împarte ambele distribuții în găleți și calculează suma diferenţelor ponderate dintre proporţii. Interpretarea standard este:
- PSI < 0,1: nicio modificare semnificativă
- PSI 0,1 - 0,2: modificare ușoară, monitor
- PSI > 0,2: schimbare semnificativă, acțiune necesară
PSI este foarte intuitiv pentru părțile interesate de afaceri și se aplică ambelor funcții continue (cu discretizare în decile) și categoric. Și mai ales popular în modele de scoring de credit și de detectare a fraudei.
Testul Chi-Pătrat
Il Testul Chi-Pătrat și testul de bază pentru caracteristicile categoriale. Comparați frecvențele observate cu cele așteptate și produce o valoare p. Și potrivite atunci când caracteristicile au un număr limitat de categorii și eșantioanele sunt suficient de mari (frecvență așteptați > 5 pentru fiecare categorie). Pentru caracteristicile cu cardinalitate mare, se recomandă gruparea categorii rare.
Discrepanța medie maximă (MMD)
L'MMD și un test bazat pe nucleu care măsoară distanța dintre două distribuții într-un spațiu Hilbert. Este deosebit de puternic pentru detectarea diferențelor de structuri multivariat și este utilizat de Alibi Detect pentru derivarea datelor tabelare, imagini și text. Avantajul este că nu necesită alegerea găleților sau a parametrilor de discretizare.
Implementare cu Evidently AI
Evident AI a devenit biblioteca standard open-source pentru monitorizare de modele ML în Python, cu peste 20 de milioane de descărcări. Oferă presetări predefinite pentru cele mai comune cazuri de utilizare și se integrează cu orice orchestrator de flux de lucru.
# Installazione
pip install evidently
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, ClassificationPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
ColumnSummaryMetric
)
# --- Setup dati di riferimento e produzione ---
# Carica training data (reference)
reference_data = pd.read_parquet("data/training_features.parquet")
# Carica batch produzione ultimo mese
current_data = pd.read_parquet("data/production_batch_2025_02.parquet")
# Feature columns
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type", "payment_method"
]
# --- Report Data Drift ---
drift_report = Report(metrics=[
DatasetDriftMetric(), # overall drift summary
DataDriftTable(), # per-feature drift table
ColumnDriftMetric(column_name="monthly_charges"),
ColumnDriftMetric(column_name="contract_type"),
ColumnSummaryMetric(column_name="monthly_charges"),
])
drift_report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Salva report HTML interattivo
drift_report.save_html("reports/drift_report_2025_02.html")
# Estrai metriche programmaticamente
report_dict = drift_report.as_dict()
dataset_drift = report_dict["metrics"][0]["result"]
print(f"Dataset drift detected: {dataset_drift['dataset_drift']}")
print(f"Features drifted: {dataset_drift['number_of_drifted_columns']}/{dataset_drift['number_of_columns']}")
print(f"Share of drifted features: {dataset_drift['share_of_drifted_columns']:.1%}")
Evident generează rapoarte HTML interactive cu vizualizări ale distribuțiilor, histograme suprapuneri și tabele rezumative. Testul statistic este raportat pentru fiecare caracteristică utilizate (alese automat în funcție de tipul de date), valoarea p sau statistica testului, și un steag în derivă/no-drift.
Suită de testare cu praguri personalizate
Pentru a integra Evidently într-o conductă CI/CD sau flux de lucru Airflow/Prefect, Suita de teste de Evident este instrumentul potrivit: vă permite să definiți praguri precis și returnează pass/fail în mod programatic.
from evidently.test_suite import TestSuite
from evidently.tests import (
TestNumberOfDriftedColumns,
TestShareOfDriftedColumns,
TestColumnDrift,
TestDatasetDrift
)
# --- Test Suite con soglie personalizzate ---
drift_test_suite = TestSuite(tests=[
# Non più del 20% delle feature deve driftare
TestShareOfDriftedColumns(lt=0.2),
# Feature critiche: test individuali con soglie aggressive
TestColumnDrift(
column_name="monthly_charges",
stattest="ks",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="contract_type",
stattest="chi2",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="num_support_tickets",
stattest="psi",
stattest_threshold=0.1 # PSI < 0.1 = no drift
),
# Dataset-level drift test
TestDatasetDrift(stattest_threshold=0.05),
])
drift_test_suite.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Risultato pass/fail per la pipeline
test_result = drift_test_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in test_result["tests"]
)
if not all_passed:
print("DRIFT DETECTED - Pipeline triggering retraining...")
for test in test_result["tests"]:
if test["status"] != "SUCCESS":
print(f" FAILED: {test['name']} - {test['description']}")
# Trigger retraining (vedi sezione retraining)
trigger_retraining_pipeline()
else:
print("All drift tests passed - Model healthy")
Monitorizare cu NannyML: Performanță fără etichete
NannyML rezolvă una dintre cele mai dificile probleme în monitorizarea modelului: estimați performanța modelului atunci când etichetele reale nu sunt încă disponibile. Într-un model de predicție a abandonului, etichetele (dacă clientul a reușit efectiv) pot ajunge la numai 90 de zile de la predicție. NannyML folosește metoda Estimarea performanței bazată pe încredere (CBPE) pentru a estima acuratețea, F1 și AUC în timp real folosind doar distribuțiile de scor.
pip install nannyml
import nannyml as nml
import pandas as pd
# Carica i dati
reference_df = pd.read_parquet("data/reference_with_targets.parquet")
analysis_df = pd.read_parquet("data/production_last_30_days.parquet")
# --- CBPE: Stima delle performance senza label ---
estimator = nml.CBPE(
y_pred_proba="churn_probability",
y_pred="churn_predicted",
y_true="churned", # presente solo nel reference
timestamp_column_name="prediction_date",
problem_type="binary_classification",
metrics=["roc_auc", "f1", "precision", "recall"],
chunk_size=500 # 500 predizioni per chunk temporale
)
estimator.fit(reference_df)
results = estimator.estimate(analysis_df)
# Visualizza risultati con alert automatici
figure = results.plot()
figure.show()
# Estrai metriche per alerting
estimated_metrics = results.to_df()
latest_chunk = estimated_metrics.tail(1)
auc_lower = latest_chunk["estimated_roc_auc_lower_confidence_boundary"].values[0]
if auc_lower < 0.70:
print(f"ALERT: AUC stimato < 0.70 (lower bound: {auc_lower:.3f})")
trigger_retraining_pipeline()
# --- Univariate Drift Detection ---
univariate_calc = nml.UnivariateDriftCalculator(
column_names=["monthly_charges", "tenure_months", "num_tickets"],
timestamp_column_name="prediction_date",
continuous_methods=["kolmogorov_smirnov", "jensen_shannon"],
categorical_methods=["chi2", "jensen_shannon"],
chunk_size=500
)
univariate_calc.fit(reference_df)
drift_results = univariate_calc.calculate(analysis_df)
# Plotta il drift nel tempo per ogni feature
drift_figure = drift_results.filter(period="analysis").plot()
drift_figure.show()
NannyML produce grafice temporale care arată evoluția derivei în timp, cu benzi încredere și alerte vizuale. Acest lucru este deosebit de util de înțeles Când a început deriva și dacă se înrăutățește sau se stabilizează.
Alibi Detect: Detectare avansată a derivei cu MMD și LSDD
Alibi Detect (de Seldon) și biblioteca de referință pentru detectarea avansată care depăşeşte statisticile univariate. Suportă MMD (Discrepanță medie maximă) pentru date tabulare și imagini, LSDD (Least-Squares Density Difference) și detectarea valorii aberante. Este ideală atunci când trebuie să detectați o deriva multivariată complexă.
pip install alibi-detect
import numpy as np
from alibi_detect.cd import MMDDrift, KSDrift, TabularDrift
from alibi_detect.saving import save_detector, load_detector
# Carica dati di riferimento (numpy array)
X_ref = reference_data[feature_columns].values.astype(np.float32)
X_current = current_data[feature_columns].values.astype(np.float32)
# --- KS Drift per feature continue ---
ks_detector = KSDrift(
x_ref=X_ref,
p_val=0.05, # soglia p-value
alternative="two-sided"
)
ks_preds = ks_detector.predict(
X_current,
drift_type="batch",
return_p_val=True,
return_distance=True
)
print("KS Drift Results:")
print(f" Drift detected: {ks_preds['data']['is_drift']}")
print(f" p-values per feature: {ks_preds['data']['p_val']}")
print(f" Features drifted: {ks_preds['data']['is_drift'].sum()}")
# --- MMD Drift per rilevazione multivariata ---
# Più potente per distribuzioni complesse
mmd_detector = MMDDrift(
x_ref=X_ref,
backend="pytorch", # o "tensorflow"
p_val=0.05,
n_permutations=200 # più alto = più preciso ma più lento
)
mmd_preds = mmd_detector.predict(
X_current,
return_p_val=True,
return_distance=True
)
print(f"\nMMD Drift (multivariato):")
print(f" Drift detected: {mmd_preds['data']['is_drift']}")
print(f" p-value: {mmd_preds['data']['p_val']:.4f}")
print(f" MMD^2 statistic: {mmd_preds['data']['distance']:.6f}")
# --- TabularDrift: test ottimizzato per dati tabulari misti ---
tabular_detector = TabularDrift(
x_ref=X_ref,
p_val=0.05,
categories_per_feature={
4: None, # feature index 4 = contract_type (categorica)
6: None # feature index 6 = payment_method (categorica)
},
)
# Salva detector per riutilizzo
save_detector(tabular_detector, "models/drift_detector/")
# Successivamente carica e usa
# loaded_detector = load_detector("models/drift_detector/")
Arhitectura sistemului de monitorizare
Un sistem de monitorizare la nivel de producție necesită mai multe componente integrate: un strat de colectare de valori, o stocare a serii de timp, un sistem de vizualizare și un motor de alertare. Combinația Prometeu + Grafana și standardul open-source pentru acest caz de utilizare, cu integrare extinsă în ecosistemul Kubernetes.
# monitoring_service.py
# Servizio FastAPI che espone metriche di drift per Prometheus
from fastapi import FastAPI, BackgroundTasks
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import pandas as pd
import schedule
import threading
import time
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="ML Monitoring Service")
# --- Prometheus Metrics ---
DRIFT_GAUGE = Gauge(
"ml_feature_drift_psi",
"Population Stability Index per feature",
labelnames=["feature_name", "model_name", "model_version"]
)
DATASET_DRIFT_GAUGE = Gauge(
"ml_dataset_drift_detected",
"1 se drift rilevato a livello dataset, 0 altrimenti",
labelnames=["model_name", "model_version"]
)
DRIFT_FEATURES_COUNT = Gauge(
"ml_drifted_features_count",
"Numero di feature che mostrano drift",
labelnames=["model_name"]
)
ESTIMATED_AUC = Gauge(
"ml_estimated_auc",
"AUC stimato via CBPE (NannyML)",
labelnames=["model_name", "model_version"]
)
PREDICTION_COUNT = Counter(
"ml_predictions_total",
"Numero totale di predizioni",
labelnames=["model_name", "outcome"]
)
INFERENCE_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Latenza inference in secondi",
labelnames=["model_name"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]
)
# --- Funzione di calcolo drift ---
def calculate_and_update_drift_metrics(
model_name: str,
model_version: str,
reference_data: pd.DataFrame,
current_data: pd.DataFrame,
feature_columns: list
):
"""Calcola PSI per ogni feature e aggiorna gauge Prometheus."""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
report = Report(metrics=[
DatasetDriftMetric(stattest="psi"),
DataDriftTable(stattest="psi"),
])
report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
result = report.as_dict()
# Dataset-level drift
dataset_result = result["metrics"][0]["result"]
drift_detected = 1 if dataset_result["dataset_drift"] else 0
DATASET_DRIFT_GAUGE.labels(
model_name=model_name,
model_version=model_version
).set(drift_detected)
DRIFT_FEATURES_COUNT.labels(
model_name=model_name
).set(dataset_result["number_of_drifted_columns"])
# Per-feature PSI
feature_results = result["metrics"][1]["result"]["drift_by_columns"]
for feature_name, feature_data in feature_results.items():
psi_value = feature_data.get("stattest_threshold", 0)
actual_stat = feature_data.get("drift_score", 0)
DRIFT_GAUGE.labels(
feature_name=feature_name,
model_name=model_name,
model_version=model_version
).set(actual_stat)
logger.info(f"Drift metrics updated for {model_name} v{model_version}")
return drift_detected
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/drift/check")
async def trigger_drift_check(background_tasks: BackgroundTasks):
"""Trigger manuale del drift check."""
background_tasks.add_task(run_drift_check_job)
return {"status": "drift check started"}
@app.get("/health")
async def health():
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
Configurația Prometheus și Grafana
Configurarea Prometheus pentru scraping metrics ML este simplă: adăugați serviciul de monitorizare ca țintă în fișierul de configurare.
# prometheus.yml
global:
scrape_interval: 60s
evaluation_interval: 60s
rule_files:
- "ml_drift_alerts.yml"
alerting:
alertmanagers:
- static_configs:
- targets: ["alertmanager:9093"]
scrape_configs:
- job_name: "ml-monitoring"
static_configs:
- targets: ["ml-monitoring-service:8000"]
metrics_path: "/metrics"
scrape_interval: 60s
- job_name: "model-serving"
static_configs:
- targets: ["fastapi-serving:8080"]
metrics_path: "/metrics"
---
# ml_drift_alerts.yml
groups:
- name: ml_drift_alerts
rules:
- alert: HighFeatureDrift
expr: ml_feature_drift_psi{} > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "High drift detected on feature {{ $labels.feature_name }}"
description: "PSI = {{ $value | humanize }} for feature {{ $labels.feature_name }}"
- alert: DatasetDriftDetected
expr: ml_dataset_drift_detected == 1
for: 10m
labels:
severity: critical
annotations:
summary: "Dataset-level drift detected for model {{ $labels.model_name }}"
description: "Model performance may be degraded. Consider retraining."
- alert: LowEstimatedAUC
expr: ml_estimated_auc < 0.70
for: 15m
labels:
severity: critical
annotations:
summary: "Estimated AUC dropped below threshold"
description: "Estimated AUC = {{ $value | humanize }} for model {{ $labels.model_name }}"
Tabloul de bord Grafana: valori cheie de monitorizat
- PSI pentru caracteristici: hartă termică cu praguri colorate 0,1/0,2 (verde/galben/roșu)
- Scorul de deriva în timp: Grafic cu linii pentru caracteristici critice
- AUC estimat (CBPE): serii temporale cu benzi de încredere
- Numărul de caracteristici în derivă: indicator cu prag de alertă
- Distribuția predicțiilor: histograma scorului de probabilitate
- Latență și debit: panou standard pentru monitorizarea SLA
Conducta de recalificare automată
Detectarea derivei este necesară, dar nu suficientă: trebuie să reacționați și automat. O conductă de recalificare automată trebuie să fie declanșată de alerte de deriva, validați noul model inainte de a-l inlocui pe cel aflat in productie si de a asigura rollback in caz regresia performanței.
# retraining_pipeline.py
# Pipeline di retraining automatico con MLflow
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from datetime import datetime
import logging
import requests
logger = logging.getLogger(__name__)
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
MODEL_NAME = "churn-prediction"
MIN_AUC_THRESHOLD = 0.72 # AUC minima per promuovere in produzione
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
def load_fresh_training_data() -> pd.DataFrame:
"""Carica dati aggiornati per il retraining."""
# In produzione: query al feature store o data warehouse
df = pd.read_parquet("data/training_data_fresh.parquet")
logger.info(f"Loaded {len(df)} training samples")
return df
def train_new_model(df: pd.DataFrame) -> tuple:
"""Addestra un nuovo modello con i dati freschi."""
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type_encoded", "payment_method_encoded"
]
target_column = "churned"
X = df[feature_columns]
y = df[target_column]
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=4,
learning_rate=0.05,
subsample=0.8,
random_state=42
)
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_val)[:, 1]
y_pred = model.predict(X_val)
metrics = {
"auc": roc_auc_score(y_val, y_pred_proba),
"f1": f1_score(y_val, y_pred),
"precision": precision_score(y_val, y_pred),
"recall": recall_score(y_val, y_pred),
"val_samples": len(X_val)
}
return model, metrics, feature_columns
def register_and_promote_model(
model,
metrics: dict,
feature_columns: list,
trigger_reason: str
) -> bool:
"""Registra il modello in MLflow e promuovilo in produzione se supera la soglia."""
with mlflow.start_run(run_name=f"retrain_{datetime.utcnow().strftime('%Y%m%d_%H%M')}") as run:
# Log params
mlflow.log_param("trigger_reason", trigger_reason)
mlflow.log_param("training_timestamp", datetime.utcnow().isoformat())
mlflow.log_param("features", feature_columns)
# Log metrics
for metric_name, metric_value in metrics.items():
if isinstance(metric_value, (int, float)):
mlflow.log_metric(metric_name, metric_value)
# Log model
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name=MODEL_NAME
)
run_id = run.info.run_id
logger.info(f"Model registered with run_id={run_id}, AUC={metrics['auc']:.4f}")
# Promuovi in produzione se supera la soglia
if metrics["auc"] >= MIN_AUC_THRESHOLD:
client = mlflow.tracking.MlflowClient()
latest_version = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
client.transition_model_version_stage(
name=MODEL_NAME,
version=latest_version.version,
stage="Production",
archive_existing_versions=True
)
logger.info(f"Model v{latest_version.version} promoted to Production")
send_slack_notification(f"Model retrained and promoted. AUC={metrics['auc']:.4f}")
return True
else:
logger.warning(f"Model AUC {metrics['auc']:.4f} below threshold {MIN_AUC_THRESHOLD}. Not promoting.")
send_slack_notification(
f"Retraining completed but model below threshold. AUC={metrics['auc']:.4f}. Manual review needed.",
level="warning"
)
return False
def send_slack_notification(message: str, level: str = "info"):
"""Invia notifica Slack (o webhook generico)."""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
color = "#36a64f" if level == "info" else "#ff0000"
payload = {
"attachments": [{
"color": color,
"title": "MLOps Retraining Alert",
"text": message,
"footer": f"ML Platform | {datetime.utcnow().isoformat()}"
}]
}
try:
requests.post(webhook_url, json=payload, timeout=5)
except Exception as e:
logger.error(f"Failed to send Slack notification: {e}")
def run_retraining_pipeline(trigger_reason: str = "drift_detected"):
"""Entry point della pipeline di retraining."""
logger.info(f"Starting retraining pipeline. Trigger: {trigger_reason}")
df = load_fresh_training_data()
model, metrics, feature_columns = train_new_model(df)
promoted = register_and_promote_model(model, metrics, feature_columns, trigger_reason)
logger.info(f"Retraining pipeline completed. Promoted: {promoted}")
return promoted
if __name__ == "__main__":
run_retraining_pipeline(trigger_reason="manual_trigger")
Strategii de declanșare pentru recalificare
Defini Când recalificarea este la fel de importantă ca ca fă-o. Există trei strategii principale, fiecare cu avantaje și limitări:
Strategii de recalificare comparate
- Pe baza de orar (calendar): Recalificare periodică fixă (săptămânal, lunar). Simplu de implementat, dar ineficient: se recalifică chiar și atunci când nu este necesar și ar putea nerecalificarea suficient de des în perioadele de derivă rapidă.
- Bazat pe performanță: Reantrenați-vă când valorile de performanță scad sub un prag. Necesită adevărul de bază disponibil rapid. Ideal pentru modelele cu buclă rapidă de feedback (de exemplu, rata de clic, conversie).
- Bazat pe deriva: Reantrenare atunci când este detectată o deriva statistică semnificative în caracteristici sau predicții. Nu necesită etichete. Abordare proactivă care previne degradarea înainte de a afecta performanța. Risc de fals pozitive.
- Hibrid (recomandat): Combinați detectarea derivei ca declanșator principal cu validarea performanței ca o poartă de calitate înainte de promovarea la producție. De asemenea, adaugă o reinstruire periodică de rezervă.
Configurare completă cu Docker Compose
Pentru mediile de dezvoltare și de organizare, Docker Compose vă permite să lansați întreaga stivă monitorizare rapidă și reproductibilă.
# docker-compose.monitoring.yml
version: "3.8"
services:
# ML Monitoring Service (FastAPI + Evidently)
ml-monitoring:
build: ./monitoring_service
ports:
- "8001:8000"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- REFERENCE_DATA_PATH=/data/reference.parquet
volumes:
- ./data:/data
- ./reports:/reports
depends_on:
- mlflow
# MLflow Tracking Server
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: >
mlflow server
--host 0.0.0.0
--port 5000
--backend-store-uri postgresql://mlflow:mlflow@postgres/mlflow
--default-artifact-root s3://mlflow-artifacts/
depends_on:
- postgres
# PostgreSQL per MLflow
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=mlflow
- POSTGRES_PASSWORD=mlflow
- POSTGRES_DB=mlflow
volumes:
- postgres_data:/var/lib/postgresql/data
# Prometheus
prometheus:
image: prom/prometheus:v2.50.1
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- ./monitoring/alerts.yml:/etc/prometheus/alerts.yml
- prometheus_data:/prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
# Grafana
grafana:
image: grafana/grafana:10.3.3
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
- grafana_data:/var/lib/grafana
depends_on:
- prometheus
# Alertmanager
alertmanager:
image: prom/alertmanager:v0.27.0
ports:
- "9093:9093"
volumes:
- ./monitoring/alertmanager.yml:/etc/alertmanager/alertmanager.yml
volumes:
postgres_data:
prometheus_data:
grafana_data:
Buget <5K EUR/an pentru IMM-uri
Un sistem complet de detectare a derivei nu necesită un buget al întreprinderii. Cu abordarea open-source și cloud-native, este posibil să se mențină un sistem robust cu costuri minime:
- Evident AI + NannyML: Open-source, gratuit
- MLflow (auto-găzduit): Open-source, numai costuri de infrastructură
- Prometheus + Grafana: Open-source, gratuit
- Calcul (VPS/cloud): ~50-100 EUR/lună pentru VM medie (600-1200 EUR/an)
- Stocare compatibilă cu S3: ~20 EUR/lună pentru 500 GB (240 EUR/an)
- Total estimat: ~1000-2000 EUR/an pentru stiva completă
Cele mai bune practici pentru detectarea derivei în producție
Lista de verificare a producției
- Definiți linia de referință statistică înainte de implementare: Rulați detectarea derivei față de sine pe setul de validare pentru a calibra pragurile. Un PSI > 0 pe date staționar indică supraajustarea pragului.
- Utilizați ferestre de timp adecvate: Nu compara tot traficul istoric cu azi. Folosiți ferestre glisante (7/14/30 de zile) pentru a capta deriva recentă.
- Prioritizează funcțiile după importanță: Monitorizați mai agresiv Caracteristici SHAP de mare impact. Nu toate derivele sunt la fel de critice.
- Distingeți deriva tehnică de deriva semantică: O schimbare de format a unui câmp (de exemplu, de la șir la număr) și o eroare de inginerie, nu deriva ML. Adăugați verificări separate ale calității datelor.
- Evitați oboseala alertă: Setați inițial praguri conservatoare și se rafinează în timp. Prea multe alerte duc la ignorarea lor pe toate.
- Înregistrarea deciziilor de recalificare: Fiecare recalificare trebuie să fie reprezentat cu MLflow, inclusiv motivul declanșării, valorile pre/post și versiunea model promovată.
- Testarea detectorului în sine: Verificați periodic dacă sistemul detectarea funcționează corect cu testarea injecției de date (injectarea derivă sintetică și verificați dacă este detectat).
Anti-modele de evitat
- Recalificare automată fără poartă de calitate: Nu promovați în producerea unui model nou antrenat fără validarea performanței. Recalificarea datelor contaminate poate înrăutăți modelul.
- Doar ieșire de monitorizare: Monitorizați doar previziunile fără caracteristicile de intrare fac imposibilă diagnosticarea cauzei derivei.
- Praguri fixe pentru toate modelele: Fiecare model are sensibilitate diferit de a deriva. PSI > 0,2 poate fi catastrofal pentru un model critic și irelevant pentru un model cu prioritate scăzută.
- Ignorați deriva conceptului: Dacă etichetele de feedback nu sunt colectate din modelul de producție, este imposibil să se detecteze direct deriva conceptului. Investește în infrastructura buclei de feedback.
Concluzii și pașii următori
Un sistem automat de detectare și reinstruire a derivei este inima fiecărui MLOps matur. Fără monitorizare activă, modelele ML aflate în producție se degradează, generând decizii greșite care pot costa mult mai mult decât costul sistemului de monitorizare în sine.
În acest ghid am construit un sistem complet: de la înțelegerea teoretică a patru tipuri de deriva, la implementare practică cu Evidently AI pentru rapoarte interactive, NannyML pentru estimarea performanței fără etichete și Alibi Detect pentru detectare multivariat avansat. Am integrat totul cu Prometheus, Grafana și o conductă recalificare automată cu MLflow.
Următorul pas este să integrăm acest sistem cu servirea FastAPI pe care am văzut-o în articolul precedent și cu scalarea Kubernetes pe care o vom vedea în următorul. Cu acestea componente, veți avea un sistem MLOps complet, de calitate de producție și care poate fi întreținut.
Seria MLOps continuă
- Articolul precedent: Urmărirea experimentelor cu MLflow: Ghid complet - înregistrarea experimentelor și compararea modelelor
- Articolul următor: Modele de servire: FastAPI + Uvicorn în producție - construiți API-uri de inferență scalabile
- Informații suplimentare: Scalare ML pe Kubernetes - orchestrați implementarea cu KubeFlow și Seldon
- Serii înrudite: Învățare profundă avansată - monitorizarea modelelor neuronale complexe







