Případová studie: MLOps ve výrobě – od nuly po kompletní potrubí
Pokryli jsme celou sérii MLOps: od úvodu k základním konceptům až po CI/CD potrubí s GitHub Actions and Docker, verzování pomocí DVC, sledování experimentů pomocí MLflow, detekce posunu, k poskytování s FastAPI, škálování na Kubernetes, A/B testování a řízení se zákonem o AI. Nyní je čas dát to všechno dohromady do skutečného, konkrétního případu.
V této případové studii vybudujeme systém od nuly předpověď churn pro fiktivní telekomunikační společnost (TelecomIT S.p.A.), která se řídí všemi principy MLOps naučil v seriálu. Začneme od obchodního problému, navrhneme end-to-end architekturu, implementujeme každou komponentu s fungujícím kódem Pythonu a uvidíme, jak monitorovat a udržet systém ve výrobě v průběhu času. Konečným výsledkem bude potrubí MLOps kompletní, reprodukovatelné a připravené pro reálné prostředí.
Co se naučíte
- Jak převést obchodní problém do end-to-end architektury MLOps
- Specifické funkce pro predikci odchodu (RFM, dočasné chování)
- Kompletní DVC potrubí: data, předzpracování, školení, hodnocení
- Sledování strukturovaného experimentu s MLflow a systematickým porovnáváním modelů
- Modelové servírování s FastAPI + Uvicorn kontejnerované a připravené pro Kubernetes
- Monitorování v reálném čase s Prometheus + Grafana: drift, latence, business KPI
- Statisticky přísné A/B testování mezi modely vyzyvatele a šampiona
- Kompletní správa: modelová karta, auditní záznam, kontrola poctivosti před nasazením
- Odhad skutečných nákladů a ROI projektu MLOps pro italský MSP
Obchodní problém: Chun in Telecom Services
TelecomIT S.p.A. a telekomunikační operátor s 2,1 miliony aktivních zákazníků. Měsíční míra odchodu zákazníků je 2,3 %, což znamená, že každý měsíc přijde přibližně 48 000 zákazníků. Průměrné náklady na získání nového zákazníka jsou 180 EUR, přičemž náklady na udržení stávajícího zákazníka a 35 EUR. Identifikujte zákazníky, kterým hrozí odchod Před že odejdou, je jednou z nejmocnějších dostupných ekonomických pák.
Ekonomická hodnota projektu
S modelem, který správně identifikuje 70 % rizikových churnerů (stažení = 0,70) a který má přesnost 65 % (35 % falešně pozitivních, zákazníci kontaktovali zbytečně), na 48 000 měsíčních uživateli:
- Identifikovaní churners: 48 000 × 0,70 = 33 600
- Náklady na udržovací kampaň: 33 600 / 0,65 × 35 EUR = ~1,8 mil. EUR/měsíc
- Vyvarované akviziční úspory: 33 600 × 0,40 × 180 EUR = ~2,4 milionu EUR/měsíc
- Odhadovaná čistá návratnost investic: +600 tis. EUR/měsíc nebo 7,2 mil. EUR/rok
Náklady na infrastrukturu MLOps pro tento plynovod: přibližně 3 500 EUR/rok v cloudu.
Technické a obchodní požadavky
Před psaním kódu definujeme konkrétní požadavky, kterými se bude řídit každá architektonická volba. Tyto požadavky vyplývají z rozhovorů s obchodním týmem az technického kontextu existující.
| Požadavek | Specifikace | Omezení |
|---|---|---|
| Předpovědní frekvence | Měsíční dávka, bodování u všech aktivních zákazníků | Do 1. dne v měsíci pro CRM kampaň |
| Latence hromadného bodování | 2,1 milionu zákazníků za méně než 4 hodiny | Časový úsek CRM: 02:00-06:00 |
| Latence v reálném čase | Bodování jednoho zákazníka pro agenty call centra | < 200 ms p99 |
| Cílová metrika | AUC-ROC ≥ 0,80, Recall ≥ 0,65 | Ověřeno na měsíční výdrž |
| Rekvalifikace | Měsíčně, automaticky na nová data | Spustit také při detekci posunu |
| Vládnutí | Karta modelu, audit trail, kontrola poctivosti | AI Act: omezené riziko (telekomunikační sektor) |
| Hromady | 100% open-source, on-premise + hybridní cloud | Rozpočet infrastruktury: < 5 000 EUR/rok |
End-to-End architektura
Architektura systému integruje všechny nástroje řady MLOps do soudržného toku. Každá komponenta má přesnou roli a komunikuje s ostatními prostřednictvím dobře definovaných rozhraní. Vůdčím principem je oddělení odpovědnosti: data, školení a živé poskytování v nezávislých vrstvách, které lze aktualizovat bez dopadu na ostatní.
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
Kompletní technologický zásobník
| Vrstvy | Nástroje | Verze | Náklady |
|---|---|---|---|
| Verze dat | DVC + MinIO (kompatibilní s S3) | DVC 3.x | Open source |
| Sledování experimentu | Samoobslužný MLflow | MLflow 2.x | Open source |
| Validace dat | Velká očekávání | GX 0,18 | Open source |
| CI/CD | Akce GitHubu | - | Volné úrovně |
| Kontejnerizace | Docker + Docker Compose | Docker 24.x | Open source |
| Servírování modelů | FastAPI + Uvicorn | FastAPI 0.110 | Open source |
| Orchestr | Kubernetes (k3s) | k3s 1,28 | Open source |
| Sledování | Prometheus + Grafana | Ples 2.47 | Open source |
| Detekce driftu | Evidentně AI | 0,4.x | Open source |
| Governance/Fairness | Fairlearn + SHAP | FL 0,10 | Open source |
| ML Framework | XGBoost + LightGBM + scikit-learn | XGB 2.0 | Open source |
| Cloudová infrastruktura | Hetzner Cloud (VPS 2 jádra / 4 GB RAM) | - | ~360 EUR/rok |
Struktura a konfigurace úložiště
Dobře organizovaná struktura úložiště je základem každého projektu MLOps udržovatelný. Následující struktura jasně odděluje kód, konfiguraci, testování a dokumentace, dodržující zásady vysoké soudržnosti a nízké vazby.
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Pipeline CI/CD principale
│ ├── retrain-trigger.yml # Trigger retraining automatico
│ └── pr-validation.yml # Validazione PR
├── config/
│ ├── model_config.yaml # Iperparametri e configurazione modello
│ ├── feature_config.yaml # Feature set e preprocessing
│ └── serving_config.yaml # Configurazione FastAPI
├── data/
│ ├── raw/ # Dati grezzi (tracciati da DVC)
│ ├── processed/ # Dati preprocessati (tracciati da DVC)
│ └── features/ # Feature engineered (tracciati da DVC)
├── models/
│ └── registry/ # Modelli registrati (tracciati da DVC)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Estrazione dati da CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Pulizia e trasformazione
│ ├── features/
│ │ ├── rfm_features.py # Feature RFM (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Feature comportamentali
│ │ └── feature_store.py # Feature store locale
│ ├── training/
│ │ ├── train.py # Training loop con MLflow
│ │ ├── evaluate.py # Evaluation e confronto col production model
│ │ └── hyperopt_search.py # Tuning iperparametri
│ ├── serving/
│ │ ├── api.py # FastAPI app
│ │ ├── batch_scorer.py # Scoring batch mensile
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting e trigger retraining
│ └── governance/
│ ├── model_card.py # Generatore model card
│ ├── fairness_checker.py # Analisi fairness con Fairlearn
│ └── audit_logger.py # Audit trail immodificabile
├── tests/
│ ├── unit/ # Test unitari per ogni modulo
│ ├── integration/ # Test integrazione pipeline
│ └── smoke/ # Smoke test API post-deploy
├── dvc.yaml # Pipeline DVC stages
├── params.yaml # Parametri DVC
├── docker/
│ ├── Dockerfile.training # Immagine training
│ ├── Dockerfile.serving # Immagine serving
│ └── docker-compose.yml # Stack locale completo
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # Notebook EDA (non in pipeline)
Funkce pro předvídání odchodů
Funkce je často rozdílem mezi průměrným a vynikajícím modelem. Pro predikci odchodu v telekomunikačním sektoru pocházejí nejvíce prediktivní funkce chování zákazníků v průběhu času. Používáme framework RFM (Aktuálnost, Frekvence, Peněžní) jako výchozí bod, obohacený o funkce charakteristiky chování specifické pro telekomunikační doménu.
# rfm_features.py
# Feature engineering per churn prediction nel settore telecom
# Produce un dataset tabellare con 45 feature predittive
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calcola le feature RFM (Recency, Frequency, Monetary) per ogni cliente.
Args:
df_transactions: DataFrame con transazioni/ricariche
snapshot_date: Data di riferimento per il calcolo
customer_id_col: Nome colonna identificativo cliente
Returns:
DataFrame con una riga per cliente e le feature RFM
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Tenure in giorni (durata rapporto col cliente)
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Variazione dell'attivita (ultimi 30 gg vs media storica)
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend: rapporto attivita recente vs attivita media mensile
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Feature comportamentali da CDR (call detail records) e ticket di supporto.
Queste feature catturano segnali predittivi di churn specifici del dominio telecom.
"""
# ---- Feature CDR: comportamento nelle chiamate ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Feature supporto: interazioni negative ----
support_30d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_30d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Feature derivata: "frustration index" - proxy di insoddisfazione
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalizzato 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembla la matrice delle feature finale unendo tutte le sorgenti.
Returns:
(X, y) - feature matrix e label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
# Encoding categoriche
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
DVC Pipeline: End-to-End reprodukovatelnost
Potrubí DVC definuje každou fázi procesu jako směrovaný acyklický graf (DAG): každý stupeň deklaruje své vlastní vstupy (deps), výstupy (outs) a parametry. DVC sleduje závislostí a automaticky znovu spustí pouze fáze, které byly změnami zrušeny, a to přesně jako Make, ale pro kanály ML s daty a modely.
stages:
# Stage 1: Ingestion - estrae dati da CRM/CDR e salva snapshot
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks sui dati grezzi
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - pulizia e trasformazione
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost con MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - confronto challenger vs champion
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
Školení s MLflow: Kompletní sledování experimentů
Tréninkový modul integruje MLflow pro systematické sledování každého experimentu. Používáme XGBoost jako hlavní algoritmus díky své vynikající vyváženosti mezi výkonem a interpretovatelností v tabulkových datech s mechanismem včasného zastavení a vyhledávání hyperparametrů přes Hyperopt.
# train.py
# Training pipeline per churn prediction con MLflow tracking
# Eseguito dalla pipeline DVC: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str = "config/model_config.yaml") -> dict:
"""Carica configurazione modello da file YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Carica feature matrix e prepara train/val/test split."""
df = pd.read_parquet(features_path)
# Leggi i parametri DVC
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
# Stratified split per preservare il bilanciamento delle classi
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
params: dict
) -> xgb.XGBClassifier:
"""Addestra un modello XGBoost con early stopping."""
model = xgb.XGBClassifier(
n_estimators=params.get("n_estimators", 500),
max_depth=params.get("max_depth", 6),
learning_rate=params.get("learning_rate", 0.05),
subsample=params.get("subsample", 0.8),
colsample_bytree=params.get("colsample_bytree", 0.8),
scale_pos_weight=params.get("scale_pos_weight", 4.0), # Bilancia classi sbilanciate
min_child_weight=params.get("min_child_weight", 5),
reg_alpha=params.get("reg_alpha", 0.1),
reg_lambda=params.get("reg_lambda", 1.0),
use_label_encoder=False,
eval_metric="auc",
random_state=42,
n_jobs=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=100
)
return model
def compute_metrics(
model: xgb.XGBClassifier,
X: pd.DataFrame,
y: pd.Series,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calcola metriche complete di valutazione."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def hyperopt_search(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
max_evals: int = 30
) -> dict:
"""Ricerca iperparametri con Hyperopt (Tree-structured Parzen Estimator)."""
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
"min_child_weight": hp.choice("min_child_weight", [3, 5, 7, 10]),
"reg_alpha": hp.loguniform("reg_alpha", np.log(0.01), np.log(1.0)),
}
def objective(params):
model = train_xgboost(X_train, y_train, X_val, y_val, params)
metrics = compute_metrics(model, X_val, y_val)
# Ottimizziamo su F1 per bilanciare precision e recall
return {"loss": -metrics["f1"], "status": STATUS_OK}
trials = Trials()
best = fmin(objective, space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best
def run_training_pipeline():
"""Pipeline di training principale con MLflow tracking completo."""
config = load_config()
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
# Configura MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
# Log parametri di sistema
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
"hyperopt_evals": config.get("hyperopt_evals", 30),
})
# Ricerca iperparametri
logger.info("Avvio ricerca iperparametri con Hyperopt...")
best_params = hyperopt_search(X_train, y_train, X_val, y_val,
max_evals=config.get("hyperopt_evals", 30))
mlflow.log_params(best_params)
# Training finale con best params
logger.info("Training modello finale con migliori iperparametri...")
model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
# Calcola e log metriche su tutti i set
val_metrics = compute_metrics(model, X_val, y_val)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"val_{k}": v for k, v in val_metrics.items()})
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
logger.info(f"Test AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Test Recall: {test_metrics['recall']:.4f}")
logger.info(f"Test F1: {test_metrics['f1']:.4f}")
# Log modello in MLflow model registry
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
# Salva metriche per DVC
metrics_output = {
"auc_roc": test_metrics["auc_roc"],
"recall": test_metrics["recall"],
"precision": test_metrics["precision"],
"f1": test_metrics["f1"],
"run_id": run.info.run_id,
}
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump(metrics_output, f, indent=2)
# Salva modello per serving
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
logger.info(f"Training completato. MLflow run ID: {run.info.run_id}")
if __name__ == "__main__":
run_training_pipeline()
Model Serving: FastAPI pro Real-Time a Batch
Servírovací systém podporuje dva režimy: v reálném čase pro dotazy agentů call centra (jeden zákazník, <200 ms) e šarže pro měsíční skóring celé zákaznické databáze (2,1 milionu za <4 hodiny). Oba režimy používají stejný model a logiku předběžného zpracování k zajištění konzistence.
# api.py
# FastAPI serving per churn prediction - real-time e batch
# Espone /predict (singolo), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
from pathlib import Path
import mlflow
import pandas as pd
import numpy as np
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST
)
from starlette.responses import Response
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total",
"Numero totale di predizioni",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds",
"Durata predizione in secondi",
["mode"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROBABILITY_GAUGE = Gauge(
"churn_prediction_avg_probability",
"Probabilità media di churn nelle ultime predizioni"
)
MODEL_VERSION_INFO = Gauge(
"model_version_info",
"Informazioni sulla versione del modello caricato",
["version", "run_id"]
)
# ---- Schema Pydantic ----
class CustomerFeatures(BaseModel):
"""Feature di un singolo cliente per la predizione real-time."""
customer_id: str = Field(..., description="ID univoco del cliente")
recency_days: float = Field(..., ge=0, le=3650, description="Giorni dall'ultima transazione")
frequency: int = Field(..., ge=0, description="Numero di transazioni storiche")
monetary_total: float = Field(..., ge=0, description="Valore monetario totale EUR")
monetary_avg: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
contract_type_monthly: bool = Field(False)
contract_type_annual: bool = Field(False)
payment_method_auto: bool = Field(False)
age_group_18_35: bool = Field(False)
age_group_36_55: bool = Field(False)
@validator("monetary_total")
def monetary_must_be_reasonable(cls, v):
if v > 1_000_000:
raise ValueError("Valore monetario non plausibile (> 1M EUR)")
return v
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str # "high", "medium", "low"
model_version: str
class BatchPredictionRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
# ---- Global Model State ----
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
logger.info("Caricamento modello da MLflow registry...")
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
try:
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
model_state["run_id"] = os.getenv("MLFLOW_RUN_ID", "unknown")
MODEL_VERSION_INFO.labels(
version=model_state["version"],
run_id=model_state["run_id"]
).set(1)
logger.info(f"Modello caricato: {model_name} @ {model_alias} v{model_state['version']}")
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise
yield # Server in esecuzione
logger.info("Shutdown: deallocazione modello...")
model_state.clear()
# ---- FastAPI App ----
app = FastAPI(
title="TelecomIT Churn Prediction API",
description="API per predizione churn clienti - TelecomIT S.p.A.",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://crm.telecomit.internal"],
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
def features_to_dataframe(customer: CustomerFeatures) -> pd.DataFrame:
"""Converte un oggetto CustomerFeatures in DataFrame per il modello."""
data = customer.dict(exclude={"customer_id"})
return pd.DataFrame([data])
def get_confidence(probability: float) -> str:
"""Classifica la confidenza della predizione."""
if probability >= 0.75 or probability <= 0.25:
return "high"
elif probability >= 0.60 or probability <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health_check():
"""Health check per Kubernetes liveness/readiness probe."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non caricato")
return {
"status": "healthy",
"model_version": model_state.get("version", "unknown"),
"model_loaded": True
}
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Predizione real-time per singolo cliente (<200ms p99)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
try:
X = features_to_dataframe(customer)
proba = float(model_state["model"].predict_proba(X)[0, 1])
prediction = proba >= 0.5
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
CHURN_PROBABILITY_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=bool(prediction),
confidence=get_confidence(proba),
model_version=model_state.get("version", "unknown")
)
except Exception as e:
logger.error(f"Errore predizione per customer {customer.customer_id}: {e}")
raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}")
finally:
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="realtime").observe(duration)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch - ottimizzata per alti volumi (2.1M clienti/notte)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
n_customers = len(request.customers)
try:
# Converti in DataFrame vettorizzato (molto più efficiente del loop)
rows = [c.dict(exclude={"customer_id"}) for c in request.customers]
customer_ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
predictions = (probas >= request.threshold).astype(bool)
results = [
{
"customer_id": cid,
"churn_probability": round(float(p), 4),
"churn_prediction": bool(pred),
"confidence": get_confidence(float(p))
}
for cid, p, pred in zip(customer_ids, probas, predictions)
]
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(n_customers)
CHURN_PROBABILITY_GAUGE.set(float(probas.mean()))
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="batch").observe(duration)
return {
"results": results,
"total_customers": n_customers,
"churn_count": int(predictions.sum()),
"churn_rate": round(float(predictions.mean()), 4),
"processing_time_seconds": round(duration, 3),
"model_version": model_state.get("version", "unknown")
}
except Exception as e:
logger.error(f"Errore batch prediction: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)
Monitorování: Detekce posunu a upozornění
Monitorování není omezeno na metriky infrastruktury (latence, uptime): musí zahrnovat kvalitu předpovědí v čase. Používáme Evidentně AI pro detekce posunu e Prometheus + Grafana pro sledování v reálném čase. Důležitou součástí je automatické přeškolení, když model degraduje.
# drift_detector.py
# Rilevamento drift su dati e predizioni con Evidently AI
# Eseguito ogni settimana via cron job o GitHub Actions scheduled
import json
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesSummary,
ColumnDriftMetric
)
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitora il drift nei dati e nelle predizioni del modello churn.
Confronta la distribuzione dei dati di produzione con il reference dataset
(i dati su cui il modello e stato addestrato).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
alert_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.alert_webhook = alert_webhook
def analyze_drift(
self,
current_df: pd.DataFrame,
analysis_date: datetime = None
) -> dict:
"""
Esegue analisi completa di drift sul dataset corrente.
Args:
current_df: Dati di produzione del periodo corrente
analysis_date: Data di riferimento dell'analisi
Returns:
dict con risultati drift e flag retraining_required
"""
analysis_date = analysis_date or datetime.utcnow()
# ---- Report Evidently ----
report = Report(metrics=[
DatasetDriftMetric(),
DatasetMissingValuesSummary(),
# Monitoriamo le feature più predittive
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(
reference_data=self.reference_df,
current_data=current_df
)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
dataset_drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
n_drifted_features = drift_results["number_of_drifted_columns"]
# ---- Analisi predizioni (churn rate trend) ----
ref_churn_rate = self.reference_df["churned"].mean() if "churned" in self.reference_df.columns else None
curr_churn_rate = current_df["churn_prediction"].mean() if "churn_prediction" in current_df.columns else None
churn_rate_drift = None
if ref_churn_rate and curr_churn_rate:
churn_rate_drift = abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
# ---- Decision logic per retraining ----
retraining_required = (
dataset_drift_detected and drift_share > self.drift_threshold
) or (
churn_rate_drift is not None and churn_rate_drift > 0.30
)
results = {
"analysis_date": analysis_date.isoformat(),
"dataset_drift_detected": bool(dataset_drift_detected),
"drift_share": float(drift_share),
"n_drifted_features": int(n_drifted_features),
"drift_threshold": self.drift_threshold,
"reference_churn_rate": float(ref_churn_rate) if ref_churn_rate else None,
"current_churn_rate": float(curr_churn_rate) if curr_churn_rate else None,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": self._compute_severity(drift_share, churn_rate_drift)
}
# Salva report HTML
report_path = f"monitoring/drift_report_{analysis_date.strftime('%Y%m%d')}.html"
Path("monitoring").mkdir(exist_ok=True)
report.save_html(report_path)
logger.info(f"Report drift salvato: {report_path}")
if retraining_required:
self._trigger_retraining_alert(results)
return results
def _compute_severity(self, drift_share: float, churn_rate_drift: Optional[float]) -> str:
if drift_share > 0.5 or (churn_rate_drift and churn_rate_drift > 0.5):
return "critical"
elif drift_share > 0.3 or (churn_rate_drift and churn_rate_drift > 0.3):
return "high"
elif drift_share > self.drift_threshold:
return "medium"
return "low"
def _trigger_retraining_alert(self, results: dict):
"""Notifica il team e trigghera il retraining via GitHub Actions dispatch."""
logger.warning(f"DRIFT CRITICO rilevato - Retraining richiesto! {results}")
if self.alert_webhook:
import requests
payload = {
"event_type": "retrain-trigger",
"client_payload": {
"reason": "drift_detected",
"drift_share": results["drift_share"],
"analysis_date": results["analysis_date"]
}
}
resp = requests.post(
self.alert_webhook,
json=payload,
headers={"Authorization": f"token ${GITHUB_TOKEN}"}
)
logger.info(f"GitHub Actions dispatch: HTTP {resp.status_code}")
CI/CD kanál: Akce GitHubu dokončeny
Kanál akcí GitHub integruje všechny komponenty: od ověřování dat po školení, od hodnocení po nasazení na Kubernetes. Pracovní postup je navržen rychle selhat blokováním nasazení, pokud tomu tak není překročit stanovené prahové hodnoty.
name: ML Training Pipeline - TelecomIT Churn
on:
schedule:
- cron: '0 3 1 * *' # Ogni 1° del mese alle 03:00 UTC
workflow_dispatch:
inputs:
reason:
description: 'Motivo del retraining manuale'
required: false
default: 'manual'
force_deploy:
description: 'Forza deployment anche se metriche non migliorano'
type: boolean
default: false
repository_dispatch:
types: [retrain-trigger]
env:
PYTHON_VERSION: "3.11"
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
DVC_REMOTE_URL: ${{ secrets.DVC_REMOTE_URL }}
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-serving
jobs:
# ---- JOB 1: Data Validation ----
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
- name: Pull latest data
run: dvc pull data/raw/
- name: Run data validation
run: dvc repro data_validation
id: validation
- name: Check validation passed
run: |
RESULT=$(cat data/raw/validation_summary.json | python -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d['all_checks_passed'] else 1)")
echo "Validation result: $RESULT"
- name: Upload validation report
uses: actions/upload-artifact@v4
with:
name: validation-report
path: data/raw/validation_report.html
# ---- JOB 2: Training ----
training:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC and pull features
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
dvc pull data/features/
- name: Run preprocessing + feature engineering
run: dvc repro preprocessing feature_engineering
- name: Run training
run: dvc repro training
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
- name: Push artifacts to DVC remote
run: dvc push models/
- name: Output metrics
id: metrics
run: |
AUC=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['auc_roc'])")
RECALL=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['recall'])")
echo "auc_roc=$AUC" >> $GITHUB_OUTPUT
echo "recall=$RECALL" >> $GITHUB_OUTPUT
echo "Training completato - AUC: $AUC, Recall: $RECALL"
outputs:
auc_roc: ${{ steps.metrics.outputs.auc_roc }}
recall: ${{ steps.metrics.outputs.recall }}
# ---- JOB 3: Evaluation Gate ----
evaluation-gate:
runs-on: ubuntu-latest
needs: training
steps:
- name: Check metrics threshold
run: |
AUC=${{ needs.training.outputs.auc_roc }}
RECALL=${{ needs.training.outputs.recall }}
MIN_AUC=0.80
MIN_RECALL=0.65
echo "AUC: $AUC (min: $MIN_AUC)"
echo "Recall: $RECALL (min: $MIN_RECALL)"
python -c "
auc = float('$AUC')
recall = float('$RECALL')
if auc < $MIN_AUC:
print(f'FAIL: AUC {auc:.4f} sotto soglia {$MIN_AUC}')
exit(1)
if recall < $MIN_RECALL:
print(f'FAIL: Recall {recall:.4f} sotto soglia {$MIN_RECALL}')
exit(1)
print('PASS: Metriche sopra soglia - deployment approvato')
"
# ---- JOB 4: Build & Push Docker ----
build-push:
runs-on: ubuntu-latest
needs: evaluation-gate
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
file: docker/Dockerfile.serving
push: true
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ---- JOB 5: Deploy to Kubernetes ----
deploy:
runs-on: ubuntu-latest
needs: build-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes (rolling update)
run: |
kubectl set image deployment/churn-serving \
churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n ml-production
kubectl rollout status deployment/churn-serving -n ml-production --timeout=300s
- name: Run smoke tests
run: |
python tests/smoke/test_api_smoke.py \
--endpoint ${{ secrets.API_ENDPOINT }}
- name: Promote model to champion in MLflow
run: |
python -c "
import mlflow
client = mlflow.MlflowClient('${{ env.MLFLOW_TRACKING_URI }}')
# Recupera l'ultima versione Staging
versions = client.get_latest_versions('telecomit-churn-xgb', stages=['Staging'])
if versions:
client.transition_model_version_stage(
name='telecomit-churn-xgb',
version=versions[0].version,
stage='Production',
archive_existing_versions=True
)
print(f'Modello v{versions[0].version} promosso a Production')
"
Výsledky, metriky a návratnost investic
Po 6 měsících provozu ve výrobě, TelecomIT systém predikce churn přineslo měřitelné a zdokumentované výsledky. Následující údaje jsou reprezentativní scénářů reálného světa ve středně velkých telekomunikačních společnostech.
| Metrický | Cíl | Výsledek M1 | Výsledek M6 | Trendy |
|---|---|---|---|---|
| AUC-ROC | ≥ 0,80 | 0,823 | 0,841 | + |
| Odvolat (odvolat) | ≥ 0,65 | 0,703 | 0,718 | + |
| Přesnost (chun) | ≥ 0,60 | 0,634 | 0,672 | + |
| p99 latence v reálném čase | < 200 ms | 87 ms | 82 ms | + |
| Dávková propustnost | 2,1 M za <4h | 2,1M za 2h 47m | 2,1M za 2h 31m | + |
| Uptime API | ≥ 99,9 % | 99,94 % | 99,97 % | + |
| Události upozornění na posun | - | 2 | 1 (M3, opraveno) | + |
Skutečné náklady na infrastrukturu
| Komponent | Měsíční náklady | Roční náklady | Poznámky |
|---|---|---|---|
| VPS Hetzner (2 jádra, 4GB) - obsluhující | 5,83 EUR | 70 EUR | FastAPI + k3s |
| VPS Hetzner (4 jádra, 8GB) - MLflow + MinIO | 15,90 EUR | 191 EUR | Sledování + úložiště |
| VPS Hetzner (2 jádra, 4GB) - monitoring | 5,83 EUR | 70 EUR | Prometheus + Grafana |
| Akce GitHub (bezplatná úroveň) | 0 EUR | 0 EUR | 2000 min/měsíc zdarma |
| Záložní úložiště (objekt Hetzner) | 2,50 EUR | 30 EUR | Vzdálené úložiště DVC |
| Celkový | 30,06 EUR | 361 EUR | Hodně pod 5 000 EUR/rok |
ROI projektu MLOps
Celkové náklady za první rok (vývoj + infrastruktura): ~45 000 EUR (7 měsíců vývojář + infrastruktura). Odhadované roční čisté úspory: 7,2 mil. EUR. ROI za 12 měsíců: ~158x. Doba návratnosti byla 3 týdny.
Poučení a anti-vzorce, kterým je třeba se vyhnout
Anti-Pattern 1: „Nejdřív modelujeme, později MLO“
60 % týmů začíná se šablonou a infrastrukturu přidává až ve chvíli, kdy jde do výroby. Tento přístup zdvojnásobuje náklady na refaktoring. Řešení: konfigurovat DVC, MLflow a minimální kanál CI/CD od 1. dne, i se zástupnými symboly. Mezní náklady a nízké, výhoda je obrovská.
Anti-Pattern 2: Pouze monitorování infrastruktury
Sledování pouze latence a doby provozuschopnosti systému ML nestačí. Model může být technicky „zdravé“ (odpovídá za 50 ms, žádné chyby HTTP 500), ale vytváří předpovědi degradován v důsledku datového driftu. Řešení: vždy sledovat distribuci vstupů a výstupů modelu, nejen systémových metrik.
Anti-Vzor 3: Přeškolení naslepo
Ne všechny rekvalifikace vylepšují model. Pokud je spoušť špatně zkalibrovaná, riskujete skončit s horším modelem ve výrobě. Řešení: každý rekvalifikace musí projít hodnotící bránou s explicitním srovnáním mezi vyzyvatelem a šampionem před nasazením. Automatické vrácení musí být vždy k dispozici.
Konečný kontrolní seznam před nasazením
## Checklist Deploy Modello ML - TelecomIT Churn
### qualità del Modello
- [ ] AUC-ROC test set >= 0.80
- [ ] Recall test set >= 0.65
- [ ] Performance >= champion model attuale
- [ ] Nessuna feature con importanza anomala (segnale data leakage)
- [ ] Test su holdout temporale (dati fuori dal training period)
### Governance
- [ ] Model card generata e approvata da ML Lead
- [ ] Fairness check eseguito (demographic parity < 0.10)
- [ ] SHAP analysis disponibile per top-10 feature
- [ ] Audit trail registrato in MLflow con annotazioni
### Infrastruttura
- [ ] Dockerfile building senza warning di sicurezza
- [ ] Health check /health risponde 200
- [ ] Smoke test /predict con payload reale
- [ ] Latenza p99 < 200ms verificata con k6 o locust
- [ ] Rollback plan documentato e testato
### Monitoring
- [ ] Dashboard Grafana aggiornata con versione modello
- [ ] Alert Prometheus configurati per nuova versione
- [ ] Reference dataset Evidently aggiornato
- [ ] On-call aggiornato sul deploy
### Documenti
- [ ] JIRA ticket di deployment chiuso con evidence
- [ ] Post-deploy review schedulata a T+7 giorni
Závěry: MLOps není projekt, je to praxe
V této případové studii jsme vybudovali kompletní systém MLOps pro predikci odchodu, od obchodního problému až po nasazení na Kubernetes s monitorováním, detekce posunu a řízení. Systém splňuje všechny požadavky: latence <200ms v reálném čase, 2,1 milionu zákazníků za méně než 3 hodiny v dávce, AUC-ROC 0,84, náklady infrastruktura 361 EUR/rok.
Klíčové poselství seriálu je toto: MLOps není technologie, a disciplína. Neexistuje žádný nástroj, který by po instalaci vše vyřešil problémy. A soubor postupů, procesů a kultury, které umožňují modely ML aby časem spolehlivě fungovaly. Kód, který jsme napsali v tomto série je výchozím bodem: skutečná práce je v pozorování, měření, učení a neustále se zlepšovat.
Zdroje pro pokračování
- Tato kompletní série: MLOps: Od experimentu k produkci → CI/CD potrubí → Verze DVC → MLflow → Detekce driftu → Služba FastAPI → Kubernetes → A/B testování → Vládnutí
- Pokročilé hluboké učení: Pokročilá řada hlubokého učení - LoRA, kvantování, hranová AI
- AI inženýrství: AI inženýrská řada - RAG, vektorová databáze, LangChain
- Úložiště GitHub: Kompletní kód pro tuto případovou studii je k dispozici na github.com/federicocalo/telecomit-churn-mlops (struktura, nikoli skutečná data)
Trh MLOps poroste ze 4,38 miliardy dolarů v roce 2026 na 89,18 miliardy dolarů v roce 2035 (CAGR 39,8 %). Dovednosti, které jste získali v této sérii, vás staví do vynikající pozice řešit tento růst. Šťastní MLOps.







