Studiu de caz: MLOps în producție - De la zero la conductă completă
Am acoperit întreaga serie MLOps: de la introducere la conceptele de bază, până la conductele CI/CD cu GitHub Actions și Docker, versiunea cu DVC, urmărirea experimentului cu MLflow, detectarea derivei, la servirea cu FastAPI, scalarea pe Kubernetes, testarea A/B și guvernanța cu AI Act. Acum este momentul să punem totul laolaltă într-un caz real, concret.
În acest studiu de caz vom construi un sistem de la zero predicție de renuntare pentru o companie fictivă de telecomunicații (TelecomIT S.p.A.) care urmează toate principiile MLOps învăţat în serie. Vom începe de la problema afacerii, vom proiecta arhitectura end-to-end, vom implementa fiecare componentă cu cod Python funcțional și vom vedea cum să monitorizăm și menține sistemul în producție în timp. Rezultatul final va fi o conductă MLOps complet, reproductibil și pregătit pentru medii reale.
Ce vei învăța
- Cum să traduceți o problemă de afaceri în arhitectura MLOps end-to-end
- Inginerie de caracteristici specifice pentru predicția churn (RFM, comportament temporal)
- Conductă DVC completă: date, preprocesare, instruire, evaluare
- Urmărire structurată a experimentelor cu MLflow și comparare sistematică a modelelor
- Model care servește cu FastAPI + Uvicorn containerizat și gata pentru Kubernetes
- Monitorizare în timp real cu Prometheus + Grafana: deriva, latență, KPI de afaceri
- Testare A/B riguroasă din punct de vedere statistic între modelele challenger și campion
- Guvernare completă: card model, pistă de audit, verificare a corectitudinii înainte de implementare
- Estimarea costurilor reale și a rentabilității investiției proiectului MLOps pentru un IMM italian
Problema afacerii: ratarea serviciilor de telecomunicații
TelecomIT S.p.A. și un operator de telecomunicații cu 2,1 milioane de clienți activi. Rata lunară de abandon este de 2,3%, ceea ce înseamnă aproximativ 48.000 de clienți pierduți în fiecare lună. Costul mediu de achiziție a unui nou client este de 180 EUR, în timp ce costul de reținerea unui client existent și 35 EUR. Identificați clienții expuși riscului de abandon Înainte că pleacă este una dintre cele mai puternice pârghii economice disponibile.
Valoarea economică a proiectului
Cu un model care identifică corect 70% dintre cei expuși riscului (rechemare = 0,70) și care are o precizie de 65% (35% fals pozitive, clienți contactați inutil), pe 48.000 de turnatoare lunare:
- Tocitorii identificați: 48.000 × 0,70 = 33.600
- Costul campaniei de păstrare: 33.600 / 0,65 × 35 EUR = ~1,8 milioane EUR/lună
- Economii de achiziție evitate: 33.600 × 0,40 × 180 EUR = ~2,4 milioane EUR/lună
- Rentabilitatea netă estimată a investiției: +600K EUR/lună sau 7,2M EUR/an
Costul infrastructurii MLOps pentru această conductă: aproximativ 3.500 EUR/an pe cloud.
Cerințe tehnice și de afaceri
Înainte de a scrie cod, definim cerințele concrete care vor ghida fiecare alegere arhitecturală. Aceste cerințe apar din conversațiile cu echipa de afaceri și din contextul tehnic existente.
| Cerinţă | Caietul de sarcini | Constrângere |
|---|---|---|
| Frecvența de predicție | Lot lunar, punctaj pe toți clienții activi | Până la data de 1 a lunii pentru campania CRM |
| Latența de punctare a lotului | 2,1 milioane de clienți în mai puțin de 4 ore | Ora slot CRM: 02:00-06:00 |
| Latență în timp real | Punctajul unui singur client pentru agenții call center | < 200 ms p99 |
| Valoare țintă | AUC-ROC ≥ 0,80, Recall ≥ 0,65 | Validat pe setul de reținere lunară |
| Recalificare | Lunar, automat pe date noi | Declanșează și la detectarea derivei |
| Guvernare | Fișă de model, pistă de audit, verificare a corectitudinii | Actul AI: risc limitat (sectorul telecomunicațiilor) |
| Stive | 100% open-source, on-premise + cloud hibrid | Bugetul infrastructurii: < 5.000 EUR/an |
Arhitectură end-to-end
Arhitectura sistemului integrează toate instrumentele din seria MLOps într-un flux coeziv. Fiecare componentă are un rol precis și comunică cu celelalte prin interfețe bine definite. Principiul călăuzitor este separarea responsabilitatilor: datele, antrenamentul și difuzarea live în straturi independente care pot fi actualizate fără a-i afecta pe celelalte.
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
Tehnologia completă
| Straturi | Instrumente | Versiune | Cost |
|---|---|---|---|
| Versiunea datelor | DVC + MinIO (compatibil S3) | DVC 3.x | Sursă deschisă |
| Urmărirea experimentului | MLflow auto-găzduit | MLflow 2.x | Sursă deschisă |
| Validarea datelor | Mari așteptări | GX 0,18 | Sursă deschisă |
| CI/CD | Acțiuni GitHub | - | Niveluri gratuite |
| Containerizarea | Docker + Docker Compose | Docker 24.x | Sursă deschisă |
| Servire model | FastAPI + Uvicorn | FastAPI 0.110 | Sursă deschisă |
| Orchestrație | Kubernetes (k3s) | k3s 1,28 | Sursă deschisă |
| Monitorizare | Prometeu + Grafana | Balul 2.47 | Sursă deschisă |
| Detectarea derivei | Evident AI | 0,4.x | Sursă deschisă |
| Guvernare/Echitate | Fairlearn + SHAP | FL 0,10 | Sursă deschisă |
| Cadrul ML | XGBoost + LightGBM + scikit-learn | XGB 2.0 | Sursă deschisă |
| Infrastructura cloud | Hetzner Cloud (VPS 2 nuclee / 4 GB RAM) | - | ~360 EUR/an |
Structura și configurația depozitului
O structură de depozit bine organizată este fundamentul fiecărui proiect MLOps întreținută. Următoarea structură separă în mod clar codul, configurația, testarea și documentare, urmând principiile coeziunii ridicate și cuplajului scăzut.
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Pipeline CI/CD principale
│ ├── retrain-trigger.yml # Trigger retraining automatico
│ └── pr-validation.yml # Validazione PR
├── config/
│ ├── model_config.yaml # Iperparametri e configurazione modello
│ ├── feature_config.yaml # Feature set e preprocessing
│ └── serving_config.yaml # Configurazione FastAPI
├── data/
│ ├── raw/ # Dati grezzi (tracciati da DVC)
│ ├── processed/ # Dati preprocessati (tracciati da DVC)
│ └── features/ # Feature engineered (tracciati da DVC)
├── models/
│ └── registry/ # Modelli registrati (tracciati da DVC)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Estrazione dati da CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Pulizia e trasformazione
│ ├── features/
│ │ ├── rfm_features.py # Feature RFM (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Feature comportamentali
│ │ └── feature_store.py # Feature store locale
│ ├── training/
│ │ ├── train.py # Training loop con MLflow
│ │ ├── evaluate.py # Evaluation e confronto col production model
│ │ └── hyperopt_search.py # Tuning iperparametri
│ ├── serving/
│ │ ├── api.py # FastAPI app
│ │ ├── batch_scorer.py # Scoring batch mensile
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting e trigger retraining
│ └── governance/
│ ├── model_card.py # Generatore model card
│ ├── fairness_checker.py # Analisi fairness con Fairlearn
│ └── audit_logger.py # Audit trail immodificabile
├── tests/
│ ├── unit/ # Test unitari per ogni modulo
│ ├── integration/ # Test integrazione pipeline
│ └── smoke/ # Smoke test API post-deploy
├── dvc.yaml # Pipeline DVC stages
├── params.yaml # Parametri DVC
├── docker/
│ ├── Dockerfile.training # Immagine training
│ ├── Dockerfile.serving # Immagine serving
│ └── docker-compose.yml # Stack locale completo
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # Notebook EDA (non in pipeline)
Inginerie caracteristică pentru Predicția abandonului
Ingineria caracteristicilor este adesea diferența dintre un model mediocru și unul excelent. Pentru predicția abandonului în sectorul telecomunicațiilor, cele mai predictive caracteristici provin de la comportamentul clientului în timp. We use the framework RFM (Recență, Frecvență, Monetar) ca punct de plecare, îmbogățit cu caracteristici caracteristici comportamentale specifice domeniului telecom.
# rfm_features.py
# Feature engineering per churn prediction nel settore telecom
# Produce un dataset tabellare con 45 feature predittive
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calcola le feature RFM (Recency, Frequency, Monetary) per ogni cliente.
Args:
df_transactions: DataFrame con transazioni/ricariche
snapshot_date: Data di riferimento per il calcolo
customer_id_col: Nome colonna identificativo cliente
Returns:
DataFrame con una riga per cliente e le feature RFM
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Tenure in giorni (durata rapporto col cliente)
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Variazione dell'attivita (ultimi 30 gg vs media storica)
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend: rapporto attivita recente vs attivita media mensile
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Feature comportamentali da CDR (call detail records) e ticket di supporto.
Queste feature catturano segnali predittivi di churn specifici del dominio telecom.
"""
# ---- Feature CDR: comportamento nelle chiamate ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Feature supporto: interazioni negative ----
support_30d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_30d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Feature derivata: "frustration index" - proxy di insoddisfazione
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalizzato 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembla la matrice delle feature finale unendo tutte le sorgenti.
Returns:
(X, y) - feature matrix e label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
# Encoding categoriche
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
DVC Pipeline: reproductibilitate de la capăt la capăt
Conducta DVC definește fiecare etapă a procesului ca un grafic aciclic direcționat (DAG): fiecare etapă își declară propriile intrări (dep), ieșiri (ieșiri) și parametri. DVC urmărește dependențe și reluează automat doar etapele invalidate de modificări, exact cum ar fi Make, dar pentru conducte ML cu date și modele.
stages:
# Stage 1: Ingestion - estrae dati da CRM/CDR e salva snapshot
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks sui dati grezzi
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - pulizia e trasformazione
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost con MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - confronto challenger vs champion
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
Antrenament cu MLflow: Urmărire completă a experimentului
Modulul de instruire integrează MLflow pentru urmărirea sistematică a fiecărui experiment. Noi folosim XGBoost ca algoritm principal datorită echilibrului său excelent între performanță și interpretabilitate în datele tabulare, cu un mecanism de oprire timpurie și căutare de hiperparametri prin Hyperopt.
# train.py
# Training pipeline per churn prediction con MLflow tracking
# Eseguito dalla pipeline DVC: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str = "config/model_config.yaml") -> dict:
"""Carica configurazione modello da file YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Carica feature matrix e prepara train/val/test split."""
df = pd.read_parquet(features_path)
# Leggi i parametri DVC
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
# Stratified split per preservare il bilanciamento delle classi
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
params: dict
) -> xgb.XGBClassifier:
"""Addestra un modello XGBoost con early stopping."""
model = xgb.XGBClassifier(
n_estimators=params.get("n_estimators", 500),
max_depth=params.get("max_depth", 6),
learning_rate=params.get("learning_rate", 0.05),
subsample=params.get("subsample", 0.8),
colsample_bytree=params.get("colsample_bytree", 0.8),
scale_pos_weight=params.get("scale_pos_weight", 4.0), # Bilancia classi sbilanciate
min_child_weight=params.get("min_child_weight", 5),
reg_alpha=params.get("reg_alpha", 0.1),
reg_lambda=params.get("reg_lambda", 1.0),
use_label_encoder=False,
eval_metric="auc",
random_state=42,
n_jobs=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=100
)
return model
def compute_metrics(
model: xgb.XGBClassifier,
X: pd.DataFrame,
y: pd.Series,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calcola metriche complete di valutazione."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def hyperopt_search(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
max_evals: int = 30
) -> dict:
"""Ricerca iperparametri con Hyperopt (Tree-structured Parzen Estimator)."""
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
"min_child_weight": hp.choice("min_child_weight", [3, 5, 7, 10]),
"reg_alpha": hp.loguniform("reg_alpha", np.log(0.01), np.log(1.0)),
}
def objective(params):
model = train_xgboost(X_train, y_train, X_val, y_val, params)
metrics = compute_metrics(model, X_val, y_val)
# Ottimizziamo su F1 per bilanciare precision e recall
return {"loss": -metrics["f1"], "status": STATUS_OK}
trials = Trials()
best = fmin(objective, space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best
def run_training_pipeline():
"""Pipeline di training principale con MLflow tracking completo."""
config = load_config()
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
# Configura MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
# Log parametri di sistema
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
"hyperopt_evals": config.get("hyperopt_evals", 30),
})
# Ricerca iperparametri
logger.info("Avvio ricerca iperparametri con Hyperopt...")
best_params = hyperopt_search(X_train, y_train, X_val, y_val,
max_evals=config.get("hyperopt_evals", 30))
mlflow.log_params(best_params)
# Training finale con best params
logger.info("Training modello finale con migliori iperparametri...")
model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
# Calcola e log metriche su tutti i set
val_metrics = compute_metrics(model, X_val, y_val)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"val_{k}": v for k, v in val_metrics.items()})
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
logger.info(f"Test AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Test Recall: {test_metrics['recall']:.4f}")
logger.info(f"Test F1: {test_metrics['f1']:.4f}")
# Log modello in MLflow model registry
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
# Salva metriche per DVC
metrics_output = {
"auc_roc": test_metrics["auc_roc"],
"recall": test_metrics["recall"],
"precision": test_metrics["precision"],
"f1": test_metrics["f1"],
"run_id": run.info.run_id,
}
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump(metrics_output, f, indent=2)
# Salva modello per serving
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
logger.info(f"Training completato. MLflow run ID: {run.info.run_id}")
if __name__ == "__main__":
run_training_pipeline()
Model de difuzare: FastAPI pentru în timp real și lot
Sistemul de servire acceptă două moduri: în timp real pentru interogări de agenți de call center (client unic, <200ms) e lot pentru punctajul lunar al întregii baze de date de clienți (2.1M în <4 ore). Ambele moduri folosesc același model și aceeași logică de preprocesare pentru a asigura consistența.
# api.py
# FastAPI serving per churn prediction - real-time e batch
# Espone /predict (singolo), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
from pathlib import Path
import mlflow
import pandas as pd
import numpy as np
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST
)
from starlette.responses import Response
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total",
"Numero totale di predizioni",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds",
"Durata predizione in secondi",
["mode"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROBABILITY_GAUGE = Gauge(
"churn_prediction_avg_probability",
"Probabilità media di churn nelle ultime predizioni"
)
MODEL_VERSION_INFO = Gauge(
"model_version_info",
"Informazioni sulla versione del modello caricato",
["version", "run_id"]
)
# ---- Schema Pydantic ----
class CustomerFeatures(BaseModel):
"""Feature di un singolo cliente per la predizione real-time."""
customer_id: str = Field(..., description="ID univoco del cliente")
recency_days: float = Field(..., ge=0, le=3650, description="Giorni dall'ultima transazione")
frequency: int = Field(..., ge=0, description="Numero di transazioni storiche")
monetary_total: float = Field(..., ge=0, description="Valore monetario totale EUR")
monetary_avg: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
contract_type_monthly: bool = Field(False)
contract_type_annual: bool = Field(False)
payment_method_auto: bool = Field(False)
age_group_18_35: bool = Field(False)
age_group_36_55: bool = Field(False)
@validator("monetary_total")
def monetary_must_be_reasonable(cls, v):
if v > 1_000_000:
raise ValueError("Valore monetario non plausibile (> 1M EUR)")
return v
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str # "high", "medium", "low"
model_version: str
class BatchPredictionRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
# ---- Global Model State ----
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
logger.info("Caricamento modello da MLflow registry...")
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
try:
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
model_state["run_id"] = os.getenv("MLFLOW_RUN_ID", "unknown")
MODEL_VERSION_INFO.labels(
version=model_state["version"],
run_id=model_state["run_id"]
).set(1)
logger.info(f"Modello caricato: {model_name} @ {model_alias} v{model_state['version']}")
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise
yield # Server in esecuzione
logger.info("Shutdown: deallocazione modello...")
model_state.clear()
# ---- FastAPI App ----
app = FastAPI(
title="TelecomIT Churn Prediction API",
description="API per predizione churn clienti - TelecomIT S.p.A.",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://crm.telecomit.internal"],
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
def features_to_dataframe(customer: CustomerFeatures) -> pd.DataFrame:
"""Converte un oggetto CustomerFeatures in DataFrame per il modello."""
data = customer.dict(exclude={"customer_id"})
return pd.DataFrame([data])
def get_confidence(probability: float) -> str:
"""Classifica la confidenza della predizione."""
if probability >= 0.75 or probability <= 0.25:
return "high"
elif probability >= 0.60 or probability <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health_check():
"""Health check per Kubernetes liveness/readiness probe."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non caricato")
return {
"status": "healthy",
"model_version": model_state.get("version", "unknown"),
"model_loaded": True
}
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Predizione real-time per singolo cliente (<200ms p99)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
try:
X = features_to_dataframe(customer)
proba = float(model_state["model"].predict_proba(X)[0, 1])
prediction = proba >= 0.5
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
CHURN_PROBABILITY_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=bool(prediction),
confidence=get_confidence(proba),
model_version=model_state.get("version", "unknown")
)
except Exception as e:
logger.error(f"Errore predizione per customer {customer.customer_id}: {e}")
raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}")
finally:
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="realtime").observe(duration)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch - ottimizzata per alti volumi (2.1M clienti/notte)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
n_customers = len(request.customers)
try:
# Converti in DataFrame vettorizzato (molto più efficiente del loop)
rows = [c.dict(exclude={"customer_id"}) for c in request.customers]
customer_ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
predictions = (probas >= request.threshold).astype(bool)
results = [
{
"customer_id": cid,
"churn_probability": round(float(p), 4),
"churn_prediction": bool(pred),
"confidence": get_confidence(float(p))
}
for cid, p, pred in zip(customer_ids, probas, predictions)
]
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(n_customers)
CHURN_PROBABILITY_GAUGE.set(float(probas.mean()))
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="batch").observe(duration)
return {
"results": results,
"total_customers": n_customers,
"churn_count": int(predictions.sum()),
"churn_rate": round(float(predictions.mean()), 4),
"processing_time_seconds": round(duration, 3),
"model_version": model_state.get("version", "unknown")
}
except Exception as e:
logger.error(f"Errore batch prediction: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)
Monitorizare: Detecție și alertă de deriva
Monitorizarea nu se limitează la valorile infrastructurii (latență, timp de funcționare): trebuie să includă calitatea previziunilor în timp. Noi folosim Evident AI pentru detectarea derivei e Prometeu + Grafana pentru monitorizare în timp real. O componentă critică este declanșarea automată a reantrenării atunci când modelul se degradează.
# drift_detector.py
# Rilevamento drift su dati e predizioni con Evidently AI
# Eseguito ogni settimana via cron job o GitHub Actions scheduled
import json
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesSummary,
ColumnDriftMetric
)
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitora il drift nei dati e nelle predizioni del modello churn.
Confronta la distribuzione dei dati di produzione con il reference dataset
(i dati su cui il modello e stato addestrato).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
alert_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.alert_webhook = alert_webhook
def analyze_drift(
self,
current_df: pd.DataFrame,
analysis_date: datetime = None
) -> dict:
"""
Esegue analisi completa di drift sul dataset corrente.
Args:
current_df: Dati di produzione del periodo corrente
analysis_date: Data di riferimento dell'analisi
Returns:
dict con risultati drift e flag retraining_required
"""
analysis_date = analysis_date or datetime.utcnow()
# ---- Report Evidently ----
report = Report(metrics=[
DatasetDriftMetric(),
DatasetMissingValuesSummary(),
# Monitoriamo le feature più predittive
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(
reference_data=self.reference_df,
current_data=current_df
)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
dataset_drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
n_drifted_features = drift_results["number_of_drifted_columns"]
# ---- Analisi predizioni (churn rate trend) ----
ref_churn_rate = self.reference_df["churned"].mean() if "churned" in self.reference_df.columns else None
curr_churn_rate = current_df["churn_prediction"].mean() if "churn_prediction" in current_df.columns else None
churn_rate_drift = None
if ref_churn_rate and curr_churn_rate:
churn_rate_drift = abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
# ---- Decision logic per retraining ----
retraining_required = (
dataset_drift_detected and drift_share > self.drift_threshold
) or (
churn_rate_drift is not None and churn_rate_drift > 0.30
)
results = {
"analysis_date": analysis_date.isoformat(),
"dataset_drift_detected": bool(dataset_drift_detected),
"drift_share": float(drift_share),
"n_drifted_features": int(n_drifted_features),
"drift_threshold": self.drift_threshold,
"reference_churn_rate": float(ref_churn_rate) if ref_churn_rate else None,
"current_churn_rate": float(curr_churn_rate) if curr_churn_rate else None,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": self._compute_severity(drift_share, churn_rate_drift)
}
# Salva report HTML
report_path = f"monitoring/drift_report_{analysis_date.strftime('%Y%m%d')}.html"
Path("monitoring").mkdir(exist_ok=True)
report.save_html(report_path)
logger.info(f"Report drift salvato: {report_path}")
if retraining_required:
self._trigger_retraining_alert(results)
return results
def _compute_severity(self, drift_share: float, churn_rate_drift: Optional[float]) -> str:
if drift_share > 0.5 or (churn_rate_drift and churn_rate_drift > 0.5):
return "critical"
elif drift_share > 0.3 or (churn_rate_drift and churn_rate_drift > 0.3):
return "high"
elif drift_share > self.drift_threshold:
return "medium"
return "low"
def _trigger_retraining_alert(self, results: dict):
"""Notifica il team e trigghera il retraining via GitHub Actions dispatch."""
logger.warning(f"DRIFT CRITICO rilevato - Retraining richiesto! {results}")
if self.alert_webhook:
import requests
payload = {
"event_type": "retrain-trigger",
"client_payload": {
"reason": "drift_detected",
"drift_share": results["drift_share"],
"analysis_date": results["analysis_date"]
}
}
resp = requests.post(
self.alert_webhook,
json=payload,
headers={"Authorization": f"token ${GITHUB_TOKEN}"}
)
logger.info(f"GitHub Actions dispatch: HTTP {resp.status_code}")
Conductă CI/CD: acțiuni GitHub finalizate
Conducta GitHub Actions integrează toate componentele: de la validarea datelor la instruire, de la evaluare la implementare pe Kubernetes. Fluxul de lucru este conceput să eșueze rapid prin blocarea implementărilor dacă valorile nu o fac depășesc pragurile definite.
name: ML Training Pipeline - TelecomIT Churn
on:
schedule:
- cron: '0 3 1 * *' # Ogni 1° del mese alle 03:00 UTC
workflow_dispatch:
inputs:
reason:
description: 'Motivo del retraining manuale'
required: false
default: 'manual'
force_deploy:
description: 'Forza deployment anche se metriche non migliorano'
type: boolean
default: false
repository_dispatch:
types: [retrain-trigger]
env:
PYTHON_VERSION: "3.11"
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
DVC_REMOTE_URL: ${{ secrets.DVC_REMOTE_URL }}
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-serving
jobs:
# ---- JOB 1: Data Validation ----
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
- name: Pull latest data
run: dvc pull data/raw/
- name: Run data validation
run: dvc repro data_validation
id: validation
- name: Check validation passed
run: |
RESULT=$(cat data/raw/validation_summary.json | python -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d['all_checks_passed'] else 1)")
echo "Validation result: $RESULT"
- name: Upload validation report
uses: actions/upload-artifact@v4
with:
name: validation-report
path: data/raw/validation_report.html
# ---- JOB 2: Training ----
training:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC and pull features
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
dvc pull data/features/
- name: Run preprocessing + feature engineering
run: dvc repro preprocessing feature_engineering
- name: Run training
run: dvc repro training
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
- name: Push artifacts to DVC remote
run: dvc push models/
- name: Output metrics
id: metrics
run: |
AUC=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['auc_roc'])")
RECALL=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['recall'])")
echo "auc_roc=$AUC" >> $GITHUB_OUTPUT
echo "recall=$RECALL" >> $GITHUB_OUTPUT
echo "Training completato - AUC: $AUC, Recall: $RECALL"
outputs:
auc_roc: ${{ steps.metrics.outputs.auc_roc }}
recall: ${{ steps.metrics.outputs.recall }}
# ---- JOB 3: Evaluation Gate ----
evaluation-gate:
runs-on: ubuntu-latest
needs: training
steps:
- name: Check metrics threshold
run: |
AUC=${{ needs.training.outputs.auc_roc }}
RECALL=${{ needs.training.outputs.recall }}
MIN_AUC=0.80
MIN_RECALL=0.65
echo "AUC: $AUC (min: $MIN_AUC)"
echo "Recall: $RECALL (min: $MIN_RECALL)"
python -c "
auc = float('$AUC')
recall = float('$RECALL')
if auc < $MIN_AUC:
print(f'FAIL: AUC {auc:.4f} sotto soglia {$MIN_AUC}')
exit(1)
if recall < $MIN_RECALL:
print(f'FAIL: Recall {recall:.4f} sotto soglia {$MIN_RECALL}')
exit(1)
print('PASS: Metriche sopra soglia - deployment approvato')
"
# ---- JOB 4: Build & Push Docker ----
build-push:
runs-on: ubuntu-latest
needs: evaluation-gate
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
file: docker/Dockerfile.serving
push: true
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ---- JOB 5: Deploy to Kubernetes ----
deploy:
runs-on: ubuntu-latest
needs: build-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes (rolling update)
run: |
kubectl set image deployment/churn-serving \
churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n ml-production
kubectl rollout status deployment/churn-serving -n ml-production --timeout=300s
- name: Run smoke tests
run: |
python tests/smoke/test_api_smoke.py \
--endpoint ${{ secrets.API_ENDPOINT }}
- name: Promote model to champion in MLflow
run: |
python -c "
import mlflow
client = mlflow.MlflowClient('${{ env.MLFLOW_TRACKING_URI }}')
# Recupera l'ultima versione Staging
versions = client.get_latest_versions('telecomit-churn-xgb', stages=['Staging'])
if versions:
client.transition_model_version_stage(
name='telecomit-churn-xgb',
version=versions[0].version,
stage='Production',
archive_existing_versions=True
)
print(f'Modello v{versions[0].version} promosso a Production')
"
Rezultate, valori și rentabilitate a investiției
După 6 luni de funcționare în producție, sistemul de predicție al abandonului TelecomIT a produs rezultate măsurabile și documentate. Următoarele date sunt reprezentative a scenariilor din lumea reală în companiile mijlocii de telecomunicații.
| Metric | Ţintă | Rezultatul M1 | Rezultatul M6 | Tendințe |
|---|---|---|---|---|
| AUC-ROC | ≥ 0,80 | 0,823 | 0,841 | + |
| Rechemare (turn) | ≥ 0,65 | 0,703 | 0,718 | + |
| Precizie (turn) | ≥ 0,60 | 0,634 | 0,672 | + |
| p99 latență în timp real | < 200 ms | 87 ms | 82 ms | + |
| Debitul lotului | 2,1 M în <4 ore | 2.1M în 2h 47m | 2.1M în 2h 31m | + |
| Uptime API | ≥ 99,9% | 99,94% | 99,97% | + |
| Evenimente de alertă de deriva | - | 2 | 1 (M3, remediat) | + |
Costuri reale de infrastructură
| Componentă | Costul lunar | Costul anual | Note |
|---|---|---|---|
| VPS Hetzner (2 nuclee, 4 GB) - de servire | 5,83 EUR | 70 EUR | FastAPI + k3s |
| VPS Hetzner (4 nuclee, 8 GB) - MLflow + MinIO | 15,90 EUR | 191 EUR | Urmărire + stocare |
| VPS Hetzner (2 nuclee, 4GB) - monitorizare | 5,83 EUR | 70 EUR | Prometeu + Grafana |
| Acțiuni GitHub (nivel gratuit) | 0 EUR | 0 EUR | 2000 min/luna gratuit |
| Spațiu de stocare de rezervă (Obiect Hetzner) | 2,50 EUR | 30 EUR | Stocare la distanță DVC |
| Total | 30,06 EUR | 361 EUR | Cu mult sub 5K EUR/an |
ROI al proiectului MLOps
Cost total primul an (dezvoltare + infrastructură): ~45.000 EUR (7 luni dezvoltator + infrastructură). Economii nete anuale estimate: 7,2 milioane EUR. Rentabilitatea investiției pe 12 luni: ~158x. Perioada de rambursare a fost de 3 săptămâni.
Lecții învățate și anti-modele de evitat
Anti-Pattern 1: „Să modelăm mai întâi, MLOps mai târziu”
60% dintre echipe încep cu șablonul și adaugă infrastructura doar atunci când intră în producție. Această abordare dublează costurile de refactorizare. Solutia: configurați DVC, MLflow și o conductă CI/CD minimă din ziua 1, chiar și cu substituenți. Costul marginal și scăzut, avantajul este enorm.
Anti-Pattern 2: numai monitorizarea infrastructurii
Monitorizarea doar a latenței și a timpului de funcționare nu este suficientă pentru un sistem ML. Un model poate fi „sănătos” din punct de vedere tehnic (răspunde în 50 ms, fără erori HTTP 500), dar produc predicții degradate din cauza derivării datelor. Solutia: monitorizați întotdeauna distribuția a intrărilor și ieșirilor modelului, nu doar a valorilor sistemului.
Anti-Pattern 3: Recalificare oarbă
Nu toate recalificarea îmbunătățesc modelul. Dacă declanșatorul este prost calibrat, riști ajunge cu un model mai prost în producție. Solutia: fiecare recalificarea trebuie să treacă de o poartă de evaluare cu o comparație explicită între contestator și campion înainte de desfășurare. Derularea automată trebuie să fie întotdeauna disponibilă.
Lista finală de verificare înainte de implementare
## Checklist Deploy Modello ML - TelecomIT Churn
### qualità del Modello
- [ ] AUC-ROC test set >= 0.80
- [ ] Recall test set >= 0.65
- [ ] Performance >= champion model attuale
- [ ] Nessuna feature con importanza anomala (segnale data leakage)
- [ ] Test su holdout temporale (dati fuori dal training period)
### Governance
- [ ] Model card generata e approvata da ML Lead
- [ ] Fairness check eseguito (demographic parity < 0.10)
- [ ] SHAP analysis disponibile per top-10 feature
- [ ] Audit trail registrato in MLflow con annotazioni
### Infrastruttura
- [ ] Dockerfile building senza warning di sicurezza
- [ ] Health check /health risponde 200
- [ ] Smoke test /predict con payload reale
- [ ] Latenza p99 < 200ms verificata con k6 o locust
- [ ] Rollback plan documentato e testato
### Monitoring
- [ ] Dashboard Grafana aggiornata con versione modello
- [ ] Alert Prometheus configurati per nuova versione
- [ ] Reference dataset Evidently aggiornato
- [ ] On-call aggiornato sul deploy
### Documenti
- [ ] JIRA ticket di deployment chiuso con evidence
- [ ] Post-deploy review schedulata a T+7 giorni
Concluzii: MLOps nu este un proiect, este o practică
În acest studiu de caz am construit un sistem MLOps complet pentru predicția abandonului, pornind de la problema afacerii până la implementarea pe Kubernetes cu monitorizare, detectarea și guvernarea derivei. Sistemul îndeplinește toate cerințele: latență <200ms în timp real, 2,1 milioane de clienți în mai puțin de 3 ore în lot, AUC-ROC 0,84, costă infrastructură de 361 EUR/an.
Mesajul cheie al seriei este următorul: MLOps nu este o tehnologie, și o disciplină. Nu există un instrument care, odată instalat, să rezolve totul probleme. Și setul de practici, procese și cultură care permit modelele ML pentru a funcționa fiabil în timp. Codul pe care l-am scris în asta seria este punctul de plecare: adevărata muncă este în observare, măsurare, învățare și să se îmbunătățească continuu.
Resurse pentru a continua
- Această serie completă: MLOps: de la experiment la producție → Conducta CI/CD → Versiune DVC → MLflow → Detectarea derivei → Servire FastAPI → Kubernetes → Testare A/B → Guvernare
- Învățare profundă avansată: Seria Advanced Deep Learning - LoRA, cuantizare, edge AI
- Inginerie AI: Seria de inginerie AI - RAG, bază de date vectorială, LangChain
- Arhivele GitHub: Codul complet pentru acest studiu de caz este disponibil la github.com/federicocalo/telecomit-churn-mlops (structură, nu date reale)
Piața MLOps va crește de la 4,38 miliarde USD în 2026 la 89,18 miliarde USD în 2035 (CAGR 39,8%). Abilitățile pe care le-ai dobândit în această serie te pun într-o poziție excelentă pentru a aborda această creștere. MLOps fericit.







