Creo applicazioni web moderne e strumenti digitali personalizzati per aiutare le attività a crescere attraverso l'innovazione tecnologica. La mia passione è unire informatica ed economia per generare valore reale.
La mia passione per l'informatica è nata tra i banchi dell'Istituto Tecnico Commerciale di Maglie, dove ho scoperto il potere della programmazione e il fascino di creare soluzioni digitali. Fin da subito, ho capito che l'informatica non era solo codice, ma uno strumento straordinario per trasformare idee in realtà.
Durante gli studi superiori in Sistemi Informativi Aziendali, ho iniziato a intrecciare informatica ed economia, comprendendo come la tecnologia possa essere il motore della crescita per qualsiasi attività. Questa visione mi ha accompagnato all'Università degli Studi di Bari, dove ho conseguito la Laurea in Informatica, approfondendo le mie competenze tecniche e la mia passione per lo sviluppo software.
Oggi metto questa esperienza al servizio di imprese, professionisti e startup, creando soluzioni digitali su misura che automatizzano processi, ottimizzano risorse e aprono nuove opportunità di business. Perché la vera innovazione inizia quando la tecnologia incontra le esigenze reali delle persone.
Le Mie Competenze
Analisi Dati & Modelli Previsionali
Trasformo i dati in insights strategici con analisi approfondite e modelli predittivi per decisioni informate
Automazione Processi
Creo strumenti personalizzati che automatizzano operazioni ripetitive e liberano tempo per attività a valore aggiunto
Sistemi Custom
Sviluppo sistemi software su misura, dalle integrazioni tra piattaforme alle dashboard personalizzate
Credo fermamente che l'informatica sia lo strumento più potente per trasformare le idee in realtà e migliorare la vita delle persone.
Democratizzare la Tecnologia
La mia missione è rendere l'informatica accessibile a tutti: dalle piccole imprese locali alle startup innovative, fino ai professionisti che vogliono digitalizzare la propria attività. Ogni realtà merita di sfruttare le potenzialità del digitale.
Unire Informatica ed Economia
Non è solo questione di scrivere codice: è capire come la tecnologia possa generare valore reale. Intrecciando competenze informatiche e visione economica, aiuto le attività a crescere, ottimizzare processi e raggiungere nuovi traguardi di efficienza e redditività.
Creare Soluzioni su Misura
Ogni attività è unica, e così devono esserlo le soluzioni. Sviluppo strumenti personalizzati che rispondono alle esigenze specifiche di ciascun cliente, automatizzando processi ripetitivi e liberando tempo per ciò che conta davvero: far crescere il business.
Trasforma la Tua Attività con la Tecnologia
Che tu gestisca un negozio, uno studio professionale o un'azienda, posso aiutarti a sfruttare le potenzialità dell'informatica per lavorare meglio, più velocemente e in modo più intelligente.
Il mio percorso accademico e le tecnologie che padroneggio
Certificazioni Professionali
8 certificazioni conseguite
Nuovo
Visualizza
Reinvention With Agentic AI Learning Program
Anthropic
Dicembre 2024
Nuovo
Visualizza
Agentic AI Fluency
Anthropic
Dicembre 2024
Nuovo
Visualizza
AI Fluency for Students
Anthropic
Dicembre 2024
Nuovo
Visualizza
AI Fluency: Framework and Foundations
Anthropic
Dicembre 2024
Nuovo
Visualizza
Claude with the Anthropic API
Anthropic
Dicembre 2024
Visualizza
Master SQL
RoadMap.sh
Novembre 2024
Visualizza
Oracle Certified Foundations Associate
Oracle
Ottobre 2024
Visualizza
People Leadership Credential
Connect
Settembre 2024
Linguaggi & Tecnologie
Java
Python
JavaScript
Angular
React
TypeScript
SQL
PHP
CSS/SCSS
Node.js
Docker
Git
💼
12/2024 - Presente
Custom Software Engineering Analyst
Accenture
Bari, Puglia, Italia · Ibrida
Analisi e sviluppo di sistemi informatici attraverso l'utilizzo di Java e Quarkus in Health and Public Sector. Formazione continua su tecnologie moderne per la creazione di soluzioni software personalizzate ed efficienti e sugli agenti.
💼
06/2022 - 12/2024
Analista software e Back End Developer Associate Consultant
Links Management and Technology SpA
Esperienza nell'analisi di sistemi software as-is e flussi ETL utilizzando PowerCenter. Formazione completata su Spring Boot per lo sviluppo di applicazioni backend moderne e scalabili. Sviluppatore Backend specializzato in Spring Boot, con esperienza in progettazione di database, analisi, sviluppo e testing dei task assegnati.
💼
02/2021 - 10/2021
Programmatore software
Adesso.it (prima era WebScience srl)
Esperienza nell'analisi AS-IS e TO-BE, evoluzioni SEO ed evoluzioni website per migliorare le performance e l'engagement degli utenti.
🎓
2018 - 2025
Laurea in Informatica
Università degli Studi di Bari Aldo Moro
Bachelor's degree in Computer Science, focusing on software engineering, algorithms, and modern development practices.
📚
2013 - 2018
Diploma - Sistemi Informativi Aziendali
Istituto Tecnico Commerciale di Maglie
Technical diploma specializing in Business Information Systems, combining IT knowledge with business management.
Contattami
Hai un progetto in mente? Parliamone! Compila il form qui sotto e ti risponderò al più presto.
* Campi obbligatori. I tuoi dati saranno utilizzati solo per rispondere alla tua richiesta.
Experiment Tracking con MLflow: Guida Completa
Hai mai perso ore a cercare quale combinazione di iperparametri aveva prodotto quel risultato
eccellente di tre settimane fa? O ti sei trovato a chiederti perchè il modello in produzione
si comporta diversamente da quello che avevi testato in locale? Questi problemi, comunissimi
nel ciclo di vita del machine learning, hanno una radice comune: la mancanza di un sistema
strutturato di experiment tracking.
MLflow e la risposta open-source più diffusa a questo problema. Nato in
Databricks nel 2018 e diventato un progetto Apache nel 2019, MLflow si e affermato come
lo standard de facto per il tracking degli esperimenti ML nell'ecosistema Python. Con il
rilascio di MLflow 3 nel giugno 2025 durante il Databricks Data + AI Summit,
la piattaforma ha fatto un salto evolutivo significativo: da strumento di tracking a
piattaforma unificata per sviluppo, valutazione e deployment di modelli ML e GenAI,
con LoggedModel come entità di prima classe e performance di logging migliorate del 25%
rispetto alla versione 2.5.
In questa guida vedremo MLflow end-to-end: dall'installazione al tracking avanzato,
dall'autologging al Model Registry, fino al serving dei modelli e all'integrazione con Docker.
Ogni esempio e testato e pronto all'uso in produzione.
Cosa Imparerai in Questo Articolo
Architettura MLflow: tracking server, backend store, artifact store e le novità di MLflow 3
Setup locale e produzione: SQLite, PostgreSQL, S3 come artifact store
Autologging: integrazione zero-config con scikit-learn, XGBoost, PyTorch, TensorFlow
Model Registry: staging, production, archivio e gestione del ciclo di vita dei modelli
Model Serving con MLflow: REST API, Docker container, integrazione FastAPI
MLflow con Docker Compose per deployment in produzione
Confronto con alternative: W&B, Neptune, ClearML - quando scegliere cosa
Best practices e anti-patterns per team ML di ogni dimensione
La Serie MLOps e Machine Learning in Produzione
#
Articolo
Focus
1
MLOps: Da Esperimento a Produzione
Fondamenta e lifecycle completo
2
Pipeline ML con CI/CD
GitHub Actions e Docker per ML
3
Versioning DVC vs LakeFS
Dataset e model versioning
4
Sei qui - Experiment Tracking con MLflow
Tracking, registry, serving
5
Model Drift Detection
Monitoraggio e retraining automatico
6
Serving con FastAPI + Uvicorn
Deployment modelli in produzione
7
Scaling ML su Kubernetes
KubeFlow e Seldon Core
8
A/B Testing di Modelli ML
Metodologia e implementazione
9
Governance ML
Compliance, AI Act EU, ethics
10
Case Study: Churn Prediction
Pipeline end-to-end in produzione
Architettura MLflow: I Quattro Componenti Fondamentali
Prima di scrivere una riga di codice, e importante capire i building block di MLflow.
La piattaforma si compone di quattro componenti principali, ciascuno con un ruolo specifico
nel ciclo di vita del machine learning:
MLflow Tracking: l'API e la UI per loggare e interrogare esperimenti.
Registra parametri, metriche, tag, artefatti e note per ogni run di training.
MLflow Projects: formato per impacchettare codice ML in run riproducibili,
con gestione delle dipendenze e dell'ambiente tramite Conda o Docker.
MLflow Models: formato standard per salvare modelli in modo che possano
essere serviti da più framework (Python function, REST API, Spark UDF, ecc.).
MLflow Model Registry: store centralizzato per gestire il ciclo di vita
dei modelli: versioning, staging, produzione, archiviazione e audit trail.
Con MLflow 3 (2025) si aggiunge un quinto elemento fondamentale: il concetto di
LoggedModel come entità di prima classe. Invece del precedente approccio
run-centrico (dove il modello era solo un artefatto di un run), i LoggedModel persistono
attraverso multipli run, ambienti e deployment, con lineage completa verso parametri,
metriche, tracce e dati di valutazione.
Architettura di Storage MLflow
Componente
Cosa Contiene
Backend Consigliato
Backend Store
Parametri, metriche, tag, metadata dei run
PostgreSQL / MySQL (produzione), SQLite (locale)
Artifact Store
File del modello, immagini, CSV, dati di valutazione
L'installazione base di MLflow richiede un singolo comando pip. Per development locale,
MLflow usa SQLite come backend store e il filesystem locale come artifact store:
nessun server aggiuntivo necessario.
# Installazione MLflow (versione 2.x/3.x)
pip install mlflow
# Con extras per integrazioni specifiche
pip install mlflow[extras] # scikit-learn, XGBoost, LightGBM
pip install mlflow[databricks] # integrazione Databricks
pip install mlflow[genai] # tools per GenAI e LLM (MLflow 3+)
# Verifica installazione
mlflow --version
# mlflow, version 2.x.x o 3.x.x
# Avvia la UI locale (usa ./mlruns come storage)
mlflow ui
# UI disponibile su http://localhost:5000
# Avvia con database SQLite
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
Setup Produzione: PostgreSQL + S3
Per ambienti di produzione con più utenti e concorrenza elevata, e fondamentale usare
un database relazionale come backend e un object storage come artifact store. Il Model
Registry di MLflow richiede obbligatoriamente un database (non funziona con file system):
# .env file (NON commitare in Git!)
POSTGRES_PASSWORD=sicura_password_123
MINIO_ACCESS_KEY=minio_admin
MINIO_SECRET_KEY=minio_password_sicura
# Avvio dello stack completo
docker-compose up -d
# Verifica che MLflow sia in ascolto
curl http://localhost:5000/health
# {"status": "OK"}
# Crea il bucket MinIO per gli artefatti
docker exec -it minio_container mc alias set local http://localhost:9000 admin password
docker exec -it minio_container mc mb local/mlflow-artifacts
Budget <5K EUR/anno per PMI
Per team con budget limitato, questa configurazione con Docker Compose su una singola VM
costa circa 180 EUR/anno (EC2 t3.small o equivalente). MinIO sostituisce S3 in locale
con API completamente compatibili. Per storage persistente si può usare anche AWS S3
(circa 2-5 EUR/mese per pochi GB di artefatti). Il risparmio rispetto a piattaforme
SaaS come W&B Teams (50+ USD/utente/mese) e significativo: un team di 5 persone
risparmia oltre 2500 EUR/anno.
Experiment Tracking: Log Params, Metrics e Artifacts
Il cuore di MLflow e l'API di tracking. Ogni chiamata a mlflow.start_run()
crea un nuovo run all'interno di un experiment. Un
experiment raggruppa run correlati (es. tutti i run per il modello di churn prediction).
I run loggano quattro tipi di dati:
Parameters: valori fissi per il run (iperparametri, configurazione)
Metrics: valori numerici che possono variare nel tempo (loss, accuracy per epoch)
Tags: metadati chiave-valore per annotare e filtrare i run
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
accuracy_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, RocCurveDisplay
)
from sklearn.model_selection import train_test_split
import os
# ==================== Configurazione MLflow ====================
# Connessione al tracking server (locale o remoto)
mlflow.set_tracking_uri("http://localhost:5000")
# Crea o usa un experiment esistente
experiment_name = "churn-prediction-gbm"
mlflow.set_experiment(experiment_name)
# Ottieni informazioni sull'experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
print(f"Experiment ID: {experiment.experiment_id}")
# ==================== Training con Tracking Completo ====================
def train_churn_model(X_train, X_val, y_train, y_val, params: dict) -> str:
"""
Allena un GBM per churn prediction con tracking MLflow completo.
Restituisce il run_id del run MLflow.
"""
with mlflow.start_run(run_name=f"gbm-lr{params['learning_rate']}-depth{params['max_depth']}") as run:
# ---- 1. TAG: metadata del run ----
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"dataset_version": "v2.1",
"git_commit": os.popen("git rev-parse HEAD").read().strip(),
"environment": "dev",
})
# ---- 2. PARAMS: iperparametri e configurazione ----
mlflow.log_params(params)
mlflow.log_params({
"train_size": len(X_train),
"val_size": len(X_val),
"n_features": X_train.shape[1],
"target_positive_rate": float(y_train.mean()),
})
# ---- 3. TRAINING ----
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ---- 4. METRICS: step-by-step durante training ----
# Logga la loss per ogni stage del GBM (equivalente all'epoch loss)
train_scores = list(model.staged_predict(X_train))
val_scores = list(model.staged_predict(X_val))
for step, (tr_pred, val_pred) in enumerate(zip(train_scores, val_scores)):
train_acc = accuracy_score(y_train, tr_pred)
val_acc = accuracy_score(y_val, val_pred)
mlflow.log_metrics({
"train_accuracy_step": train_acc,
"val_accuracy_step": val_acc,
}, step=step)
# ---- 5. METRICS FINALI ----
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)[:, 1]
final_metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"f1_score": f1_score(y_val, y_pred),
"auc_roc": roc_auc_score(y_val, y_prob),
"precision": float(np.mean(y_pred[y_pred == 1] == y_val[y_pred == 1])) if sum(y_pred) > 0 else 0.0,
"recall": float(sum((y_pred == 1) & (y_val == 1)) / sum(y_val == 1)),
}
mlflow.log_metrics(final_metrics)
# ---- 6. ARTIFACTS: file del modello e report ----
# Salva e logga confusion matrix come immagine
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_val, y_pred)
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.set_title('Confusion Matrix - Churn Prediction')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.colorbar(im)
plt.tight_layout()
mlflow.log_figure(fig, "confusion_matrix.png")
plt.close()
# Salva ROC curve
fig2, ax2 = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(y_val, y_prob, ax=ax2)
ax2.set_title('ROC Curve - Churn Model')
mlflow.log_figure(fig2, "roc_curve.png")
plt.close()
# Logga classification report come file di testo
report = classification_report(y_val, y_pred, target_names=["No Churn", "Churn"])
mlflow.log_text(report, "classification_report.txt")
# Logga feature importance come CSV
feature_imp = pd.DataFrame({
"feature": X_train.columns.tolist(),
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_table(feature_imp.to_dict(orient="list"), "feature_importance.json")
# ---- 7. LOG MODEL: salva il modello con firma e input example ----
input_example = X_val.head(3)
signature = mlflow.models.infer_signature(X_val, y_pred)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="churn-gbm-model", # Registra automaticamente nel Registry
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {final_metrics['accuracy']:.4f}")
print(f"AUC-ROC: {final_metrics['auc_roc']:.4f}")
print(f"UI: http://localhost:5000/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}")
return run.info.run_id
# ==================== Esempio di utilizzo ====================
if __name__ == "__main__":
# Dati sintetici per demo
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_informative=10, random_state=42)
X_df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(20)])
X_train, X_val, y_train, y_val = train_test_split(X_df, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 300,
"learning_rate": 0.05,
"max_depth": 4,
"subsample": 0.8,
"min_samples_split": 20,
"random_state": 42,
}
run_id = train_churn_model(X_train, X_val, y_train, y_val, params)
Nested Runs e Hyperparameter Search
MLflow supporta i nested runs: run figli all'interno di un run padre.
Questo pattern e ideale per hyperparameter search (Optuna, GridSearchCV) dove si vuole
un run padre che rappresenta la ricerca complessiva e tanti run figli, uno per ogni
configurazione testata:
import mlflow
import optuna
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
def hyperparameter_search(X_train, y_train, n_trials: int = 50) -> str:
"""
Hyperparameter search con Optuna + MLflow nested runs.
Run padre: contiene il summary della ricerca
Run figli: ogni trial Optuna e un run MLflow figlio
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn-hparam-search")
with mlflow.start_run(run_name="optuna-search-v1") as parent_run:
mlflow.set_tag("search_method", "optuna-tpe")
mlflow.log_param("n_trials", n_trials)
mlflow.log_param("optimization_metric", "auc_roc")
best_auc = 0.0
best_params = {}
def objective(trial) -> float:
"""Funzione obiettivo Optuna - ogni trial e un nested run MLflow."""
params = {
"n_estimators": trial.suggest_int("n_estimators", 100, 1000),
"learning_rate": trial.suggest_float("learning_rate", 0.001, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 8),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"min_samples_split": trial.suggest_int("min_samples_split", 5, 50),
}
# Ogni trial ottiene il suo run MLflow figlio
with mlflow.start_run(
run_name=f"trial-{trial.number}",
nested=True # <-- indica che e un run figlio
) as child_run:
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=3, scoring="roc_auc")
auc_mean = scores.mean()
auc_std = scores.std()
mlflow.log_metrics({
"cv_auc_mean": auc_mean,
"cv_auc_std": auc_std,
"trial_number": trial.number,
})
return auc_mean
# Esegui la ricerca Optuna
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective, n_trials=n_trials, n_jobs=1)
# Logga i risultati della ricerca nel run padre
best_trial = study.best_trial
mlflow.log_params({f"best_{k}": v for k, v in best_trial.params.items()})
mlflow.log_metrics({
"best_auc": best_trial.value,
"n_trials_completed": len(study.trials),
})
print(f"Miglior AUC: {best_trial.value:.4f}")
print(f"Migliori parametri: {best_trial.params}")
return parent_run.info.run_id
Autologging: Zero-Config Tracking
L'autologging e una delle funzionalità più comode di MLflow: con una singola riga di codice,
MLflow intercetta automaticamente le chiamate ai principali framework ML e logga parametri,
metriche e artefatti senza alcuna modifica al codice di training. E supportato da scikit-learn,
XGBoost, LightGBM, PyTorch Lightning, TensorFlow/Keras, Spark MLlib e altri.
import mlflow
import mlflow.sklearn
import mlflow.xgboost
# ==================== Autologging scikit-learn ====================
# Abilita autologging per scikit-learn
# Logga automaticamente: hyperparametri, metriche di training, signature del modello
mlflow.sklearn.autolog(
log_input_examples=True, # Logga esempi di input
log_model_signatures=True, # Inferisce e logga la firma del modello
log_models=True, # Salva il modello come artefatto
log_datasets=False, # Non loggare l'intero dataset (troppo grande)
max_tuning_runs=100, # Per GridSearchCV: max run tracciati
exclusive=False, # Permette log manuali in aggiunta
)
mlflow.set_experiment("autolog-demo")
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Con autolog attivo, fit() logga tutto automaticamente
with mlflow.start_run(run_name="rf-gridsearch-autolog"):
param_grid = {
"n_estimators": [100, 300],
"max_depth": [4, 6, 8],
"min_samples_split": [10, 20],
}
model = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=3,
scoring="roc_auc",
n_jobs=-1
)
model.fit(X_train, y_train)
# MLflow ha loggato automaticamente:
# - Tutti i parametri del RandomForest
# - cv=3, scoring, n_jobs
# - Best params dal GridSearchCV
# - Score di validazione incrociata
# - Il modello come artefatto
# ==================== Autologging XGBoost ====================
mlflow.xgboost.autolog(
importance_types=["gain", "weight"], # Logga feature importance
log_model_signatures=True,
log_input_examples=True,
)
import xgboost as xgb
import numpy as np
with mlflow.start_run(run_name="xgb-autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "auc"],
"learning_rate": 0.05,
"max_depth": 6,
"n_estimators": 500,
"subsample": 0.8,
"seed": 42,
}
# Autolog traccia: tutte le metriche per ogni boosting round
# e il modello finale, la feature importance
booster = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=50,
verbose_eval=False,
)
# ==================== Autologging PyTorch Lightning ====================
mlflow.pytorch.autolog(
every_n_iter=10, # Logga ogni 10 iterazioni
log_models=True,
checkpoint_monitor="val_loss",
)
# Con PyTorch Lightning, chiama semplicemente trainer.fit()
# e MLflow cattura automaticamente tutte le loss e le metriche
Limitazioni dell'Autologging
L'autologging e comodo per prototipazione rapida, ma ha alcune limitazioni in produzione:
non logga metriche custom definite dall'utente, non logga informazioni sul dataset
(dimensioni, versione, distribuzione delle classi), non gestisce le dipendenze tra
esperimenti. Per pipeline MLOps mature, e consigliato usare l'autologging come base
e aggiungere chiamate manuali a mlflow.log_param(), mlflow.log_metric()
e mlflow.log_artifact() per le informazioni specifiche del dominio.
MLflow Model Registry: Ciclo di Vita dei Modelli
Il Model Registry di MLflow e il componente che trasforma MLflow da semplice strumento
di tracking a vera piattaforma MLOps. Permette di gestire le versioni dei modelli attraverso
un ciclo di vita standardizzato: da sviluppo a staging a produzione,
con audit trail, annotazioni e notifiche.
Il Registry e accessibile sia tramite UI (drag & drop per cambiare stage) sia tramite
l'API Python, essenziale per automazione in CI/CD. Con MLflow 3, il Registry e stato
arricchito con webhooks per notifiche automatiche al passaggio di stage.
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion
import time
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
MODEL_NAME = "churn-gbm-model"
# ==================== 1. REGISTRARE UN MODELLO ====================
# Metodo A: durante il log_model (più comune)
with mlflow.start_run() as run:
# ... training ...
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=MODEL_NAME,
)
# Il modello viene automaticamente registrato come versione 1
# con stage "None" (development)
# Metodo B: da un run esistente tramite URI
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
version = mlflow.register_model(
model_uri=model_uri,
name=MODEL_NAME,
tags={"team": "ml-eng", "algorithm": "gbm"}
)
print(f"Registrato: {MODEL_NAME} versione {version.version}")
# ==================== 2. GESTIRE LE VERSIONI E GLI STAGE ====================
# Aggiungi una descrizione alla versione
client.update_model_version(
name=MODEL_NAME,
version=version.version,
description=(
"GBM per churn prediction v2.1. "
"Accuracy: 0.9423, AUC-ROC: 0.9567. "
"Trainato su dataset 2024-01 to 2025-01, 45k samples."
)
)
# Promuovi a Staging (dopo validazione interna)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Staging",
archive_existing_versions=False, # Mantieni altre versioni staging
)
print(f"Modello v{version.version} promosso a Staging")
# Aggiungi tag per tracciabilita
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validated_by",
value="alice.rossi@company.com"
)
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validation_date",
value="2025-11-15"
)
# Promuovi a Production (dopo approvazione)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Production",
archive_existing_versions=True, # Archivia la versione Production precedente
)
print(f"Modello v{version.version} in produzione!")
# ==================== 3. CARICARE IL MODELLO IN PRODUZIONE ====================
def load_production_model(model_name: str):
"""Carica sempre la versione Production dal registry."""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# In uno script di inference o serving
model = load_production_model(MODEL_NAME)
predictions = model.predict(new_data)
# ==================== 4. INTERROGARE IL REGISTRY ====================
# Lista tutte le versioni di un modello
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
for v in versions:
print(f"v{v.version} | Stage: {v.current_stage} | Run: {v.run_id[:8]}...")
# Cerca solo versioni in Production
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
if prod_versions:
latest_prod = prod_versions[0]
print(f"Versione in produzione: v{latest_prod.version}")
print(f"Run ID: {latest_prod.run_id}")
# Ottieni tutte le metriche associate alla versione in produzione
run_data = client.get_run(latest_prod.run_id).data
print(f"AUC-ROC in produzione: {run_data.metrics.get('auc_roc', 'N/A')}")
# ==================== 5. ROLLBACK IN CASO DI PROBLEMA ====================
def rollback_to_previous_production(model_name: str) -> None:
"""
Rollback: archivia la versione Production attuale e
ripristina la versione Archived più recente.
"""
# Trova versione corrente in Production
current_prod = client.get_latest_versions(model_name, stages=["Production"])
if not current_prod:
print("Nessuna versione in produzione trovata")
return
# Trova la versione Archived più recente (precedente Production)
archived = client.search_model_versions(
f"name='{model_name}'",
filter_string="tags.stage_history LIKE '%production%'",
)
if len(archived) < 2:
print("Nessuna versione archiviata disponibile per rollback")
return
# Archivia la versione problematica
client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived",
)
# Promuovi la versione precedente
prev_version = archived[1].version
client.transition_model_version_stage(
name=model_name,
version=prev_version,
stage="Production",
)
print(f"Rollback completato: ora in produzione v{prev_version}")
Model Serving con MLflow
MLflow include un server di serving integrato che espone qualsiasi modello registrato
come REST API con un singolo comando. E una soluzione eccellente per prototipazione
e ambienti di sviluppo. Per produzione a scala, si consiglia di integrare il modello
MLflow con FastAPI (vedi il prossimo articolo della serie).
# ==================== Serving via CLI ====================
# Servi l'ultima versione Production del modello
mlflow models serve \
--model-uri "models:/churn-gbm-model/Production" \
--host 0.0.0.0 \
--port 8080 \
--env-manager conda
# Servi un run specifico
mlflow models serve \
--model-uri "runs:/abc123def456/model" \
--port 8080
# Con Docker (raccomandato per produzione)
mlflow models build-docker \
--model-uri "models:/churn-gbm-model/Production" \
--name "churn-model-server" \
--enable-mlserver # Usa MLServer per performance migliori
docker run -p 8080:8080 churn-model-server
# ==================== Test del Serving ====================
# Il server espone l'endpoint /invocations
import requests
import json
import pandas as pd
# Prepara i dati di input nel formato atteso da MLflow
test_data = pd.DataFrame({
"feature_0": [0.5, -1.2],
"feature_1": [1.3, 0.8],
# ... altri 18 features
})
# MLflow accetta JSON in formato "split" o "records"
payload = {
"dataframe_split": {
"columns": test_data.columns.tolist(),
"data": test_data.values.tolist()
}
}
response = requests.post(
"http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
predictions = response.json()
print(f"Predizioni: {predictions}")
# {"predictions": [0, 1]}
Integrazione MLflow + FastAPI per Produzione
Per un serving di produzione robusto, la pratica migliore e caricare il modello dal
Model Registry di MLflow all'avvio dell'applicazione FastAPI, e aggiornarlo
automaticamente quando viene promossa una nuova versione Production:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import mlflow
import pandas as pd
import numpy as np
import threading
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="2.0.0")
# ==================== Model Manager con Auto-Refresh ====================
class MLflowModelManager:
"""
Gestisce il caricamento e il refresh automatico del modello
dal MLflow Model Registry.
"""
def __init__(self, model_name: str, tracking_uri: str, refresh_interval: int = 300):
self.model_name = model_name
self.tracking_uri = tracking_uri
self.refresh_interval = refresh_interval # secondi
self._model = None
self._model_version = None
self._lock = threading.Lock()
mlflow.set_tracking_uri(tracking_uri)
self._load_model()
self._start_refresh_thread()
def _load_model(self) -> None:
"""Carica la versione Production corrente dal registry."""
try:
model_uri = f"models:/{self.model_name}/Production"
new_model = mlflow.sklearn.load_model(model_uri)
# Ottieni il numero di versione
client = mlflow.MlflowClient()
versions = client.get_latest_versions(self.model_name, stages=["Production"])
version = versions[0].version if versions else "unknown"
with self._lock:
self._model = new_model
self._model_version = version
logger.info(f"Modello {self.model_name} v{version} caricato dal registry")
except Exception as e:
logger.error(f"Errore nel caricamento del modello: {e}")
def _start_refresh_thread(self) -> None:
"""Avvia thread background per refresh periodico del modello."""
def refresh_loop():
while True:
time.sleep(self.refresh_interval)
self._load_model()
thread = threading.Thread(target=refresh_loop, daemon=True)
thread.start()
def predict(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict(features)
def predict_proba(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict_proba(features)[:, 1]
@property
def model_version(self) -> str:
return self._model_version or "unknown"
# Istanza globale del manager
model_manager = MLflowModelManager(
model_name="churn-gbm-model",
tracking_uri="http://mlflow-server:5000",
refresh_interval=300, # Controlla nuove versioni ogni 5 minuti
)
# ==================== API Endpoints ====================
class PredictionRequest(BaseModel):
features: List[List[float]]
feature_names: List[str]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[float]
model_version: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Endpoint di predizione churn."""
try:
df = pd.DataFrame(
request.features,
columns=request.feature_names
)
predictions = model_manager.predict(df).tolist()
probabilities = model_manager.predict_proba(df).tolist()
return PredictionResponse(
predictions=predictions,
probabilities=probabilities,
model_version=model_manager.model_version,
)
except Exception as e:
logger.error(f"Errore nella predizione: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_name": "churn-gbm-model",
"model_version": model_manager.model_version,
}
MLflow vs Alternative: W&B, Neptune, ClearML
MLflow non e l'unica opzione per l'experiment tracking. Il mercato offre diverse
alternative valide, ciascuna con punti di forza specifici. La scelta dipende da budget,
dimensione del team, infrastruttura esistente e requisiti di governance.
Confronto Completo Strumenti di Experiment Tracking
Dimensione
MLflow
W&B
Neptune
ClearML
Licenza
Open-source (Apache 2.0)
SaaS + self-host Enterprise
SaaS + self-host
Open-source + Enterprise
Costo (team 5 persone)
Gratis (self-hosted)
~250 USD/mese
~100 USD/mese
Gratis (self-hosted)
Setup
Docker in ~30 min
Zero config (SaaS)
Zero config (SaaS)
Complesso (ClearML Server)
UI/UX
Funzionale, non bella
Eccellente, ricca di grafici
Buona, molto customizzabile
Completa, curva di apprendimento alta
Autologging
Eccellente (20+ framework)
Eccellente (W&B SDK)
Buono
Automatico via monkey-patching
Model Registry
Integrato, staging workflow
W&B Model Registry
Model Registry disponibile
Model Repository integrato
Hyperparameter Sweep
Integrazione Optuna/Hyperopt
Sweeps nativo (eccellente)
Buono
HPO integrato
Governance/Compliance
Audit trail di base
Access control, team features
Team workspace
Avanzato (RBAC, audit)
GenAI/LLM Support
MLflow 3: tracing, evaluation
Prompts, LLM monitoring
LLM tracking
LLM Experiment tracking
Ideale per
Team con infrastruttura propria, PMI, self-hosted
Team che prioritizza UX e collaboration
Team con budget moderato
Enterprise con esigenze di automation
Quando Scegliere MLflow
MLflow e la scelta ottimale in questi scenari:
Budget limitato (<5K EUR/anno): self-hosting su singola VM costa circa 180 EUR/anno
Requisiti di data residency: dati sensibili che non possono uscire dall'infrastruttura aziendale
Integrazione con ecosistema Python esistente: MLflow si integra nativamente con scikit-learn, PyTorch, TensorFlow, XGBoost e oltre 20 framework
Compliance e audit (AI Act EU): accesso completo al database e agli artefatti, nessun lock-in SaaS
Team DevOps-oriented: MLflow e un container Docker come qualsiasi altro servizio
Interrogare ed Analizzare gli Esperimenti
Una delle funzionalità più preziose di MLflow e la possibilità di interrogare programmaticamente
tutti gli esperimenti per trovare il run migliore, confrontare run su più dimensioni, o estrarre
dati per report automatici:
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
# ==================== Ricerca di Run con Filtri ====================
# Trova tutti i run con AUC-ROC > 0.92 nel tuo experiment
runs = mlflow.search_runs(
experiment_names=["churn-prediction-gbm"],
filter_string="metrics.auc_roc > 0.92 and tags.environment = 'dev'",
order_by=["metrics.auc_roc DESC"],
max_results=20,
)
# Il risultato e un DataFrame pandas
print(runs[["run_id", "metrics.auc_roc", "metrics.f1_score",
"params.learning_rate", "params.max_depth"]].head())
# ==================== Confronto Run su Multiple Metriche ====================
def compare_top_runs(experiment_name: str, n: int = 5) -> pd.DataFrame:
"""Restituisce un DataFrame con i top N run per AUC-ROC."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=["metrics.auc_roc DESC"],
max_results=n,
)
# Seleziona le colonne più rilevanti
cols = [
"run_id",
"metrics.accuracy", "metrics.f1_score", "metrics.auc_roc",
"params.n_estimators", "params.learning_rate", "params.max_depth",
"tags.dataset_version", "start_time",
]
# Filtra solo colonne esistenti
existing_cols = [c for c in cols if c in runs.columns]
return runs[existing_cols].copy()
comparison_df = compare_top_runs("churn-prediction-gbm")
print(comparison_df.to_string(index=False))
# ==================== Trovare il Best Run e Caricarlo ====================
def get_best_run(experiment_name: str, metric: str = "metrics.auc_roc") -> dict:
"""Trova il run con la metrica migliore e restituisce run_id e metriche."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=[f"{metric} DESC"],
max_results=1,
)
if runs.empty:
raise ValueError(f"Nessun run trovato in {experiment_name}")
best = runs.iloc[0]
return {
"run_id": best["run_id"],
"auc_roc": best.get("metrics.auc_roc"),
"accuracy": best.get("metrics.accuracy"),
"f1_score": best.get("metrics.f1_score"),
}
best_run = get_best_run("churn-prediction-gbm")
print(f"Best run: {best_run['run_id']}, AUC-ROC: {best_run['auc_roc']:.4f}")
# Carica il modello dal best run
model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")
# ==================== Export Metriche per Report ====================
# Carica la history di una metrica step-by-step (es. validation loss per epoch)
run_id = "abc123def456"
metric_history = client.get_metric_history(run_id, "val_accuracy_step")
steps = [m.step for m in metric_history]
values = [m.value for m in metric_history]
accuracy_over_time = pd.DataFrame({"step": steps, "val_accuracy": values})
print(f"Training steps loggati: {len(accuracy_over_time)}")
Best Practices per MLflow in Produzione
1. Convenzioni di Naming
Una naming convention coerente rende gli esperimenti ricercabili e comprensibili mesi dopo:
# Schema naming raccomandato per experiments e run
# EXPERIMENT: [team]-[progetto]-[tipo]
# Esempi:
"ml-eng-churn-prediction-gbm"
"ml-eng-churn-prediction-neural-net"
"research-recommender-collaborative"
# RUN NAME: [algoritmo]-[key-param]-[data]
# Esempi:
"gbm-lr0.05-depth6-2025-11-15"
"xgb-v2-autofeat-2025-11-20"
"baseline-logistic-regression"
# Usa sempre tag per metadata strutturati
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"environment": "dev", # dev | staging | prod
"dataset_version": "v2.1",
"git_branch": git_branch,
"git_commit": git_commit[:8],
"triggered_by": "ci-cd", # manual | ci-cd | scheduled
"approved_by": "", # Compilato prima della promozione
})
2. Struttura del Codice MLflow
from contextlib import contextmanager
import mlflow
import functools
# Pattern: decorator per tracking automatico
def mlflow_run(experiment_name: str, run_name: str = None, tags: dict = None):
"""Decorator che wrappa una funzione in un MLflow run."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name or func.__name__) as run:
if tags:
mlflow.set_tags(tags)
result = func(*args, **kwargs)
return result
return wrapper
return decorator
# Uso del decorator
@mlflow_run(
experiment_name="churn-prediction-gbm",
run_name="training-v2",
tags={"team": "ml-eng", "version": "v2"}
)
def train_model(X_train, y_train, params: dict):
mlflow.log_params(params)
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ... metriche e artefatti
return model
# Pattern: context manager per setup/teardown
@contextmanager
def mlflow_experiment(experiment_name: str, run_name: str):
"""Context manager con gestione errori."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name) as run:
try:
mlflow.set_tag("status", "running")
yield run
mlflow.set_tag("status", "success")
except Exception as e:
mlflow.set_tag("status", "failed")
mlflow.set_tag("error", str(e))
raise
# Uso
with mlflow_experiment("churn-prediction", "training-run-v3") as run:
mlflow.log_params({"n_estimators": 300})
# ... training
mlflow.log_metric("accuracy", 0.94)
3. Integrazione con GitHub Actions per CI/CD
# .github/workflows/ml-experiment.yml
name: ML Experiment + Model Promotion
on:
push:
branches: [main]
paths:
- 'src/models/**'
- 'src/features/**'
- 'params.yaml'
jobs:
train-and-evaluate:
runs-on: ubuntu-latest
env:
MLFLOW_TRACKING_URI: #123;{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: #123;{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: #123;{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
id: training
run: |
# Script di training che logga su MLflow e stampa il run_id
RUN_ID=$(python src/models/train.py --output-run-id)
echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
- name: Evaluate and check metrics
id: evaluation
run: |
python scripts/ci_evaluate.py \
--run-id #123;{ steps.training.outputs.run_id }} \
--min-auc 0.92 \
--min-accuracy 0.90
- name: Promote to Staging
if: success()
run: |
python scripts/promote_model.py \
--run-id #123;{ steps.training.outputs.run_id }} \
--model-name churn-gbm-model \
--target-stage Staging
- name: Notify team
if: success()
uses: slackapi/slack-github-action@v1.24.0
with:
payload: |
{
"text": "Nuovo modello promosso a Staging: run #123;{ steps.training.outputs.run_id }}"
}
env:
SLACK_WEBHOOK_URL: #123;{ secrets.SLACK_WEBHOOK_URL }}
Anti-Patterns da Evitare
Usare mlflow.log_metric() con chiavi diverse per la stessa metrica:
accuracy e acc sono due metriche separate per MLflow.
Definisci un dizionario di nomi canonici e usalo sempre.
Non chiudere i run attivi: se il codice crasha dentro un run attivo
senza il context manager with mlflow.start_run(), il run rimane
nello stato "RUNNING" indefinitamente. Usa sempre il context manager.
Loggare artefatti enormi come CSV: MLflow non e un data lake.
Per dataset grandi, logga solo i metadata (path DVC, hash, dimensioni) e usa
DVC per il versioning dei dati.
Usare SQLite in produzione con più utenti: SQLite non supporta
scritture concorrenti. Con due processi di training paralleli, si verificano lock
errors. Usa PostgreSQL o MySQL per qualsiasi setup multi-utente.
Non loggare la versione del dataset: i parametri del modello
senza la versione dei dati non bastano per la riproducibilità. Logga sempre il
commit Git, il tag DVC e le dimensioni del dataset.
Promuovere a Production senza promozione intermedia Staging:
il workflow Staging intermediario permette test di integrazione e validazione
del team prima del deployment in produzione. Non saltare questo step.
Le Novità di MLflow 3: Verso il GenAI e gli Agent
Con il rilascio di MLflow 3 nel giugno 2025, la piattaforma ha fatto un salto evolutivo
significativo orientato al mondo GenAI. Le novità più rilevanti per chi lavora con modelli
ML tradizionali e con LLM:
LoggedModel come entità di prima classe: il modello non e più
solo un artefatto di un run. I LoggedModel persistono attraverso run, ambienti e
deployment, con lineage completa verso metriche, parametri, tracce e dati di valutazione.
Performance migliorate del 25%: MLflow 3.x ha ottimizzato le query
al database e ridotto l'overhead del logging, con throughput di log superiore del
25% rispetto alla versione 2.5 (benchmark 2026).
GenAI Tracing: tracing automatico per LLM, chain, tool call e agenti
con supporto per LangChain, LlamaIndex, OpenAI SDK, Anthropic e altri.
Feedback Collection API: raccolta strutturata di feedback umano sui
risultati dei modelli, integrata con la UI per revisione e valutazione.
Evaluation Framework evoluto: mlflow.evaluate() ora
supporta metriche custom, LLM-as-judge, e comparazione automatica di modelli.
Conclusioni e Prossimi Passi
MLflow si e consolidato come lo strumento di experiment tracking più diffuso nell'ecosistema
ML open-source, con una community attiva e un'evoluzione costante. La combinazione di
tracking, Model Registry e serving in un'unica piattaforma self-hosted lo rende la scelta
naturale per team che vogliono controllo completo sulla propria infrastruttura MLOps senza
i costi delle soluzioni SaaS.
Il workflow che abbiamo visto in questo articolo, dalla registrazione degli esperimenti
alla promozione in produzione tramite il Model Registry, copre il 90% dei casi d'uso
reali di un team ML. Integrandolo con DVC per il versioning dei dati (articolo precedente)
e con GitHub Actions per l'automazione CI/CD, si ottiene un sistema MLOps completo e
professionale con un budget inferiore a 250 EUR/anno.
Nel prossimo articolo affronteremo uno dei problemi più insidiosi del machine learning
in produzione: il Model Drift. Vedremo come rilevare il degrado delle
performance nel tempo (data drift, concept drift, prediction drift) e come implementare
sistemi di retraining automatico con alert su Grafana e Prometheus.