Studium przypadku: MLOps w produkcji – od zera do kompletnego rurociągu
Omówiliśmy całą serię MLOps: od wprowadzenia, przez podstawowe koncepcje, po potoki CI/CD GitHub Actions i Docker, wersjonowanie za pomocą DVC, śledzenie eksperymentów za pomocą MLflow, wykrywanie dryfu, do obsługi FastAPI, skalowania na Kubernetesie, testów A/B i zarządzania za pomocą AI Act. Nadszedł czas, aby połączyć to wszystko w prawdziwą, konkretną sprawę.
W tym studium przypadku zbudujemy system od podstaw przewidywanie rezygnacji dla fikcyjna firma telekomunikacyjna (TelecomIT S.p.A.), która przestrzega wszystkich zasad MLOps nauczyłem się w serialu. Zaczniemy od problemu biznesowego, zaprojektujemy kompleksową architekturę, zaimplementujemy każdy komponent z działającym kodem Pythona i zobaczymy, jak monitorować i utrzymanie systemu w fazie produkcyjnej przez dłuższy czas. Efektem końcowym będzie potok MLOps kompletne, odtwarzalne i gotowe do użycia w rzeczywistych środowiskach.
Czego się nauczysz
- Jak przełożyć problem biznesowy na kompleksową architekturę MLOps
- Specyficzna inżynieria funkcji do przewidywania rezygnacji (RFM, zachowanie tymczasowe)
- Kompletny potok DVC: dane, przetwarzanie wstępne, szkolenie, ocena
- Ustrukturyzowane śledzenie eksperymentów za pomocą MLflow i systematyczne porównywanie modeli
- Model udostępniany za pomocą FastAPI + Uvicorn skonteneryzowany i gotowy na Kubernetes
- Monitoring w czasie rzeczywistym za pomocą Prometheus + Grafana: dryft, opóźnienia, biznesowe KPI
- Statystycznie rygorystyczne testy A/B pomiędzy modelami pretendentów i mistrzów
- Pełne zarządzanie: karta modelu, ścieżka audytu, kontrola rzetelności przed wdrożeniem
- Oszacowanie rzeczywistych kosztów i ROI projektu MLOps dla włoskiego MŚP
Problem biznesowy: rezygnacja z usług telekomunikacyjnych
TelecomIT S.p.A. i operator telekomunikacyjny z 2,1 milionami aktywnych klientów. Miesięczny wskaźnik odejść klientów wynosi 2,3%, co oznacza utratę około 48 000 klientów miesięcznie. Średni koszt pozyskania nowego klienta to 180 EUR, natomiast koszt utrzymanie istniejącego klienta i 35 EUR. Zidentyfikuj klientów narażonych na ryzyko odejścia Zanim opuszczenie UE, jest jedną z najpotężniejszych dostępnych dźwigni gospodarczych.
Wartość ekonomiczna projektu
Z modelem, który prawidłowo identyfikuje 70% osób, które odchodzą z grupy ryzyka (przypomnij sobie = 0,70) i który ma precyzję 65% (35% wyników fałszywie pozytywnych, klienci kontaktowali się niepotrzebnie), w przypadku 48 000 osób, które co miesiąc odchodzą:
- Zidentyfikowani ubijacze: 48 000 × 0,70 = 33 600
- Koszt kampanii retencyjnej: 33 600 / 0,65 × 35 EUR = ~1,8 mln EUR/miesiąc
- Uniknięte oszczędności związane z przejęciem: 33 600 × 0,40 × 180 EUR = ~2,4 mln EUR/miesiąc
- Szacunkowy zwrot z inwestycji netto: +600 tys. EUR/miesiąc lub 7,2 mln EUR/rok
Koszt infrastruktury MLOps dla tego rurociągu: około 3500 EUR/rok w chmurze.
Wymagania techniczne i biznesowe
Przed napisaniem kodu definiujemy konkretne wymagania, które będą kierować każdym wyborem architektonicznym. Wymagania te wynikają z rozmów z zespołem biznesowym i kontekstu technicznego istniejące.
| Wymóg | Specyfikacja | Ograniczenie |
|---|---|---|
| Częstotliwość przewidywania | Miesięczna porcja, scoring dla wszystkich aktywnych klientów | Do pierwszego dnia miesiąca w przypadku kampanii CRM |
| Opóźnienie punktacji wsadowej | 2,1 miliona klientów w niecałe 4 godziny | Godziny otwarcia CRM: 02:00-06:00 |
| Opóźnienie w czasie rzeczywistym | Punktacja pojedynczego klienta dla agentów call center | < 200 ms p99 |
| Metryka docelowa | AUC-ROC ≥ 0,80, Przypomnienie ≥ 0,65 | Zatwierdzono w ramach miesięcznego zestawu wstrzymań |
| Przekwalifikowanie | Co miesiąc, automatycznie w przypadku nowych danych | Wyzwalaj także w przypadku wykrycia dryfu |
| Zarządzanie | Karta modelu, ścieżka audytu, kontrola rzetelności | Ustawa o AI: ograniczone ryzyko (sektor telekomunikacyjny) |
| Półki na książki | W 100% open source, lokalnie + chmura hybrydowa | Budżet na infrastrukturę: < 5 000 EUR/rok |
Architektura kompleksowa
Architektura systemu integruje wszystkie narzędzia serii MLOps w spójny przepływ. Każdy komponent ma określoną rolę i komunikuje się z innymi poprzez dobrze zdefiniowane interfejsy. Naczelną zasadą jest rozdzielenie obowiązków: dane, szkolenie i udostępnianie na żywo w niezależnych warstwach, które można aktualizować bez wpływu na inne.
+------------------+ +------------------+ +-------------------+
| DATA SOURCES | | DATA PIPELINE | | TRAINING LAYER |
| | | | | |
| - CRM Database |-->| - DVC Pipeline |-->| - MLflow Tracking |
| - CDR (Call Data)| | - Great Expects | | - XGBoost/RF/LGB |
| - Billing System | | - Feature Store | | - Hyperopt Tuning |
| - App Usage Logs | | - DVC Remote | | - Model Registry |
+------------------+ +------------------+ +-------------------+
|
(model promoted)
|
+------------------+ +------------------+ +-------------------+
| MONITORING | | SERVING | | CI/CD PIPELINE |
| | | | | |
| - Prometheus |<--| - FastAPI REST |<--| - GitHub Actions |
| - Grafana Dash | | - Batch Scorer | | - Docker Build |
| - Drift Alerts | | - Kubernetes | | - Auto-tests |
| - Retrain Trig. | | - Load Balancer | | - Auto-deploy |
+------------------+ +------------------+ +-------------------+
|
+------+-------+
| GOVERNANCE |
| |
| - Model Card |
| - Audit Log |
| - Fairness |
+--------------+
Kompletny stos technologii
| Warstwy | Narzędzia | Wersja | Koszt |
|---|---|---|---|
| Wersjonowanie danych | DVC + MinIO (kompatybilny z S3) | DVC 3.x | Otwarte źródło |
| Śledzenie eksperymentu | MLflow na własnym serwerze | MLflow 2.x | Otwarte źródło |
| Walidacja danych | Wielkie oczekiwania | GX 0,18 | Otwarte źródło |
| CI/CD | Akcje GitHuba | - | Darmowe poziomy |
| Konteneryzacja | Doker + Tworzenie Dokera | Doker 24.x | Otwarte źródło |
| Porcja modelowa | FastAPI + Uvicorn | FastAPI 0.110 | Otwarte źródło |
| Orkiestracja | Kubernetes (k3s) | k3s 1.28 | Otwarte źródło |
| Monitorowanie | Prometeusz + Grafana | Studniówka 2.47 | Otwarte źródło |
| Wykrywanie dryfu | Ewidentnie AI | 0.4.x | Otwarte źródło |
| Zarządzanie/uczciwość | Fairlearn + SHAP | FL 0,10 | Otwarte źródło |
| Ramy ML | XGBoost + LightGBM + nauka scikit | XGB 2.0 | Otwarte źródło |
| Infrastruktura chmurowa | Chmura Hetzner (VPS 2 rdzenie / 4 GB RAM) | - | ~360 EUR/rok |
Struktura i konfiguracja repozytorium
Dobrze zorganizowana struktura repozytorium jest podstawą każdego projektu MLOps łatwe w utrzymaniu. Poniższa struktura wyraźnie oddziela kod, konfigurację, testowanie i dokumentacji, kierując się zasadami wysokiej spójności i niskiego sprzężenia.
telecomit-churn/
├── .github/
│ └── workflows/
│ ├── ml-pipeline.yml # Pipeline CI/CD principale
│ ├── retrain-trigger.yml # Trigger retraining automatico
│ └── pr-validation.yml # Validazione PR
├── config/
│ ├── model_config.yaml # Iperparametri e configurazione modello
│ ├── feature_config.yaml # Feature set e preprocessing
│ └── serving_config.yaml # Configurazione FastAPI
├── data/
│ ├── raw/ # Dati grezzi (tracciati da DVC)
│ ├── processed/ # Dati preprocessati (tracciati da DVC)
│ └── features/ # Feature engineered (tracciati da DVC)
├── models/
│ └── registry/ # Modelli registrati (tracciati da DVC)
├── src/
│ ├── data/
│ │ ├── ingestion.py # Estrazione dati da CRM/CDR
│ │ ├── validation.py # Great Expectations checks
│ │ └── preprocessing.py # Pulizia e trasformazione
│ ├── features/
│ │ ├── rfm_features.py # Feature RFM (Recency, Frequency, Monetary)
│ │ ├── behavioral.py # Feature comportamentali
│ │ └── feature_store.py # Feature store locale
│ ├── training/
│ │ ├── train.py # Training loop con MLflow
│ │ ├── evaluate.py # Evaluation e confronto col production model
│ │ └── hyperopt_search.py # Tuning iperparametri
│ ├── serving/
│ │ ├── api.py # FastAPI app
│ │ ├── batch_scorer.py # Scoring batch mensile
│ │ └── middleware.py # Logging, rate limiting, metrics
│ ├── monitoring/
│ │ ├── drift_detector.py # Evidently drift detection
│ │ ├── metrics_exporter.py # Prometheus metrics
│ │ └── alert_manager.py # Alerting e trigger retraining
│ └── governance/
│ ├── model_card.py # Generatore model card
│ ├── fairness_checker.py # Analisi fairness con Fairlearn
│ └── audit_logger.py # Audit trail immodificabile
├── tests/
│ ├── unit/ # Test unitari per ogni modulo
│ ├── integration/ # Test integrazione pipeline
│ └── smoke/ # Smoke test API post-deploy
├── dvc.yaml # Pipeline DVC stages
├── params.yaml # Parametri DVC
├── docker/
│ ├── Dockerfile.training # Immagine training
│ ├── Dockerfile.serving # Immagine serving
│ └── docker-compose.yml # Stack locale completo
├── k8s/
│ ├── deployment.yaml # Kubernetes deployment
│ ├── service.yaml # Kubernetes service
│ └── hpa.yaml # Horizontal Pod Autoscaler
└── notebooks/
└── exploration/ # Notebook EDA (non in pipeline)
Inżynieria funkcji do przewidywania rezygnacji
Inżynieria funkcji często stanowi różnicę między modelem przeciętnym a doskonałym. Najbardziej predykcyjne funkcje pochodzą z prognozy odejść w sektorze telekomunikacyjnym zachowanie klienta w czasie. Korzystamy z frameworka RFM (Od niedawna, Częstotliwość, Pieniądze) jako punkt wyjścia, wzbogacony o funkcje cechy behawioralne specyficzne dla domeny telekomunikacyjnej.
# rfm_features.py
# Feature engineering per churn prediction nel settore telecom
# Produce un dataset tabellare con 45 feature predittive
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Tuple
import mlflow
def compute_rfm_features(
df_transactions: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Calcola le feature RFM (Recency, Frequency, Monetary) per ogni cliente.
Args:
df_transactions: DataFrame con transazioni/ricariche
snapshot_date: Data di riferimento per il calcolo
customer_id_col: Nome colonna identificativo cliente
Returns:
DataFrame con una riga per cliente e le feature RFM
"""
df = df_transactions.copy()
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
rfm = df.groupby(customer_id_col).agg(
recency_days=("transaction_date", lambda x: (snapshot_date - x.max()).days),
frequency=("transaction_date", "count"),
monetary_total=("amount", "sum"),
monetary_avg=("amount", "mean"),
monetary_std=("amount", "std"),
first_transaction=("transaction_date", "min"),
last_transaction=("transaction_date", "max"),
).reset_index()
# Tenure in giorni (durata rapporto col cliente)
rfm["tenure_days"] = (snapshot_date - rfm["first_transaction"]).dt.days
# Variazione dell'attivita (ultimi 30 gg vs media storica)
last_30_days = snapshot_date - timedelta(days=30)
recent = df[df["transaction_date"] >= last_30_days].groupby(customer_id_col).agg(
recent_frequency=("transaction_date", "count"),
recent_monetary=("amount", "sum")
).reset_index()
rfm = rfm.merge(recent, on=customer_id_col, how="left")
rfm["recent_frequency"] = rfm["recent_frequency"].fillna(0)
rfm["recent_monetary"] = rfm["recent_monetary"].fillna(0)
# Trend: rapporto attivita recente vs attivita media mensile
avg_monthly_freq = rfm["frequency"] / (rfm["tenure_days"] / 30).clip(lower=1)
rfm["activity_trend"] = rfm["recent_frequency"] / avg_monthly_freq.clip(lower=0.01)
return rfm
def compute_behavioral_features(
df_cdr: pd.DataFrame,
df_support: pd.DataFrame,
snapshot_date: datetime,
customer_id_col: str = "customer_id"
) -> pd.DataFrame:
"""
Feature comportamentali da CDR (call detail records) e ticket di supporto.
Queste feature catturano segnali predittivi di churn specifici del dominio telecom.
"""
# ---- Feature CDR: comportamento nelle chiamate ----
cdr_30d = df_cdr[
df_cdr["call_date"] >= snapshot_date - timedelta(days=30)
]
cdr_features = cdr_30d.groupby(customer_id_col).agg(
calls_count_30d=("call_id", "count"),
calls_duration_avg_30d=("duration_seconds", "mean"),
calls_duration_total_30d=("duration_seconds", "sum"),
data_usage_gb_30d=("data_usage_mb", lambda x: x.sum() / 1024),
international_calls_ratio=("is_international", "mean"),
peak_hour_ratio=("is_peak_hour", "mean"),
unique_contacts_30d=("called_number_hash", "nunique"),
).reset_index()
# ---- Feature supporto: interazioni negative ----
support_30d = df_support[
df_support["ticket_date"] >= snapshot_date - timedelta(days=90)
]
support_features = support_30d.groupby(customer_id_col).agg(
support_tickets_90d=("ticket_id", "count"),
support_escalations_90d=("is_escalated", "sum"),
avg_resolution_time_h=("resolution_hours", "mean"),
complaint_ratio=("is_complaint", "mean"),
unresolved_tickets=("is_resolved", lambda x: (~x).sum()),
).reset_index()
behavioral = cdr_features.merge(support_features, on=customer_id_col, how="left")
behavioral = behavioral.fillna(0)
# Feature derivata: "frustration index" - proxy di insoddisfazione
behavioral["frustration_index"] = (
behavioral["support_tickets_90d"] * 2 +
behavioral["support_escalations_90d"] * 5 +
behavioral["unresolved_tickets"] * 3
).clip(upper=20) / 20 # Normalizzato 0-1
return behavioral
def build_feature_matrix(
df_customers: pd.DataFrame,
df_rfm: pd.DataFrame,
df_behavioral: pd.DataFrame,
label_col: str = "churned"
) -> Tuple[pd.DataFrame, pd.Series]:
"""
Assembla la matrice delle feature finale unendo tutte le sorgenti.
Returns:
(X, y) - feature matrix e label vector
"""
features = df_customers[[
"customer_id", "contract_type", "payment_method",
"age_group", "region", "plan_monthly_eur", label_col
]].merge(df_rfm, on="customer_id", how="left") \
.merge(df_behavioral, on="customer_id", how="left")
# Encoding categoriche
cat_cols = ["contract_type", "payment_method", "age_group", "region"]
features = pd.get_dummies(features, columns=cat_cols, drop_first=True)
y = features[label_col].astype(int)
X = features.drop(columns=["customer_id", label_col,
"first_transaction", "last_transaction"])
return X, y
Rurociąg DVC: powtarzalność od końca do końca
Potok DVC definiuje każdy etap procesu jako skierowany graf acykliczny (DAG): każdy stopień deklaruje własne wejścia (deps), wyjścia (wyjścia) i parametry. DVC śledzi zależności i automatycznie uruchamia dokładnie tylko te etapy, które zostały unieważnione przez zmiany jak Make, ale dla potoków ML z danymi i modelami.
stages:
# Stage 1: Ingestion - estrae dati da CRM/CDR e salva snapshot
data_ingestion:
cmd: python src/data/ingestion.py
deps:
- src/data/ingestion.py
- config/feature_config.yaml
params:
- snapshot_date
- data.source_db_table
outs:
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
metrics:
- data/raw/ingestion_report.json:
cache: false
# Stage 2: Validation - Great Expectations checks sui dati grezzi
data_validation:
cmd: python src/data/validation.py
deps:
- src/data/validation.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
params:
- validation.min_rows
- validation.max_null_ratio
outs:
- data/raw/validation_report.html:
cache: false
metrics:
- data/raw/validation_summary.json:
cache: false
# Stage 3: Preprocessing - pulizia e trasformazione
preprocessing:
cmd: python src/data/preprocessing.py
deps:
- src/data/preprocessing.py
- data/raw/customers.parquet
- data/raw/transactions.parquet
- data/raw/cdr.parquet
- data/raw/support_tickets.parquet
- config/feature_config.yaml
params:
- preprocessing.imputation_strategy
- preprocessing.outlier_threshold
outs:
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
# Stage 4: Feature Engineering - RFM + behavioral
feature_engineering:
cmd: python src/features/rfm_features.py
deps:
- src/features/rfm_features.py
- src/features/behavioral.py
- data/processed/customers_clean.parquet
- data/processed/transactions_clean.parquet
params:
- features.lookback_days
- features.rfm_bins
outs:
- data/features/feature_matrix.parquet
- data/features/feature_names.json
# Stage 5: Training - XGBoost con MLflow tracking
training:
cmd: python src/training/train.py
deps:
- src/training/train.py
- data/features/feature_matrix.parquet
- config/model_config.yaml
params:
- model.algorithm
- model.n_estimators
- model.max_depth
- model.learning_rate
- model.subsample
- model.scale_pos_weight
- training.test_size
- training.random_seed
outs:
- models/registry/challenger_model.pkl
- models/registry/preprocessor.pkl
metrics:
- models/registry/metrics.json:
cache: false
plots:
- models/registry/confusion_matrix.json:
cache: false
- models/registry/roc_curve.json:
cache: false
# Stage 6: Evaluation - confronto challenger vs champion
evaluation:
cmd: python src/training/evaluate.py
deps:
- src/training/evaluate.py
- models/registry/challenger_model.pkl
- data/features/feature_matrix.parquet
params:
- evaluation.min_auc_roc
- evaluation.min_recall
- evaluation.promotion_strategy
outs:
- models/registry/evaluation_report.json:
cache: false
- models/registry/model_card.json:
cache: false
Szkolenie z MLflow: pełne śledzenie eksperymentów
Moduł szkoleniowy integruje MLflow w celu systematycznego śledzenia każdego eksperymentu. Używamy XGBoost jako główny algorytm ze względu na doskonałą równowagę pomiędzy wydajnością a interpretowalnością danych tabelarycznych, z mechanizmem wczesnego zatrzymywania i wyszukiwanie hiperparametrów za pomocą Hyperopt.
# train.py
# Training pipeline per churn prediction con MLflow tracking
# Eseguito dalla pipeline DVC: python src/training/train.py
import os
import json
import pickle
import logging
from pathlib import Path
from typing import Dict, Tuple
import mlflow
import mlflow.xgboost
import pandas as pd
import numpy as np
import xgboost as xgb
import yaml
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
roc_auc_score, f1_score, precision_score,
recall_score, accuracy_score, confusion_matrix
)
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str = "config/model_config.yaml") -> dict:
"""Carica configurazione modello da file YAML."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
def load_features(features_path: str = "data/features/feature_matrix.parquet") -> Tuple:
"""Carica feature matrix e prepara train/val/test split."""
df = pd.read_parquet(features_path)
# Leggi i parametri DVC
with open("params.yaml") as f:
params = yaml.safe_load(f)
test_size = params["training"]["test_size"]
seed = params["training"]["random_seed"]
label_col = "churned"
X = df.drop(columns=[label_col])
y = df[label_col].astype(int)
# Stratified split per preservare il bilanciamento delle classi
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=seed, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_train, y_train, test_size=0.15, random_state=seed, stratify=y_train
)
logger.info(f"Train: {len(X_train)} | Val: {len(X_val)} | Test: {len(X_test)}")
logger.info(f"Churn rate - Train: {y_train.mean():.3f} | Test: {y_test.mean():.3f}")
return X_train, X_val, X_test, y_train, y_val, y_test
def train_xgboost(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
params: dict
) -> xgb.XGBClassifier:
"""Addestra un modello XGBoost con early stopping."""
model = xgb.XGBClassifier(
n_estimators=params.get("n_estimators", 500),
max_depth=params.get("max_depth", 6),
learning_rate=params.get("learning_rate", 0.05),
subsample=params.get("subsample", 0.8),
colsample_bytree=params.get("colsample_bytree", 0.8),
scale_pos_weight=params.get("scale_pos_weight", 4.0), # Bilancia classi sbilanciate
min_child_weight=params.get("min_child_weight", 5),
reg_alpha=params.get("reg_alpha", 0.1),
reg_lambda=params.get("reg_lambda", 1.0),
use_label_encoder=False,
eval_metric="auc",
random_state=42,
n_jobs=-1
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=100
)
return model
def compute_metrics(
model: xgb.XGBClassifier,
X: pd.DataFrame,
y: pd.Series,
threshold: float = 0.5
) -> Dict[str, float]:
"""Calcola metriche complete di valutazione."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = (y_proba >= threshold).astype(int)
cm = confusion_matrix(y, y_pred)
return {
"auc_roc": roc_auc_score(y, y_proba),
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred, zero_division=0),
"recall": recall_score(y, y_pred, zero_division=0),
"f1": f1_score(y, y_pred, zero_division=0),
"tn": int(cm[0][0]),
"fp": int(cm[0][1]),
"fn": int(cm[1][0]),
"tp": int(cm[1][1]),
}
def hyperopt_search(
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
max_evals: int = 30
) -> dict:
"""Ricerca iperparametri con Hyperopt (Tree-structured Parzen Estimator)."""
space = {
"max_depth": hp.choice("max_depth", [4, 5, 6, 7, 8]),
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.2)),
"n_estimators": hp.choice("n_estimators", [200, 300, 500, 700]),
"subsample": hp.uniform("subsample", 0.6, 1.0),
"colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
"scale_pos_weight": hp.uniform("scale_pos_weight", 2.0, 8.0),
"min_child_weight": hp.choice("min_child_weight", [3, 5, 7, 10]),
"reg_alpha": hp.loguniform("reg_alpha", np.log(0.01), np.log(1.0)),
}
def objective(params):
model = train_xgboost(X_train, y_train, X_val, y_val, params)
metrics = compute_metrics(model, X_val, y_val)
# Ottimizziamo su F1 per bilanciare precision e recall
return {"loss": -metrics["f1"], "status": STATUS_OK}
trials = Trials()
best = fmin(objective, space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best
def run_training_pipeline():
"""Pipeline di training principale con MLflow tracking completo."""
config = load_config()
X_train, X_val, X_test, y_train, y_val, y_test = load_features()
# Configura MLflow
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
mlflow.set_experiment("telecomit-churn-prediction")
with mlflow.start_run(run_name=f"xgb-training-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
# Log parametri di sistema
mlflow.log_params({
"algorithm": "XGBoostClassifier",
"train_size": len(X_train),
"val_size": len(X_val),
"test_size": len(X_test),
"churn_rate_train": float(y_train.mean()),
"feature_count": X_train.shape[1],
"hyperopt_evals": config.get("hyperopt_evals", 30),
})
# Ricerca iperparametri
logger.info("Avvio ricerca iperparametri con Hyperopt...")
best_params = hyperopt_search(X_train, y_train, X_val, y_val,
max_evals=config.get("hyperopt_evals", 30))
mlflow.log_params(best_params)
# Training finale con best params
logger.info("Training modello finale con migliori iperparametri...")
model = train_xgboost(X_train, y_train, X_val, y_val, best_params)
# Calcola e log metriche su tutti i set
val_metrics = compute_metrics(model, X_val, y_val)
test_metrics = compute_metrics(model, X_test, y_test)
mlflow.log_metrics({f"val_{k}": v for k, v in val_metrics.items()})
mlflow.log_metrics({f"test_{k}": v for k, v in test_metrics.items()})
logger.info(f"Test AUC-ROC: {test_metrics['auc_roc']:.4f}")
logger.info(f"Test Recall: {test_metrics['recall']:.4f}")
logger.info(f"Test F1: {test_metrics['f1']:.4f}")
# Log modello in MLflow model registry
mlflow.xgboost.log_model(
model,
artifact_path="model",
registered_model_name="telecomit-churn-xgb"
)
# Salva metriche per DVC
metrics_output = {
"auc_roc": test_metrics["auc_roc"],
"recall": test_metrics["recall"],
"precision": test_metrics["precision"],
"f1": test_metrics["f1"],
"run_id": run.info.run_id,
}
Path("models/registry").mkdir(parents=True, exist_ok=True)
with open("models/registry/metrics.json", "w") as f:
json.dump(metrics_output, f, indent=2)
# Salva modello per serving
with open("models/registry/challenger_model.pkl", "wb") as f:
pickle.dump(model, f)
logger.info(f"Training completato. MLflow run ID: {run.info.run_id}")
if __name__ == "__main__":
run_training_pipeline()
Udostępnianie modelu: FastAPI w czasie rzeczywistym i wsadowo
System serwowania obsługuje dwa tryby: w czasie rzeczywistym dla zapytań agentów call center (pojedynczy klient, <200ms) e seria dla miesięczny scoring całej bazy klientów (2,1M w <4 godziny). Obydwa tryby używają tego samego modelu i logiki przetwarzania wstępnego, aby zapewnić spójność.
# api.py
# FastAPI serving per churn prediction - real-time e batch
# Espone /predict (singolo), /predict/batch, /health, /metrics
import os
import pickle
import time
import logging
from contextlib import asynccontextmanager
from typing import List, Optional
from pathlib import Path
import mlflow
import pandas as pd
import numpy as np
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from prometheus_client import (
Counter, Histogram, Gauge,
generate_latest, CONTENT_TYPE_LATEST
)
from starlette.responses import Response
import uvicorn
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Prometheus Metrics ----
PREDICTIONS_TOTAL = Counter(
"churn_predictions_total",
"Numero totale di predizioni",
["mode", "model_version"]
)
PREDICTION_DURATION = Histogram(
"churn_prediction_duration_seconds",
"Durata predizione in secondi",
["mode"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1.0, 5.0]
)
CHURN_PROBABILITY_GAUGE = Gauge(
"churn_prediction_avg_probability",
"Probabilità media di churn nelle ultime predizioni"
)
MODEL_VERSION_INFO = Gauge(
"model_version_info",
"Informazioni sulla versione del modello caricato",
["version", "run_id"]
)
# ---- Schema Pydantic ----
class CustomerFeatures(BaseModel):
"""Feature di un singolo cliente per la predizione real-time."""
customer_id: str = Field(..., description="ID univoco del cliente")
recency_days: float = Field(..., ge=0, le=3650, description="Giorni dall'ultima transazione")
frequency: int = Field(..., ge=0, description="Numero di transazioni storiche")
monetary_total: float = Field(..., ge=0, description="Valore monetario totale EUR")
monetary_avg: float = Field(..., ge=0)
tenure_days: int = Field(..., ge=0)
activity_trend: float = Field(..., ge=0)
calls_count_30d: int = Field(..., ge=0)
data_usage_gb_30d: float = Field(..., ge=0)
support_tickets_90d: int = Field(..., ge=0)
frustration_index: float = Field(..., ge=0.0, le=1.0)
plan_monthly_eur: float = Field(..., ge=0)
contract_type_monthly: bool = Field(False)
contract_type_annual: bool = Field(False)
payment_method_auto: bool = Field(False)
age_group_18_35: bool = Field(False)
age_group_36_55: bool = Field(False)
@validator("monetary_total")
def monetary_must_be_reasonable(cls, v):
if v > 1_000_000:
raise ValueError("Valore monetario non plausibile (> 1M EUR)")
return v
class PredictionResponse(BaseModel):
customer_id: str
churn_probability: float
churn_prediction: bool
confidence: str # "high", "medium", "low"
model_version: str
class BatchPredictionRequest(BaseModel):
customers: List[CustomerFeatures]
threshold: Optional[float] = Field(0.5, ge=0.0, le=1.0)
# ---- Global Model State ----
model_state: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
logger.info("Caricamento modello da MLflow registry...")
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000"))
model_name = os.getenv("MODEL_NAME", "telecomit-churn-xgb")
model_alias = os.getenv("MODEL_ALIAS", "champion")
try:
model_uri = f"models:/{model_name}@{model_alias}"
model_state["model"] = mlflow.xgboost.load_model(model_uri)
model_state["version"] = os.getenv("MODEL_VERSION", "unknown")
model_state["run_id"] = os.getenv("MLFLOW_RUN_ID", "unknown")
MODEL_VERSION_INFO.labels(
version=model_state["version"],
run_id=model_state["run_id"]
).set(1)
logger.info(f"Modello caricato: {model_name} @ {model_alias} v{model_state['version']}")
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise
yield # Server in esecuzione
logger.info("Shutdown: deallocazione modello...")
model_state.clear()
# ---- FastAPI App ----
app = FastAPI(
title="TelecomIT Churn Prediction API",
description="API per predizione churn clienti - TelecomIT S.p.A.",
version="1.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://crm.telecomit.internal"],
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
def features_to_dataframe(customer: CustomerFeatures) -> pd.DataFrame:
"""Converte un oggetto CustomerFeatures in DataFrame per il modello."""
data = customer.dict(exclude={"customer_id"})
return pd.DataFrame([data])
def get_confidence(probability: float) -> str:
"""Classifica la confidenza della predizione."""
if probability >= 0.75 or probability <= 0.25:
return "high"
elif probability >= 0.60 or probability <= 0.40:
return "medium"
return "low"
@app.get("/health")
async def health_check():
"""Health check per Kubernetes liveness/readiness probe."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non caricato")
return {
"status": "healthy",
"model_version": model_state.get("version", "unknown"),
"model_loaded": True
}
@app.get("/metrics")
async def metrics():
"""Endpoint Prometheus metrics."""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(customer: CustomerFeatures):
"""Predizione real-time per singolo cliente (<200ms p99)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
try:
X = features_to_dataframe(customer)
proba = float(model_state["model"].predict_proba(X)[0, 1])
prediction = proba >= 0.5
PREDICTIONS_TOTAL.labels(mode="realtime", model_version=model_state["version"]).inc()
CHURN_PROBABILITY_GAUGE.set(proba)
return PredictionResponse(
customer_id=customer.customer_id,
churn_probability=round(proba, 4),
churn_prediction=bool(prediction),
confidence=get_confidence(proba),
model_version=model_state.get("version", "unknown")
)
except Exception as e:
logger.error(f"Errore predizione per customer {customer.customer_id}: {e}")
raise HTTPException(status_code=500, detail=f"Errore interno: {str(e)}")
finally:
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="realtime").observe(duration)
@app.post("/predict/batch")
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch - ottimizzata per alti volumi (2.1M clienti/notte)."""
if "model" not in model_state:
raise HTTPException(status_code=503, detail="Modello non disponibile")
start_time = time.time()
n_customers = len(request.customers)
try:
# Converti in DataFrame vettorizzato (molto più efficiente del loop)
rows = [c.dict(exclude={"customer_id"}) for c in request.customers]
customer_ids = [c.customer_id for c in request.customers]
X = pd.DataFrame(rows)
probas = model_state["model"].predict_proba(X)[:, 1]
predictions = (probas >= request.threshold).astype(bool)
results = [
{
"customer_id": cid,
"churn_probability": round(float(p), 4),
"churn_prediction": bool(pred),
"confidence": get_confidence(float(p))
}
for cid, p, pred in zip(customer_ids, probas, predictions)
]
PREDICTIONS_TOTAL.labels(mode="batch", model_version=model_state["version"]).inc(n_customers)
CHURN_PROBABILITY_GAUGE.set(float(probas.mean()))
duration = time.time() - start_time
PREDICTION_DURATION.labels(mode="batch").observe(duration)
return {
"results": results,
"total_customers": n_customers,
"churn_count": int(predictions.sum()),
"churn_rate": round(float(predictions.mean()), 4),
"processing_time_seconds": round(duration, 3),
"model_version": model_state.get("version", "unknown")
}
except Exception as e:
logger.error(f"Errore batch prediction: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080, workers=4)
Monitorowanie: wykrywanie znoszenia i ostrzeganie
Monitorowanie nie ogranicza się do wskaźników infrastruktury (opóźnień, czasu pracy): musi obejmować jakość prognoz w czasie. Używamy Ewidentnie AI dla wykrywanie dryfu, np Prometeusz + Grafana do monitorowania w czasie rzeczywistym. Krytycznym elementem jest wyzwalacz automatycznego ponownego uczenia się, gdy model ulega degradacji.
# drift_detector.py
# Rilevamento drift su dati e predizioni con Evidently AI
# Eseguito ogni settimana via cron job o GitHub Actions scheduled
import json
import logging
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesSummary,
ColumnDriftMetric
)
logger = logging.getLogger(__name__)
class ChurnDriftDetector:
"""
Monitora il drift nei dati e nelle predizioni del modello churn.
Confronta la distribuzione dei dati di produzione con il reference dataset
(i dati su cui il modello e stato addestrato).
"""
def __init__(
self,
reference_path: str = "data/features/feature_matrix_reference.parquet",
drift_threshold: float = 0.2,
alert_webhook: Optional[str] = None
):
self.reference_df = pd.read_parquet(reference_path)
self.drift_threshold = drift_threshold
self.alert_webhook = alert_webhook
def analyze_drift(
self,
current_df: pd.DataFrame,
analysis_date: datetime = None
) -> dict:
"""
Esegue analisi completa di drift sul dataset corrente.
Args:
current_df: Dati di produzione del periodo corrente
analysis_date: Data di riferimento dell'analisi
Returns:
dict con risultati drift e flag retraining_required
"""
analysis_date = analysis_date or datetime.utcnow()
# ---- Report Evidently ----
report = Report(metrics=[
DatasetDriftMetric(),
DatasetMissingValuesSummary(),
# Monitoriamo le feature più predittive
ColumnDriftMetric(column_name="recency_days"),
ColumnDriftMetric(column_name="frustration_index"),
ColumnDriftMetric(column_name="activity_trend"),
ColumnDriftMetric(column_name="data_usage_gb_30d"),
])
report.run(
reference_data=self.reference_df,
current_data=current_df
)
report_dict = report.as_dict()
drift_results = report_dict["metrics"][0]["result"]
dataset_drift_detected = drift_results["dataset_drift"]
drift_share = drift_results["drift_share"]
n_drifted_features = drift_results["number_of_drifted_columns"]
# ---- Analisi predizioni (churn rate trend) ----
ref_churn_rate = self.reference_df["churned"].mean() if "churned" in self.reference_df.columns else None
curr_churn_rate = current_df["churn_prediction"].mean() if "churn_prediction" in current_df.columns else None
churn_rate_drift = None
if ref_churn_rate and curr_churn_rate:
churn_rate_drift = abs(curr_churn_rate - ref_churn_rate) / ref_churn_rate
# ---- Decision logic per retraining ----
retraining_required = (
dataset_drift_detected and drift_share > self.drift_threshold
) or (
churn_rate_drift is not None and churn_rate_drift > 0.30
)
results = {
"analysis_date": analysis_date.isoformat(),
"dataset_drift_detected": bool(dataset_drift_detected),
"drift_share": float(drift_share),
"n_drifted_features": int(n_drifted_features),
"drift_threshold": self.drift_threshold,
"reference_churn_rate": float(ref_churn_rate) if ref_churn_rate else None,
"current_churn_rate": float(curr_churn_rate) if curr_churn_rate else None,
"churn_rate_drift_pct": float(churn_rate_drift * 100) if churn_rate_drift else None,
"retraining_required": bool(retraining_required),
"severity": self._compute_severity(drift_share, churn_rate_drift)
}
# Salva report HTML
report_path = f"monitoring/drift_report_{analysis_date.strftime('%Y%m%d')}.html"
Path("monitoring").mkdir(exist_ok=True)
report.save_html(report_path)
logger.info(f"Report drift salvato: {report_path}")
if retraining_required:
self._trigger_retraining_alert(results)
return results
def _compute_severity(self, drift_share: float, churn_rate_drift: Optional[float]) -> str:
if drift_share > 0.5 or (churn_rate_drift and churn_rate_drift > 0.5):
return "critical"
elif drift_share > 0.3 or (churn_rate_drift and churn_rate_drift > 0.3):
return "high"
elif drift_share > self.drift_threshold:
return "medium"
return "low"
def _trigger_retraining_alert(self, results: dict):
"""Notifica il team e trigghera il retraining via GitHub Actions dispatch."""
logger.warning(f"DRIFT CRITICO rilevato - Retraining richiesto! {results}")
if self.alert_webhook:
import requests
payload = {
"event_type": "retrain-trigger",
"client_payload": {
"reason": "drift_detected",
"drift_share": results["drift_share"],
"analysis_date": results["analysis_date"]
}
}
resp = requests.post(
self.alert_webhook,
json=payload,
headers={"Authorization": f"token ${GITHUB_TOKEN}"}
)
logger.info(f"GitHub Actions dispatch: HTTP {resp.status_code}")
Potok CI/CD: ukończono działania w GitHub
Potok GitHub Actions integruje wszystkie komponenty: począwszy od sprawdzania poprawności danych po szkolenia, od ewaluacji po wdrożenie na Kubernetesie. Przepływ pracy został zaprojektowany szybko zakończyć niepowodzenie, blokując wdrożenia, jeśli metryki tego nie robią przekraczać określone progi.
name: ML Training Pipeline - TelecomIT Churn
on:
schedule:
- cron: '0 3 1 * *' # Ogni 1° del mese alle 03:00 UTC
workflow_dispatch:
inputs:
reason:
description: 'Motivo del retraining manuale'
required: false
default: 'manual'
force_deploy:
description: 'Forza deployment anche se metriche non migliorano'
type: boolean
default: false
repository_dispatch:
types: [retrain-trigger]
env:
PYTHON_VERSION: "3.11"
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
DVC_REMOTE_URL: ${{ secrets.DVC_REMOTE_URL }}
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/churn-serving
jobs:
# ---- JOB 1: Data Validation ----
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC remote
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
- name: Pull latest data
run: dvc pull data/raw/
- name: Run data validation
run: dvc repro data_validation
id: validation
- name: Check validation passed
run: |
RESULT=$(cat data/raw/validation_summary.json | python -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d['all_checks_passed'] else 1)")
echo "Validation result: $RESULT"
- name: Upload validation report
uses: actions/upload-artifact@v4
with:
name: validation-report
path: data/raw/validation_report.html
# ---- JOB 2: Training ----
training:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Configure DVC and pull features
run: |
dvc remote modify myremote access_key_id ${{ secrets.DVC_ACCESS_KEY }}
dvc remote modify myremote secret_access_key ${{ secrets.DVC_SECRET_KEY }}
dvc pull data/features/
- name: Run preprocessing + feature engineering
run: dvc repro preprocessing feature_engineering
- name: Run training
run: dvc repro training
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
- name: Push artifacts to DVC remote
run: dvc push models/
- name: Output metrics
id: metrics
run: |
AUC=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['auc_roc'])")
RECALL=$(cat models/registry/metrics.json | python -c "import sys,json; print(json.load(sys.stdin)['recall'])")
echo "auc_roc=$AUC" >> $GITHUB_OUTPUT
echo "recall=$RECALL" >> $GITHUB_OUTPUT
echo "Training completato - AUC: $AUC, Recall: $RECALL"
outputs:
auc_roc: ${{ steps.metrics.outputs.auc_roc }}
recall: ${{ steps.metrics.outputs.recall }}
# ---- JOB 3: Evaluation Gate ----
evaluation-gate:
runs-on: ubuntu-latest
needs: training
steps:
- name: Check metrics threshold
run: |
AUC=${{ needs.training.outputs.auc_roc }}
RECALL=${{ needs.training.outputs.recall }}
MIN_AUC=0.80
MIN_RECALL=0.65
echo "AUC: $AUC (min: $MIN_AUC)"
echo "Recall: $RECALL (min: $MIN_RECALL)"
python -c "
auc = float('$AUC')
recall = float('$RECALL')
if auc < $MIN_AUC:
print(f'FAIL: AUC {auc:.4f} sotto soglia {$MIN_AUC}')
exit(1)
if recall < $MIN_RECALL:
print(f'FAIL: Recall {recall:.4f} sotto soglia {$MIN_RECALL}')
exit(1)
print('PASS: Metriche sopra soglia - deployment approvato')
"
# ---- JOB 4: Build & Push Docker ----
build-push:
runs-on: ubuntu-latest
needs: evaluation-gate
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Login to GHCR
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
file: docker/Dockerfile.serving
push: true
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ---- JOB 5: Deploy to Kubernetes ----
deploy:
runs-on: ubuntu-latest
needs: build-push
environment: production
steps:
- uses: actions/checkout@v4
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes (rolling update)
run: |
kubectl set image deployment/churn-serving \
churn-api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
-n ml-production
kubectl rollout status deployment/churn-serving -n ml-production --timeout=300s
- name: Run smoke tests
run: |
python tests/smoke/test_api_smoke.py \
--endpoint ${{ secrets.API_ENDPOINT }}
- name: Promote model to champion in MLflow
run: |
python -c "
import mlflow
client = mlflow.MlflowClient('${{ env.MLFLOW_TRACKING_URI }}')
# Recupera l'ultima versione Staging
versions = client.get_latest_versions('telecomit-churn-xgb', stages=['Staging'])
if versions:
client.transition_model_version_stage(
name='telecomit-churn-xgb',
version=versions[0].version,
stage='Production',
archive_existing_versions=True
)
print(f'Modello v{versions[0].version} promosso a Production')
"
Wyniki, wskaźniki i zwrot z inwestycji
Po 6 miesiącach pracy w produkcji system przewidywania odejść klientów TelecomIT przyniosło wymierne i udokumentowane rezultaty. Poniższe dane są reprezentatywne rzeczywistych scenariuszy w średnich firmach telekomunikacyjnych.
| Metryczny | Cel | Wynik M1 | Wynik M6 | Trend |
|---|---|---|---|---|
| AUC-ROC | ≥ 0.80 | 0.823 | 0.841 | + |
| Recall (churn) | ≥ 0.65 | 0.703 | 0.718 | + |
| Precision (churn) | ≥ 0.60 | 0.634 | 0.672 | + |
| Latenza real-time p99 | < 200ms | 87ms | 82ms | + |
| Throughput batch | 2.1M in <4h | 2.1M in 2h 47m | 2.1M in 2h 31m | + |
| Uptime API | ≥ 99.9% | 99.94% | 99.97% | + |
| Drift alert eventi | - | 2 | 1 (M3, rimediato) | + |
Costi di Infrastruttura Reali
| Componente | Costo Mensile | Costo Annuale | Note |
|---|---|---|---|
| VPS Hetzner (2 core, 4GB) - serving | 5.83 EUR | 70 EUR | FastAPI + k3s |
| VPS Hetzner (4 core, 8GB) - MLflow + MinIO | 15.90 EUR | 191 EUR | Tracking + storage |
| VPS Hetzner (2 core, 4GB) - monitoring | 5.83 EUR | 70 EUR | Prometheus + Grafana |
| GitHub Actions (free tier) | 0 EUR | 0 EUR | 2000 min/mese free |
| Backup storage (Hetzner Object) | 2.50 EUR | 30 EUR | DVC remote storage |
| Całkowity | 30,06 EUR | 361 EUR | Dobrze poniżej 5 tys. EUR/rok |
ROI projektu MLOps
Całkowity koszt w pierwszym roku (rozwój + infrastruktura): ~45 000 EUR (7 miesięcy deweloper + infrastruktura). Szacunkowe roczne oszczędności netto: 7,2 mln EUR. 12-miesięczny zwrot z inwestycji: ~158x. Okres zwrotu wynosił 3 tygodnie.
Wyciągnięte wnioski i antywzorce, których należy unikać
Anty-wzorzec 1: „Najpierw modelujmy, później MLOps”
60% zespołów zaczyna od szablonu i dodaje infrastrukturę dopiero, gdy trafia ona do środowiska produkcyjnego. Takie podejście podwaja koszty refaktoryzacji. Rozwiązanie: skonfiguruj DVC, MLflow i minimalny potok CI/CD od pierwszego dnia, nawet z symbolami zastępczymi. Koszt krańcowy i niski, przewaga jest ogromna.
Anty-wzorzec 2: Tylko monitorowanie infrastruktury
Monitorowanie samych opóźnień i czasu pracy nie wystarczy dla systemu ML. Model może być technicznie „zdrowy” (odpowiada w ciągu 50 ms, brak błędów HTTP 500), ale generuje prognozy ulegają degradacji w wyniku dryfu danych. Rozwiązanie: zawsze monitoruj dystrybucję wejść i wyjść modelu, a nie tylko metryk systemowych.
Antywzorzec 3: Przekwalifikowanie się na ślepo
Nie każde przekwalifikowanie poprawia model. Jeśli spust jest źle skalibrowany, ryzykujesz skończyć z gorszym modelem w produkcji. Rozwiązanie: każdy przekwalifikowanie musi przejść bramkę oceny z wyraźnym porównaniem pretendenta do mistrza przed wdrożeniem. Zawsze musi być dostępne automatyczne przywracanie zmian.
Ostateczna lista kontrolna przed wdrożeniem
## Checklist Deploy Modello ML - TelecomIT Churn
### qualità del Modello
- [ ] AUC-ROC test set >= 0.80
- [ ] Recall test set >= 0.65
- [ ] Performance >= champion model attuale
- [ ] Nessuna feature con importanza anomala (segnale data leakage)
- [ ] Test su holdout temporale (dati fuori dal training period)
### Governance
- [ ] Model card generata e approvata da ML Lead
- [ ] Fairness check eseguito (demographic parity < 0.10)
- [ ] SHAP analysis disponibile per top-10 feature
- [ ] Audit trail registrato in MLflow con annotazioni
### Infrastruttura
- [ ] Dockerfile building senza warning di sicurezza
- [ ] Health check /health risponde 200
- [ ] Smoke test /predict con payload reale
- [ ] Latenza p99 < 200ms verificata con k6 o locust
- [ ] Rollback plan documentato e testato
### Monitoring
- [ ] Dashboard Grafana aggiornata con versione modello
- [ ] Alert Prometheus configurati per nuova versione
- [ ] Reference dataset Evidently aggiornato
- [ ] On-call aggiornato sul deploy
### Documenti
- [ ] JIRA ticket di deployment chiuso con evidence
- [ ] Post-deploy review schedulata a T+7 giorni
Wnioski: MLOps to nie projekt, to praktyka
W tym studium przypadku zbudowaliśmy kompletny system MLOps do przewidywania rezygnacji, począwszy od problemu biznesowego aż po wdrożenie na Kubernetesie wraz z monitoringiem, wykrywanie dryfu i zarządzanie nim. System spełnia wszystkie wymagania: opóźnienie <200ms w czasie rzeczywistym, 2,1 mln klientów w mniej niż 3 godziny wsadowo, AUC-ROC 0,84, koszty infrastruktura 361 EUR/rok.
Kluczowe przesłanie serii jest następujące: MLOps nie jest technologią, i dyscyplina. Nie ma narzędzia, które po zainstalowaniu rozwiąże wszystkie problemy problemy. Oraz zestaw praktyk, procesów i kultury, które umożliwiają modele uczenia maszynowego aby działać niezawodnie przez długi czas. Kod, który napisaliśmy w this Seria jest punktem wyjścia: prawdziwa praca polega na obserwacji, mierzeniu i uczeniu się i stale się doskonalić.
Zasoby do kontynuacji
- Ta kompletna seria: MLOps: od eksperymentu do produkcji → Rurociąg CI/CD → Wersja DVC → MLflow → Wykrywanie dryfu → Obsługa FastAPI → Kubernetes → Testy A/B → Zarządzanie
- Zaawansowane głębokie uczenie się: Zaawansowana seria głębokiego uczenia się - LoRA, kwantyzacja, Edge AI
- Inżynieria sztucznej inteligencji: Seria inżynierii AI - RAG, baza danych wektorowych, LangChain
- Repozytoria GitHub: Dostępny jest pełny kod tego studium przypadku na github.com/federicocalo/telecomit-churn-mlops (struktura, a nie rzeczywiste dane)
Rynek MLOps wzrośnie z 4,38 mld dolarów w 2026 r. do 89,18 mld dolarów w 2035 r. (CAGR 39,8%). Umiejętności, które nabyłeś w tej serii, stawiają Cię na doskonałej pozycji aby zaradzić temu wzrostowi. Szczęśliwych MLOpsów.







