Dall'Idea alla Produzione: La Pipeline ML Completa
Costruire un modello ML che funziona su un notebook Jupyter è solo il 10% del lavoro. Il restante 90% consiste nel trasformarlo in un sistema production-ready: affidabile, monitorato, riproducibile e manutenibile. Una pipeline ML end-to-end automatizza l'intero flusso, dal dato grezzo alla previsione in produzione, garantendo consistenza e qualità ad ogni esecuzione. Questo articolo costruisce una pipeline completa per un problema reale: la churn prediction (previsione dell'abbandono clienti).
La pipeline copre sette fasi: caricamento dati, preprocessing, feature engineering, training, valutazione, selezione del modello e deployment. Ogni fase è un componente modulare e testabile, orchestrato da strumenti come MLflow per il tracking degli esperimenti e Docker per la containerizzazione.
Cosa Imparerai in Questo Articolo
- Architettura di una pipeline ML completa
- Case study: churn prediction end-to-end
- Experiment tracking con MLflow
- Data versioning e riproducibilità
- Model serving con FastAPI
- Monitoring e retraining in produzione
Fase 1: Caricamento e Validazione Dati
La prima fase carica i dati dalla sorgente e ne verifica la qualità. I controlli includono: schema validation (colonne attese, tipi di dato), data quality checks (percentuale valori mancanti, range validi, distribuzioni anomale), e data profiling (statistiche descrittive). Se i dati non superano i controlli, la pipeline si ferma con un errore chiaro invece di produrre un modello inaffidabile.
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class DataValidationResult:
"""Risultato della validazione dati."""
is_valid: bool
errors: List[str]
warnings: List[str]
stats: Dict[str, float]
def load_and_validate(filepath: str, expected_columns: List[str]) -> tuple:
"""Carica e valida il dataset."""
errors = []
warnings = []
# Caricamento
df = pd.read_csv(filepath)
# Schema validation
missing_cols = set(expected_columns) - set(df.columns)
if missing_cols:
errors.append(f"Colonne mancanti: {missing_cols}")
# Data quality checks
null_pct = df.isnull().mean()
high_null_cols = null_pct[null_pct > 0.3].index.tolist()
if high_null_cols:
warnings.append(f"Colonne con >30% null: {high_null_cols}")
# Duplicati
n_dupes = df.duplicated().sum()
if n_dupes > 0:
warnings.append(f"{n_dupes} righe duplicate trovate")
# Statistiche
stats = {
'n_rows': len(df),
'n_cols': len(df.columns),
'null_pct_avg': null_pct.mean(),
'n_duplicates': n_dupes
}
result = DataValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
stats=stats
)
return df, result
# Uso
# df, validation = load_and_validate('data/churn.csv', expected_columns)
# if not validation.is_valid:
# raise ValueError(f"Validazione fallita: {validation.errors}")
print("Pipeline Stage 1: Data Loading & Validation - OK")
Fase 2-3: Preprocessing e Feature Engineering
Il preprocessing e il feature engineering vengono incapsulati in scikit-learn Pipeline e
custom transformers. I custom transformers estendono BaseEstimator e
TransformerMixin per integrarsi perfettamente nella pipeline. Questo garantisce che le stesse
trasformazioni vengano applicate sia in training che in produzione, eliminando il rischio di inconsistenze.
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
class FeatureEngineer(BaseEstimator, TransformerMixin):
"""Custom transformer per feature engineering."""
def fit(self, X, y=None):
return self
def transform(self, X):
df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
# Feature derivate (esempio churn prediction)
if 'tenure' in df.columns and 'monthly_charges' in df.columns:
df['total_spent'] = df['tenure'] * df['monthly_charges']
df['avg_monthly_ratio'] = df['monthly_charges'] / (df['tenure'] + 1)
if 'tenure' in df.columns:
df['tenure_group'] = pd.cut(
df['tenure'], bins=[0, 12, 24, 48, 72],
labels=['new', 'developing', 'mature', 'loyal']
)
return df
def build_preprocessing_pipeline(
numeric_features: list,
categorical_features: list
) -> Pipeline:
"""Costruisce la pipeline di preprocessing."""
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
return Pipeline([
('feature_engineer', FeatureEngineer()),
('preprocessor', preprocessor)
])
print("Pipeline Stage 2-3: Preprocessing & Feature Engineering - OK")
Fase 4-5: Training, Valutazione e Selezione Modello
La fase di training confronta sistematicamente più algoritmi con cross-validation e seleziona il migliore. L'experiment tracking con MLflow registra automaticamente parametri, metriche e artefatti di ogni esperimento, rendendo il processo riproducibile e confrontabile. Il modello selezionato viene serializzato e versionato per il deployment.
from sklearn.ensemble import (
RandomForestClassifier, GradientBoostingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
import numpy as np
from datetime import datetime
class ExperimentTracker:
"""Tracker semplificato per esperimenti ML."""
def __init__(self):
self.experiments = []
def log_experiment(self, name, params, metrics, model):
self.experiments.append({
'name': name,
'params': params,
'metrics': metrics,
'model': model,
'timestamp': datetime.now().isoformat()
})
def get_best(self, metric='f1'):
return max(self.experiments, key=lambda x: x['metrics'].get(metric, 0))
def train_and_evaluate(X, y, tracker):
"""Addestra e valuta più' modelli."""
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {
'accuracy': 'accuracy',
'f1': 'f1',
'roc_auc': 'roc_auc',
'precision': 'precision',
'recall': 'recall'
}
models = {
'LogisticRegression': LogisticRegression(max_iter=10000, random_state=42),
'RandomForest': RandomForestClassifier(n_estimators=200, random_state=42),
'GradientBoosting': GradientBoostingClassifier(
n_estimators=200, learning_rate=0.05, max_depth=5, random_state=42
)
}
for name, model in models.items():
results = cross_validate(
model, X, y, cv=cv, scoring=scoring, return_train_score=True
)
metrics = {
metric: results[f'test_{metric}'].mean()
for metric in scoring.keys()
}
tracker.log_experiment(name, model.get_params(), metrics, model)
print(f"{name:<25s} F1={metrics['f1']:.3f} AUC={metrics['roc_auc']:.3f}")
best = tracker.get_best('f1')
print(f"\nMiglior modello: {best['name']} (F1={best['metrics']['f1']:.3f})")
return best
# Uso
tracker = ExperimentTracker()
# best_model = train_and_evaluate(X_preprocessed, y, tracker)
print("Pipeline Stage 4-5: Training & Evaluation - OK")
Fase 6: Model Serving con FastAPI
Il deployment del modello come API REST con FastAPI permette a qualsiasi applicazione di ottenere previsioni in tempo reale. L'API accetta richieste JSON con le feature del cliente e restituisce la probabilità di churn e la classe predetta. Il modello serializzato (con joblib o pickle) viene caricato all'avvio del server.
# api.py - Deployment del modello con FastAPI
# pip install fastapi uvicorn
from dataclasses import dataclass
from typing import Dict, Any
import json
@dataclass
class PredictionRequest:
"""Schema della richiesta di predizione."""
tenure: int
monthly_charges: float
total_charges: float
contract: str
payment_method: str
@dataclass
class PredictionResponse:
"""Schema della risposta."""
churn_probability: float
prediction: str
confidence: float
model_version: str
class ModelServer:
"""Server per il modello ML."""
def __init__(self, model_path: str, version: str = "1.0.0"):
self.version = version
# In produzione: self.model = joblib.load(model_path)
# self.pipeline = joblib.load(f"{model_path}/pipeline.pkl")
print(f"Modello v{version} caricato da {model_path}")
def predict(self, request: PredictionRequest) -> PredictionResponse:
"""Genera predizione per un singolo cliente."""
# features = self.pipeline.transform(request_to_dataframe(request))
# proba = self.model.predict_proba(features)[0]
# Simulazione
proba = [0.3, 0.7]
return PredictionResponse(
churn_probability=round(proba[1], 3),
prediction="churn" if proba[1] > 0.5 else "no_churn",
confidence=round(max(proba), 3),
model_version=self.version
)
# FastAPI app (in produzione):
# app = FastAPI(title="Churn Prediction API")
# server = ModelServer("models/best_model")
#
# @app.post("/predict")
# async def predict(request: PredictionRequest):
# return server.predict(request)
#
# Avvio: uvicorn api:app --host 0.0.0.0 --port 8000
print("Pipeline Stage 6: Model Serving - OK")
Fase 7: Monitoring e Retraining
Un modello in produzione degrada nel tempo a causa del data drift (i dati di produzione cambiano rispetto al training) e del concept drift (la relazione tra feature e target cambia). Il monitoring continuo traccia le performance del modello, la distribuzione degli input e la distribuzione delle predizioni. Quando le metriche scendono sotto una soglia, viene triggerato il retraining automatico.
Production Checklist ML: (1) Versionamento dati e modello, (2) Pipeline riproducibile, (3) Test automatici per preprocessing e predizioni, (4) Monitoring delle metriche in produzione, (5) Alert per data drift e performance degradation, (6) Rollback rapido alla versione precedente, (7) Logging completo per debugging, (8) A/B testing per nuove versioni del modello.
Strumenti MLOps
L'ecosistema MLOps offre strumenti specializzati per ogni fase. MLflow per experiment tracking e model registry. DVC (Data Version Control) per il versionamento dei dati e delle pipeline. Docker per la containerizzazione. GitHub Actions per CI/CD. Prometheus + Grafana per il monitoring. Great Expectations per la data quality. La scelta degli strumenti dipende dalla scala del progetto e dall'infrastruttura esistente.
Punti Chiave
- Una pipeline ML end-to-end copre 7 fasi: dal dato grezzo al modello monitorato in produzione
- Custom transformers in scikit-learn garantiscono consistenza tra training e inference
- Experiment tracking (MLflow) rende gli esperimenti riproducibili e confrontabili
- FastAPI + Docker è lo stack standard per il model serving
- Data drift e concept drift richiedono monitoring continuo e retraining automatico
- La riproducibilità è il requisito più importante in una pipeline ML di produzione







