Wykrywanie dryfu modelu i automatyczne przekwalifikowanie w produkcji
W końcu wdrożyłeś swój model do produkcji. Wskaźniki są doskonałe, zespół jest zadowolony a zainteresowane strony biją brawo. Kilka tygodni później ktoś zauważa, że przewidywania wydają się mniej dokładne. Po miesiącu model jest wyraźnie zdegradowany. Witamy w najbardziej podstępnym problemie uczenia maszynowego w produkcji: dryf modelu.
Według badań Gartnera poza 65% modeli ML znajdujących się w produkcji ulega znacznemu pogorszeniu istotne w ciągu 12 miesięcy od wdrożenia, często bez zauważenia tego przez zespoły na czas. Dane są jeszcze bardziej niepokojące w przypadku handlu detalicznego i finansów, gdzie zmieniają się rozkłady danych szybko reagując na trendy rynkowe, sezonowość i zachowania użytkowników.
W tym przewodniku zbudujemy kompletny system wykrywanie dryfu i automatyczne przekwalifikowanie: zrozumiemy różne rodzaje dryfu, wdrożymy detektory z Evidently AI, NannyML i Alibi Detect, skonfigurujemy testy statystyczne (KS, PSI, Chi-Square), zintegrujemy Prometheus i Grafana do monitoringu ciągły i utworzymy automatyczne potoki przekwalifikowania uruchamiane przez alerty.
Czego się nauczysz
- Różnica między dryfem danych, dryfem koncepcji, dryfem funkcji i dryfem etykiety
- Testy statystyczne do wykrywania dryftu: test KS, PSI, Chi-Square, MMD
- Praktyczne wdrożenie z Evidently AI, NannyML i Alibi Detect
- Panel monitorowania z Prometheusem i Grafaną
- Automatyczne powiadamianie i ponowne szkolenie za pomocą MLflow
- Najlepsze praktyki w zakresie niskobudżetowych MLOpsów klasy produkcyjnej
Dlaczego dryf jest krytycznym problemem
Prawdziwy świat nie jest statyczny. Dane, które model zobaczył podczas uczenia, zostały odzwierciedlone określony rozkład statystyczny, „migawka” świata w danym momencie. Ale świat ciągle się zmienia: nawyki użytkowników ewoluują, rynki się zmieniają, systemy wyższego szczebla zmienić format danych, wystąpią nieoczekiwane zdarzenia, takie jak pandemia lub kryzys gospodarczy.
Podstawowym problemem dryfu jest cicha degradacja: model zatrzymuje się być dokładne, ale nadal generować prognozy bez błędów technicznych. Serwis odpowiada w przypadku protokołu HTTP 200 dzienniki nie wykazują żadnych wyjątków, ale decyzje oparte na tych przewidywaniach tak coraz bardziej się myli. Bez aktywnego systemu monitorowania ta degradacja może pozostać niezauważonym przez wiele miesięcy.
Ekonomiczne skutki niewykrytego dryfu
Zepsuty model wykrywania oszustw może sprawić, że oszukańcze transakcje pozostaną niewykryte. Zmieniający się system cenowy może kosztować miliony w wyniku niekonkurencyjnych cen. Modelka przewidywania pogorszenia rezygnacji prowadzi do marnowania kampanii retencyjnych na niewłaściwych klientów. Koszt monitorowania jest zawsze niższy niż koszt niewykrytego dryfu.
Taksonomia dryfu: cztery podstawowe typy
Przed wdrożeniem rozwiązań konieczne jest zrozumienie Co to dryfuje. Istnieją cztery główne kategorie dryfu, każda z innymi przyczynami i różnymi strategiami wykrywania.
1. Dryf danych (przesunięcie współzmiennej)
Il dryf danych, znany również jako przesunięcie współzmiennej, dzieje się, gdy rozkład cech wejściowych P(X) zmienia się w porównaniu do uczenia, ale związek pomiędzy cecha i etykieta P(Y|X) pozostają stabilne. Klasyczny przykład: model był trenowany użytkowników w określonej grupie wiekowej, ale produkt jest akceptowany przez nową grupę demograficzną.
Dryf danych jest najczęstszym typem i najłatwiejszym do wykrycia, ponieważ wymaga jedynie monitorowania rozkłady cech wejściowych, bez potrzeby stosowania etykiet. Można go również wykryć w w czasie rzeczywistym, zanim wyniki wpłyną na prognozy.
2. Dryf koncepcji
Il dryf koncepcji i bardziej podstępny: związek P(Y|X) pomiędzy cechami i etykietami zmienia się, nawet jeśli rozkład cechy X pozostaje stabilny. Przykład: model Analiza nastrojów wytrenowana na tweetach z 2022 roku nie rozumie żargonu roku 2025. Semantyka słów (X) uległa zmianie, więc mapowanie X → Y jest inne.
Dryfowanie koncepcji wymaga bezpośredniego wykrycia podstawowej prawdy: należy ją porównać prognozy z prawdziwymi etykietami. Kiedy te spóźniają się (jak w scenariuszach przewidywania rezygnacji z 90-dniowych okien obserwacyjnych), stosuje się metryki zastępcze takie jak dryft przewidywań lub rozkłady wyników prawdopodobieństwa.
3. Dryf funkcji
Il dryf cech oraz podzbiór dryfu danych dotyczący specyfikacji cechy krytyczne modelu. Nie wszystkie funkcje mają ten sam wpływ: funkcję o wysokim znaczeniu, które dryfuje i jest znacznie bardziej krytyczne niż cecha o niskim znaczeniu. Narzędzia ważności funkcji (SHAP, znaczenie permutacji) pomagają ustalić priorytety monitorowania.
4. Dryf etykiety (wcześniejsze przesunięcie prawdopodobieństwa)
Il dryf etykiety występuje, gdy rozkład etykiet docelowych P(Y) zmienić. W binarnym modelu klasyfikacji (spam/nie-spam), jeśli nagle 90% wiadomości to spam zamiast zwykłych 10%, model jest skalibrowany dla jednej dystrybucji się różnić, a prognozy zostaną zniekształcone. Ten typ dryfu jest powszechny w scenariuszach z Nierównowaga klas zmienna w czasie.
Podsumowanie typów dryfu
- Przesunięcie daty: Zmiany P(X), P(Y|X) stabilne. Możliwość odnalezienia bez etykiety.
- Dryf koncepcji: Zmiany P(Y|X). Wymaga metryk etykiet lub serwerów proxy.
- Funkcje driftu: Zmieniają się określone funkcje. Priorytet oparty na ważności.
- Dryf etykiety: Zmiany P(Y). Monitoruj dystrybucję prognoz.
Testy statystyczne do wykrywania dryfu
Statystyczne wykrywanie dryfu opiera się na porównaniu dwóch rozkładów: rozkładu referencja (szkolenie lub stabilny okres produkcyjny) i bieżący rozkład (okno monitorowania). Różne testy statystyczne mają różne cechy pod względem czułości, interpretowalności i kosztów obliczeniowych.
Test Kołmogorowa-Smirnowa (KS)
Il Próba KS i najczęściej używany do funkcji ciągłych. Zmierz maksymalną odległość pomiędzy skumulowanymi funkcjami dystrybucji (CDF) dwóch rozkładów. Uzyskana wartość p wskazuje prawdopodobieństwo, że dwie próbki pochodzą z tego samego rozkładu: niska wartość p (zwykle < 0,05) sygnalizuje statystycznie istotny dryf.
Zalety: nie zakłada określonego rozkładu (nieparametrycznego), solidny, łatwy w obsłudze zinterpretować wizualnie. Ograniczenia: wrażliwe na ogony dystrybucyjne, słabsza moc w przypadku małych próbek może dawać fałszywe wyniki w przypadku dużych zbiorów danych.
Wskaźnik stabilności populacji (PSI)
Il PSI i urodzony w sektorze bankowym w celu monitorowania stabilności rozkład ocen ryzyka. Dzieli oba rozkłady na segmenty i oblicza suma ważonych różnic pomiędzy proporcjami. Standardowa interpretacja to:
- PSI < 0,1: brak znaczących zmian
- PSI 0,1 - 0,2: niewielka zmiana, monitor
- PSI > 0,2: Znacząca zmiana, wymagane działanie
PSI jest bardzo intuicyjny dla interesariuszy biznesowych i dotyczy obu funkcji ciągłych (z dyskretyzacją na decyle) i kategoryczne. Szczególnie popularne w modelach scoringu kredytowego i wykrywania oszustw.
Test chi-kwadrat
Il Test chi-kwadrat oraz test bazowy dla cech kategorycznych. Porównaj obserwowane częstotliwości z oczekiwanymi i daje wartość p. I właściwe, gdy funkcje mają ograniczoną liczbę kategorii, a próbki są wystarczająco duże (częstotliwość poczekaj > 5 dla każdej kategorii). W przypadku obiektów o dużej liczności zalecane jest grupowanie rzadkie kategorie.
Maksymalna średnia rozbieżność (MMD)
L'MMD oraz test oparty na jądrze, który mierzy odległość między dwiema dystrybucjami w przestrzeni Hilberta. Jest szczególnie skuteczny w wykrywaniu różnic w strukturach wielowymiarowy i jest używany przez Alibi Detect do dryfowania danych tabelarycznych, obrazów i tekstu. Zaletą jest to, że nie wymaga wyboru segmentów ani parametrów dyskretyzacji.
Wdrożenie z Evidently AI
Ewidentnie AI stała się standardową biblioteką typu open source do monitorowania modeli ML w Pythonie, z ponad 20 milionami pobrań. Oferuje predefiniowane ustawienia wstępne dla w najczęstszych przypadkach użycia i integruje się z dowolnym koordynatorem przepływu pracy.
# Installazione
pip install evidently
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, ClassificationPreset
from evidently.metrics import (
DatasetDriftMetric,
DataDriftTable,
ColumnDriftMetric,
ColumnSummaryMetric
)
# --- Setup dati di riferimento e produzione ---
# Carica training data (reference)
reference_data = pd.read_parquet("data/training_features.parquet")
# Carica batch produzione ultimo mese
current_data = pd.read_parquet("data/production_batch_2025_02.parquet")
# Feature columns
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type", "payment_method"
]
# --- Report Data Drift ---
drift_report = Report(metrics=[
DatasetDriftMetric(), # overall drift summary
DataDriftTable(), # per-feature drift table
ColumnDriftMetric(column_name="monthly_charges"),
ColumnDriftMetric(column_name="contract_type"),
ColumnSummaryMetric(column_name="monthly_charges"),
])
drift_report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Salva report HTML interattivo
drift_report.save_html("reports/drift_report_2025_02.html")
# Estrai metriche programmaticamente
report_dict = drift_report.as_dict()
dataset_drift = report_dict["metrics"][0]["result"]
print(f"Dataset drift detected: {dataset_drift['dataset_drift']}")
print(f"Features drifted: {dataset_drift['number_of_drifted_columns']}/{dataset_drift['number_of_columns']}")
print(f"Share of drifted features: {dataset_drift['share_of_drifted_columns']:.1%}")
Ewidentnie generuje interaktywne raporty HTML z wizualizacjami rozkładów, histogramami nakładki i tabele podsumowujące. Dla każdej cechy raportowany jest test statystyczny używana (automatycznie wybierana na podstawie typu danych), wartość p lub statystyka testowa, oraz flaga dryfu/braku dryfu.
Zestaw testów z niestandardowymi progami
Aby zintegrować Evidently z potokiem CI/CD lub przepływem pracy Airflow/Prefect, plik Zestaw testowy of Evidently jest właściwym narzędziem: pozwala na zdefiniowanie progów precyzyjne i programowo zwraca wynik pozytywny/nieudany.
from evidently.test_suite import TestSuite
from evidently.tests import (
TestNumberOfDriftedColumns,
TestShareOfDriftedColumns,
TestColumnDrift,
TestDatasetDrift
)
# --- Test Suite con soglie personalizzate ---
drift_test_suite = TestSuite(tests=[
# Non più del 20% delle feature deve driftare
TestShareOfDriftedColumns(lt=0.2),
# Feature critiche: test individuali con soglie aggressive
TestColumnDrift(
column_name="monthly_charges",
stattest="ks",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="contract_type",
stattest="chi2",
stattest_threshold=0.05
),
TestColumnDrift(
column_name="num_support_tickets",
stattest="psi",
stattest_threshold=0.1 # PSI < 0.1 = no drift
),
# Dataset-level drift test
TestDatasetDrift(stattest_threshold=0.05),
])
drift_test_suite.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
# Risultato pass/fail per la pipeline
test_result = drift_test_suite.as_dict()
all_passed = all(
test["status"] == "SUCCESS"
for test in test_result["tests"]
)
if not all_passed:
print("DRIFT DETECTED - Pipeline triggering retraining...")
for test in test_result["tests"]:
if test["status"] != "SUCCESS":
print(f" FAILED: {test['name']} - {test['description']}")
# Trigger retraining (vedi sezione retraining)
trigger_retraining_pipeline()
else:
print("All drift tests passed - Model healthy")
Monitorowanie za pomocą NannyML: wydajność bez etykiet
NianiaML rozwiązuje jeden z najtrudniejszych problemów w monitorowaniu modeli: oszacuj wydajność modelu, gdy rzeczywiste etykiety nie są jeszcze dostępne. W modelu przewidywania rezygnacji etykiety (czy klient rzeczywiście zrezygnował) mogą przybyć zaledwie 90 dni po przewidywaniu. NannyML korzysta z tej metody Szacowanie wydajności oparte na pewności (CBPE) do oszacowania dokładności, F1 i AUC w czasie rzeczywistym, korzystając wyłącznie z rozkładów punktacji.
pip install nannyml
import nannyml as nml
import pandas as pd
# Carica i dati
reference_df = pd.read_parquet("data/reference_with_targets.parquet")
analysis_df = pd.read_parquet("data/production_last_30_days.parquet")
# --- CBPE: Stima delle performance senza label ---
estimator = nml.CBPE(
y_pred_proba="churn_probability",
y_pred="churn_predicted",
y_true="churned", # presente solo nel reference
timestamp_column_name="prediction_date",
problem_type="binary_classification",
metrics=["roc_auc", "f1", "precision", "recall"],
chunk_size=500 # 500 predizioni per chunk temporale
)
estimator.fit(reference_df)
results = estimator.estimate(analysis_df)
# Visualizza risultati con alert automatici
figure = results.plot()
figure.show()
# Estrai metriche per alerting
estimated_metrics = results.to_df()
latest_chunk = estimated_metrics.tail(1)
auc_lower = latest_chunk["estimated_roc_auc_lower_confidence_boundary"].values[0]
if auc_lower < 0.70:
print(f"ALERT: AUC stimato < 0.70 (lower bound: {auc_lower:.3f})")
trigger_retraining_pipeline()
# --- Univariate Drift Detection ---
univariate_calc = nml.UnivariateDriftCalculator(
column_names=["monthly_charges", "tenure_months", "num_tickets"],
timestamp_column_name="prediction_date",
continuous_methods=["kolmogorov_smirnov", "jensen_shannon"],
categorical_methods=["chi2", "jensen_shannon"],
chunk_size=500
)
univariate_calc.fit(reference_df)
drift_results = univariate_calc.calculate(analysis_df)
# Plotta il drift nel tempo per ogni feature
drift_figure = drift_results.filter(period="analysis").plot()
drift_figure.show()
NannyML tworzy wykresy czasowe przedstawiające ewolucję dryfu w czasie, z pasmami pewność siebie i alerty wizualne. Jest to szczególnie przydatne do zrozumienia Gdy czy dryf się rozpoczął i czy się pogarsza, czy stabilizuje.
Alibi Detect: Zaawansowane wykrywanie dryfu za pomocą MMD i LSDD
Wykrycie Alibi (autor: Seldon) i biblioteka referencyjna do zaawansowanego wykrywania co wykracza poza statystykę jednoczynnikową. Obsługuje MMD (maksymalna rozbieżność średnia) dla danych tabele i obrazy, LSDD (różnica gęstości najmniejszych kwadratów) i wykrywanie wartości odstających. Jest to idealne rozwiązanie, gdy trzeba wykryć złożony dryft wielowymiarowy.
pip install alibi-detect
import numpy as np
from alibi_detect.cd import MMDDrift, KSDrift, TabularDrift
from alibi_detect.saving import save_detector, load_detector
# Carica dati di riferimento (numpy array)
X_ref = reference_data[feature_columns].values.astype(np.float32)
X_current = current_data[feature_columns].values.astype(np.float32)
# --- KS Drift per feature continue ---
ks_detector = KSDrift(
x_ref=X_ref,
p_val=0.05, # soglia p-value
alternative="two-sided"
)
ks_preds = ks_detector.predict(
X_current,
drift_type="batch",
return_p_val=True,
return_distance=True
)
print("KS Drift Results:")
print(f" Drift detected: {ks_preds['data']['is_drift']}")
print(f" p-values per feature: {ks_preds['data']['p_val']}")
print(f" Features drifted: {ks_preds['data']['is_drift'].sum()}")
# --- MMD Drift per rilevazione multivariata ---
# Più potente per distribuzioni complesse
mmd_detector = MMDDrift(
x_ref=X_ref,
backend="pytorch", # o "tensorflow"
p_val=0.05,
n_permutations=200 # più alto = più preciso ma più lento
)
mmd_preds = mmd_detector.predict(
X_current,
return_p_val=True,
return_distance=True
)
print(f"\nMMD Drift (multivariato):")
print(f" Drift detected: {mmd_preds['data']['is_drift']}")
print(f" p-value: {mmd_preds['data']['p_val']:.4f}")
print(f" MMD^2 statistic: {mmd_preds['data']['distance']:.6f}")
# --- TabularDrift: test ottimizzato per dati tabulari misti ---
tabular_detector = TabularDrift(
x_ref=X_ref,
p_val=0.05,
categories_per_feature={
4: None, # feature index 4 = contract_type (categorica)
6: None # feature index 6 = payment_method (categorica)
},
)
# Salva detector per riutilizzo
save_detector(tabular_detector, "models/drift_detector/")
# Successivamente carica e usa
# loaded_detector = load_detector("models/drift_detector/")
Architektura systemu monitorowania
System monitorowania klasy produkcyjnej wymaga wielu zintegrowanych komponentów: warstwy zbieranie metryk, przechowywanie szeregów czasowych, system wizualizacji i silnik ostrzegania. Kombinacja Prometeusz + Grafana oraz standard open source dla tego przypadku użycia, z szeroką integracją z ekosystemem Kubernetes.
# monitoring_service.py
# Servizio FastAPI che espone metriche di drift per Prometheus
from fastapi import FastAPI, BackgroundTasks
from prometheus_client import Counter, Gauge, Histogram, generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
import pandas as pd
import schedule
import threading
import time
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
app = FastAPI(title="ML Monitoring Service")
# --- Prometheus Metrics ---
DRIFT_GAUGE = Gauge(
"ml_feature_drift_psi",
"Population Stability Index per feature",
labelnames=["feature_name", "model_name", "model_version"]
)
DATASET_DRIFT_GAUGE = Gauge(
"ml_dataset_drift_detected",
"1 se drift rilevato a livello dataset, 0 altrimenti",
labelnames=["model_name", "model_version"]
)
DRIFT_FEATURES_COUNT = Gauge(
"ml_drifted_features_count",
"Numero di feature che mostrano drift",
labelnames=["model_name"]
)
ESTIMATED_AUC = Gauge(
"ml_estimated_auc",
"AUC stimato via CBPE (NannyML)",
labelnames=["model_name", "model_version"]
)
PREDICTION_COUNT = Counter(
"ml_predictions_total",
"Numero totale di predizioni",
labelnames=["model_name", "outcome"]
)
INFERENCE_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Latenza inference in secondi",
labelnames=["model_name"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]
)
# --- Funzione di calcolo drift ---
def calculate_and_update_drift_metrics(
model_name: str,
model_version: str,
reference_data: pd.DataFrame,
current_data: pd.DataFrame,
feature_columns: list
):
"""Calcola PSI per ogni feature e aggiorna gauge Prometheus."""
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric, DataDriftTable
report = Report(metrics=[
DatasetDriftMetric(stattest="psi"),
DataDriftTable(stattest="psi"),
])
report.run(
reference_data=reference_data[feature_columns],
current_data=current_data[feature_columns]
)
result = report.as_dict()
# Dataset-level drift
dataset_result = result["metrics"][0]["result"]
drift_detected = 1 if dataset_result["dataset_drift"] else 0
DATASET_DRIFT_GAUGE.labels(
model_name=model_name,
model_version=model_version
).set(drift_detected)
DRIFT_FEATURES_COUNT.labels(
model_name=model_name
).set(dataset_result["number_of_drifted_columns"])
# Per-feature PSI
feature_results = result["metrics"][1]["result"]["drift_by_columns"]
for feature_name, feature_data in feature_results.items():
psi_value = feature_data.get("stattest_threshold", 0)
actual_stat = feature_data.get("drift_score", 0)
DRIFT_GAUGE.labels(
feature_name=feature_name,
model_name=model_name,
model_version=model_version
).set(actual_stat)
logger.info(f"Drift metrics updated for {model_name} v{model_version}")
return drift_detected
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/drift/check")
async def trigger_drift_check(background_tasks: BackgroundTasks):
"""Trigger manuale del drift check."""
background_tasks.add_task(run_drift_check_job)
return {"status": "drift check started"}
@app.get("/health")
async def health():
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
Konfiguracja Prometeusza i Grafany
Konfigurowanie Prometheusa do skrobania metryk ML jest proste: dodaj plik usługę monitorowania jako docelową w pliku konfiguracyjnym.
# prometheus.yml
global:
scrape_interval: 60s
evaluation_interval: 60s
rule_files:
- "ml_drift_alerts.yml"
alerting:
alertmanagers:
- static_configs:
- targets: ["alertmanager:9093"]
scrape_configs:
- job_name: "ml-monitoring"
static_configs:
- targets: ["ml-monitoring-service:8000"]
metrics_path: "/metrics"
scrape_interval: 60s
- job_name: "model-serving"
static_configs:
- targets: ["fastapi-serving:8080"]
metrics_path: "/metrics"
---
# ml_drift_alerts.yml
groups:
- name: ml_drift_alerts
rules:
- alert: HighFeatureDrift
expr: ml_feature_drift_psi{} > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "High drift detected on feature {{ $labels.feature_name }}"
description: "PSI = {{ $value | humanize }} for feature {{ $labels.feature_name }}"
- alert: DatasetDriftDetected
expr: ml_dataset_drift_detected == 1
for: 10m
labels:
severity: critical
annotations:
summary: "Dataset-level drift detected for model {{ $labels.model_name }}"
description: "Model performance may be degraded. Consider retraining."
- alert: LowEstimatedAUC
expr: ml_estimated_auc < 0.70
for: 15m
labels:
severity: critical
annotations:
summary: "Estimated AUC dropped below threshold"
description: "Estimated AUC = {{ $value | humanize }} for model {{ $labels.model_name }}"
Pulpit nawigacyjny Grafana: kluczowe wskaźniki do monitorowania
- PSI dla funkcji: mapa cieplna z kolorowymi progami 0,1/0,2 (zielony/żółty/czerwony)
- Wynik dryfu w czasie: Wykres liniowy dla funkcji krytycznych
- Szacunkowe AUC (CBPE): szeregi czasowe z przedziałami ufności
- Liczba dryfowanych obiektów: miernik z progiem alarmowym
- Rozkład prognoz: histogram wyniku prawdopodobieństwa
- Opóźnienie i przepustowość: standardowy panel do monitorowania SLA
Automatyczny rurociąg przekwalifikowania
Wykrywanie dryfu jest konieczne, ale niewystarczające: należy także reagować automatycznie. Automatyczny potok ponownego szkolenia musi zostać wywołany przez alerty dryfu, zatwierdź nowy model przed wymianą produkowanego modelu i zapewnienie wycofania w razie potrzeby regresja wydajności.
# retraining_pipeline.py
# Pipeline di retraining automatico con MLflow
import mlflow
import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from datetime import datetime
import logging
import requests
logger = logging.getLogger(__name__)
MLFLOW_TRACKING_URI = "http://mlflow-server:5000"
MODEL_NAME = "churn-prediction"
MIN_AUC_THRESHOLD = 0.72 # AUC minima per promuovere in produzione
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
def load_fresh_training_data() -> pd.DataFrame:
"""Carica dati aggiornati per il retraining."""
# In produzione: query al feature store o data warehouse
df = pd.read_parquet("data/training_data_fresh.parquet")
logger.info(f"Loaded {len(df)} training samples")
return df
def train_new_model(df: pd.DataFrame) -> tuple:
"""Addestra un nuovo modello con i dati freschi."""
feature_columns = [
"age", "tenure_months", "monthly_charges",
"total_charges", "num_support_tickets",
"contract_type_encoded", "payment_method_encoded"
]
target_column = "churned"
X = df[feature_columns]
y = df[target_column]
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
model = GradientBoostingClassifier(
n_estimators=200,
max_depth=4,
learning_rate=0.05,
subsample=0.8,
random_state=42
)
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_val)[:, 1]
y_pred = model.predict(X_val)
metrics = {
"auc": roc_auc_score(y_val, y_pred_proba),
"f1": f1_score(y_val, y_pred),
"precision": precision_score(y_val, y_pred),
"recall": recall_score(y_val, y_pred),
"val_samples": len(X_val)
}
return model, metrics, feature_columns
def register_and_promote_model(
model,
metrics: dict,
feature_columns: list,
trigger_reason: str
) -> bool:
"""Registra il modello in MLflow e promuovilo in produzione se supera la soglia."""
with mlflow.start_run(run_name=f"retrain_{datetime.utcnow().strftime('%Y%m%d_%H%M')}") as run:
# Log params
mlflow.log_param("trigger_reason", trigger_reason)
mlflow.log_param("training_timestamp", datetime.utcnow().isoformat())
mlflow.log_param("features", feature_columns)
# Log metrics
for metric_name, metric_value in metrics.items():
if isinstance(metric_value, (int, float)):
mlflow.log_metric(metric_name, metric_value)
# Log model
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name=MODEL_NAME
)
run_id = run.info.run_id
logger.info(f"Model registered with run_id={run_id}, AUC={metrics['auc']:.4f}")
# Promuovi in produzione se supera la soglia
if metrics["auc"] >= MIN_AUC_THRESHOLD:
client = mlflow.tracking.MlflowClient()
latest_version = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
client.transition_model_version_stage(
name=MODEL_NAME,
version=latest_version.version,
stage="Production",
archive_existing_versions=True
)
logger.info(f"Model v{latest_version.version} promoted to Production")
send_slack_notification(f"Model retrained and promoted. AUC={metrics['auc']:.4f}")
return True
else:
logger.warning(f"Model AUC {metrics['auc']:.4f} below threshold {MIN_AUC_THRESHOLD}. Not promoting.")
send_slack_notification(
f"Retraining completed but model below threshold. AUC={metrics['auc']:.4f}. Manual review needed.",
level="warning"
)
return False
def send_slack_notification(message: str, level: str = "info"):
"""Invia notifica Slack (o webhook generico)."""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
color = "#36a64f" if level == "info" else "#ff0000"
payload = {
"attachments": [{
"color": color,
"title": "MLOps Retraining Alert",
"text": message,
"footer": f"ML Platform | {datetime.utcnow().isoformat()}"
}]
}
try:
requests.post(webhook_url, json=payload, timeout=5)
except Exception as e:
logger.error(f"Failed to send Slack notification: {e}")
def run_retraining_pipeline(trigger_reason: str = "drift_detected"):
"""Entry point della pipeline di retraining."""
logger.info(f"Starting retraining pipeline. Trigger: {trigger_reason}")
df = load_fresh_training_data()
model, metrics, feature_columns = train_new_model(df)
promoted = register_and_promote_model(model, metrics, feature_columns, trigger_reason)
logger.info(f"Retraining pipeline completed. Promoted: {promoted}")
return promoted
if __name__ == "__main__":
run_retraining_pipeline(trigger_reason="manual_trigger")
Strategie wyzwalania przekwalifikowania
Określić Gdy przekwalifikowanie jest równie ważne jak Jak zrób to. Istnieją trzy główne strategie, każda ma zalety i ograniczenia:
Porównanie strategii przekwalifikowania
- Na podstawie harmonogramu (kalendarz): Stałe okresowe przekwalifikowanie (co tydzień, co miesiąc). Prosty do wdrożenia, ale nieefektywny: przekwalifikowuje się nawet wtedy, gdy nie jest potrzebny i mógłby niewystarczająco częste przekwalifikowanie się w okresach szybkiego dryfu.
- Oparte na wydajności: Ponowne szkolenie, gdy wskaźniki wydajności spadną poniżej progu. Wymaga szybkiego dostępu do prawdy. Idealny dla modeli z szybka pętla informacji zwrotnej (np. współczynnik klikalności, konwersja).
- Oparte na dryfie: Ponowne szkolenie w przypadku wykrycia dryfu statystycznego istotne pod względem cech lub przewidywań. Nie wymaga etykiet. Proaktywne podejście co zapobiega degradacji, zanim wpłynie to na wydajność. Ryzyko fałszywych alarmów.
- Hybryda (zalecana): Połącz wykrywanie dryfu jako główny wyzwalacz z walidacją wydajności jako bramą jakości przed awansem produkcja. Dodaje również okresowe przekwalifikowanie awaryjne.
Zakończ konfigurację za pomocą Docker Compose
W środowiskach programistycznych i przejściowych Docker Compose umożliwia uruchomienie całego stosu szybkie i powtarzalne monitorowanie.
# docker-compose.monitoring.yml
version: "3.8"
services:
# ML Monitoring Service (FastAPI + Evidently)
ml-monitoring:
build: ./monitoring_service
ports:
- "8001:8000"
environment:
- MLFLOW_TRACKING_URI=http://mlflow:5000
- REFERENCE_DATA_PATH=/data/reference.parquet
volumes:
- ./data:/data
- ./reports:/reports
depends_on:
- mlflow
# MLflow Tracking Server
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: >
mlflow server
--host 0.0.0.0
--port 5000
--backend-store-uri postgresql://mlflow:mlflow@postgres/mlflow
--default-artifact-root s3://mlflow-artifacts/
depends_on:
- postgres
# PostgreSQL per MLflow
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_USER=mlflow
- POSTGRES_PASSWORD=mlflow
- POSTGRES_DB=mlflow
volumes:
- postgres_data:/var/lib/postgresql/data
# Prometheus
prometheus:
image: prom/prometheus:v2.50.1
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- ./monitoring/alerts.yml:/etc/prometheus/alerts.yml
- prometheus_data:/prometheus
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
# Grafana
grafana:
image: grafana/grafana:10.3.3
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
- grafana_data:/var/lib/grafana
depends_on:
- prometheus
# Alertmanager
alertmanager:
image: prom/alertmanager:v0.27.0
ports:
- "9093:9093"
volumes:
- ./monitoring/alertmanager.yml:/etc/alertmanager/alertmanager.yml
volumes:
postgres_data:
prometheus_data:
grafana_data:
Budżet <5 tys. EUR/rok dla MŚP
Kompletny system wykrywania dryftu nie wymaga budżetu przedsiębiorstwa. Z podejściem open source i natywnie chmurowy, możliwe jest utrzymanie solidnego systemu przy minimalnych kosztach:
- Ewidentnie AI + NannyML: Oprogramowanie typu open source, bezpłatne
- MLflow (własny hosting): Oprogramowanie typu open source, tylko koszty infrastruktury
- Prometeusz + Grafana: Oprogramowanie typu open source, bezpłatne
- Obliczenia (VPS/chmura): ~50-100 EUR/miesiąc dla średniej maszyny wirtualnej (600-1200 EUR/rok)
- Pamięć zgodna z S3: ~20 EUR/miesiąc za 500 GB (240 EUR/rok)
- Szacunkowa suma: ~1000-2000 EUR/rok za pełny stos
Najlepsze praktyki w zakresie wykrywania dryfu w produkcji
Lista kontrolna produkcji
- Zdefiniuj statystyczną podstawę przed wdrożeniem: Uruchom wykrywanie dryfu względem siebie w zestawie walidacyjnym służącym do kalibracji progów. PSI > 0 na danych stacjonarna oznacza przekroczenie progu.
- Użyj odpowiednich okien czasowych: Nie porównuj całego ruchu historyczny z dniem dzisiejszym. Użyj przesuwanych okien (7/14/30 dni), aby uchwycić ostatni dryf.
- Nadaj priorytet funkcjom według ważności: Monitoruj bardziej agresywnie Funkcje o dużym wpływie SHAP. Nie wszystkie dryfy są równie krytyczne.
- Odróżnij dryf techniczny od dryfu semantycznego: Zmiana formatu pola (np. z ciągu znaków na liczbę) i błąd inżynieryjny, a nie dryf ML. Dodaj oddzielne kontrole jakości danych.
- Unikaj czujnego zmęczenia: Ustaw początkowo konserwatywne progi i udoskonala się z biegiem czasu. Zbyt wiele alertów prowadzi do ignorowania ich wszystkich.
- Rejestrowanie decyzji o przekwalifikowaniu: Każde przekwalifikowanie musi takie być wykreślone za pomocą MLflow, łącznie z przyczyną wyzwalacza, metrykami przed/po i promowana wersja modelu.
- Testowanie samego detektora: Okresowo sprawdzaj, czy system wykrywanie działa poprawnie z testowaniem wstrzykiwania danych (wstrzykiwanie syntetycznego dryfu i sprawdź, czy został wykryty).
Anty-wzorce, których należy unikać
- Automatyczne przekwalifikowanie bez bramek jakościowych: Nie promuj się tworzenie nowo wyszkolonego modelu bez sprawdzania wydajności. Ponowne uczenie się na zanieczyszczonych danych może pogorszyć model.
- Tylko wyjście monitorujące: Monitoruj tylko prognozy bez cechy wejściowe uniemożliwiają zdiagnozowanie przyczyny dryfu.
- Naprawiono progi dla wszystkich modeli: Każdy model ma czułość różni się od driftu. PSI > 0,2 może być katastrofalne dla krytycznego modelu i nieistotne dla modelu o niskim priorytecie.
- Ignoruj dryf koncepcji: Jeśli etykiety z opiniami nie są zbierane z modelu produkcyjnego nie da się bezpośrednio wykryć dryfu koncepcji. Zainwestuj w infrastrukturę pętli sprzężenia zwrotnego.
Wnioski i dalsze kroki
Sercem każdego dojrzałego MLOpsa jest automatyczny system wykrywania dryfu i ponownego szkolenia. Bez aktywnego monitorowania modele ML w produkcji po cichu ulegają degradacji, generując błędne decyzje, które mogą kosztować znacznie więcej niż koszt samego systemu monitorowania.
W tym przewodniku zbudowaliśmy kompletny system: od teoretycznego zrozumienia cztery rodzaje driftu, do praktycznego wdrożenia z Evidently AI do interaktywnych raportów, NannyML do szacowania wydajności bez etykiet i Alibi Detect do wykrywania zaawansowane wielowymiarowe. Wszystko zintegrowaliśmy z Prometheusem, Grafaną i rurociągiem automatyczne przekwalifikowanie za pomocą MLflow.
Następnym krokiem jest integracja tego systemu z serwującą FastAPI, którą widzieliśmy w poprzednim artykule oraz ze skalowaniem Kubernetesa, które zobaczymy w następnym. Z tymi komponentów, otrzymasz kompletny, nadający się do produkcji i łatwy w utrzymaniu system MLOps.
Seria MLOps trwa
- Poprzedni artykuł: Śledzenie eksperymentów za pomocą MLflow: kompletny przewodnik - nagrywać eksperymenty i porównywać modele
- Następny artykuł: Obsługa modeli: FastAPI + Uvicorn w produkcji - budować skalowalne interfejsy API wnioskowania
- Dalsze informacje: Skalowanie ML na Kubernetesie - koordynuj wdrażanie za pomocą KubeFlow i Seldon
- Powiązane serie: Zaawansowane głębokie uczenie się - monitorowanie złożonych modeli neuronowych







