Urmărirea experimentelor cu MLflow: Ghid complet
Ați pierdut vreodată ore în căutarea combinației de hiperparametri care a produs acel rezultat excelent de acum trei săptămâni? Sau te-ai trezit să te întrebi de ce modelul este în producție Se comportă diferit față de ceea ce ați testat la nivel local? Aceste probleme sunt foarte frecvente în ciclul de viață al învățării automate, au o rădăcină comună: lipsa unui sistem structurat de urmărirea experimentală.
MLflow și cel mai popular răspuns open-source la această problemă. Născut în Databricks în 2018 a devenit un proiect Apache în 2019, MLflow s-a impus ca standardul de facto pentru urmărirea experimentelor ML în ecosistemul Python. Cu eliberare de MLflow 3 în iunie 2025 în timpul Databricks Data + AI Summit, platforma a făcut un salt evolutiv semnificativ: de la instrument de urmărire la platformă unificată pentru dezvoltarea, evaluarea și implementarea modelelor ML și GenAI, cu LoggedModel ca entitate de primă clasă și performanța de înregistrare îmbunătățită cu 25% comparativ cu versiunea 2.5.
În acest ghid vom vedea MLflow end-to-end: de la instalare la urmărirea avansată, de la înregistrarea automată la Model Registry, la servirea modelului și integrarea Docker. Fiecare exemplu este testat și gata pentru utilizare în producție.
Ce veți învăța în acest articol
- Arhitectura MLflow: server de urmărire, magazin backend, magazin de artefacte și noutăți în MLflow 3
- Configurare și producție locală: SQLite, PostgreSQL, S3 ca magazin de artefacte
- Urmărirea completă a experimentului: parametri de jurnal, valori, artefacte, etichete și executări imbricate
- Înregistrare automată: integrare zero-config cu scikit-learn, XGBoost, PyTorch, TensorFlow
- Registrul de modele: gestionarea ciclului de viață al modelului, producție, arhivă și model
- Servirea modelului cu MLflow: API REST, container Docker, integrare FastAPI
- MLflow cu Docker Compose pentru implementarea în producție
- Comparație cu alternative: W&B, Neptune, ClearML - când să alegi ce
- Cele mai bune practici și anti-modele pentru echipele ML de toate dimensiunile
Seria MLOps și Machine Learning în producție
| # | Articol | Concentrează-te |
|---|---|---|
| 1 | MLOps: de la experiment la producție | Bazele și ciclul de viață complet |
| 2 | Conducta ML cu CI/CD | GitHub Actions și Docker pentru ML |
| 3 | Versiune DVC vs LakeFS | Seturi de date și versiunea modelului |
| 4 | Sunteți aici - Urmărirea experimentelor cu MLflow | Urmărire, înregistrare, servire |
| 5 | Detectarea derivei modelului | Monitorizare automată și recalificare |
| 6 | Servire cu FastAPI + Uvicorn | Implementarea modelelor în producție |
| 7 | Scalare ML pe Kubernetes | KubeFlow și Seldon Core |
| 8 | Testarea A/B a modelelor ML | Metodologie și implementare |
| 9 | Guvernarea ML | Conformitate, AI Act UE, etică |
| 10 | Studiu de caz: Predicția abandonului | Conductă de la capăt la capăt în producție |
Arhitectura MLflow: cele patru componente fundamentale
Înainte de a scrie o linie de cod, este important să înțelegeți elementele de bază ale MLflow. Platforma este formată din patru componente principale, fiecare cu un rol specific în ciclul de viață al învățării automate:
- Urmărire MLflow: API-ul și interfața de utilizare pentru înregistrarea în jurnal și interogarea experimentelor. Înregistrați parametrii, valorile, etichetele, artefactele și notele pentru fiecare cursă de antrenament.
- Proiecte MLflow: format pentru ambalarea codului ML în execuții reproductibile, cu managementul dependenței și al mediului prin Conda sau Docker.
- Modele MLflow: Format standard pentru salvarea șabloanelor, astfel încât acestea să poată face să fie deservite de mai multe cadre (funcția Python, API REST, Spark UDF etc.).
- Registrul modelului MLflow: magazin centralizat pentru gestionarea ciclului de viață de modele: versiuni, montaj, producție, arhivare și pistă de audit.
Cu MLflow 3 (2025) se adaugă un al cincilea element fundamental: conceptul de LoggedModel ca o entitate de primă clasă. În loc de abordarea anterioară centrat pe rulare (unde modelul a fost doar un artefact al unei rulări), LoggedModels persistă în mai multe rulări, medii și implementări, cu o filiație completă a parametrilor, metrici, urme și date de evaluare.
Arhitectura de stocare MLflow
| Componentă | Ce Conține | Backend recomandat |
|---|---|---|
| Magazin de backend | Rulați parametri, valori, etichete, metadate | PostgreSQL/MySQL (producție), SQLite (local) |
| Magazin de artefacte | Fișiere model, imagini, CSV, date de evaluare | S3, GCS, Azure Blob (producție), sistem de fișiere local |
| Server de urmărire | API-ul REST pentru înregistrare și interfață de utilizare web | Containere cu pod Docker/EC2/Kubernetes |
| Registrul modelului | Lansări de șabloane, etape, adnotări, webhook-uri | Necesită baza de date (nu funcționează cu sistemele de fișiere) |
Configurare MLflow: de la local la producție
Instalare și configurare locală
Instalarea de bază a MLflow necesită o singură comandă pip. Pentru dezvoltarea locală, MLflow folosește SQLite ca magazin de backend și sistemul de fișiere local ca magazin de artefacte: nu este nevoie de un server suplimentar.
# Installazione MLflow (versione 2.x/3.x)
pip install mlflow
# Con extras per integrazioni specifiche
pip install mlflow[extras] # scikit-learn, XGBoost, LightGBM
pip install mlflow[databricks] # integrazione Databricks
pip install mlflow[genai] # tools per GenAI e LLM (MLflow 3+)
# Verifica installazione
mlflow --version
# mlflow, version 2.x.x o 3.x.x
# Avvia la UI locale (usa ./mlruns come storage)
mlflow ui
# UI disponibile su http://localhost:5000
# Avvia con database SQLite
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
Configurare producție: PostgreSQL + S3
Pentru mediile de producție cu utilizatori multipli și concurență ridicată, este esențial să se utilizeze o bază de date relațională ca backend și o stocare de obiecte ca depozit de artefacte. Modelul MLflow Registry necesită o bază de date (nu funcționează cu sistemele de fișiere):
# ==================== docker-compose.yml ====================
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mlflow"]
interval: 10s
timeout: 5s
retries: 5
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ${MINIO_ACCESS_KEY}
MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY}
volumes:
- minio_data:/data
ports:
- "9000:9000"
- "9001:9001"
mlflow:
image: ghcr.io/mlflow/mlflow:v2.19.0
depends_on:
postgres:
condition: service_healthy
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
MLFLOW_ARTIFACT_ROOT: s3://mlflow-artifacts/
AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY}
MLFLOW_S3_ENDPOINT_URL: http://minio:9000
command: >
mlflow server
--backend-store-uri postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/
--host 0.0.0.0
--port 5000
ports:
- "5000:5000"
volumes:
postgres_data:
minio_data:
# .env file (NON commitare in Git!)
POSTGRES_PASSWORD=sicura_password_123
MINIO_ACCESS_KEY=minio_admin
MINIO_SECRET_KEY=minio_password_sicura
# Avvio dello stack completo
docker-compose up -d
# Verifica che MLflow sia in ascolto
curl http://localhost:5000/health
# {"status": "OK"}
# Crea il bucket MinIO per gli artefatti
docker exec -it minio_container mc alias set local http://localhost:9000 admin password
docker exec -it minio_container mc mb local/mlflow-artifacts
Buget <5K EUR/an pentru IMM-uri
Pentru echipele cu buget limitat, această configurare cu Docker Compose pe o singură VM costă în jur de 180 EUR/an (EC2 t3.mic sau echivalent). MinIO înlocuiește S3 local cu API-uri complet compatibile. Pentru stocarea persistentă puteți utiliza și AWS S3 (aproximativ 2-5 EUR/lună pentru câțiva GB de artefacte). Economii comparativ cu platformele SaaS precum W&B Teams (50+ USD/utilizator/lună) și semnificativ: o echipă de 5 persoane economisiți peste 2500 EUR/an.
Urmărirea experimentului: parametri jurnal, valori și artefacte
Inima MLflow și API-ul de urmărire. Fiecare apel către mlflow.start_run()
creați unul nou alerga în interiorul unui experiment. A
Experimentul grupează execuții asociate (de exemplu, toate execuțiile pentru modelul de predicție a abandonului).
Rulează jurnalul de patru tipuri de date:
- Parametrii: valori fixe pentru rulare (hiperparametri, configurație)
- Metrici: valori numerice care pot varia în timp (pierdere, precizie pe epocă)
- Artefacte: fișiere arbitrare (șabloane, imagini, seturi de date, rapoarte HTML)
- Etichete: metadate cheie-valoare pentru adnotarea și filtrarea rulărilor
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
accuracy_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, RocCurveDisplay
)
from sklearn.model_selection import train_test_split
import os
# ==================== Configurazione MLflow ====================
# Connessione al tracking server (locale o remoto)
mlflow.set_tracking_uri("http://localhost:5000")
# Crea o usa un experiment esistente
experiment_name = "churn-prediction-gbm"
mlflow.set_experiment(experiment_name)
# Ottieni informazioni sull'experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
print(f"Experiment ID: {experiment.experiment_id}")
# ==================== Training con Tracking Completo ====================
def train_churn_model(X_train, X_val, y_train, y_val, params: dict) -> str:
"""
Allena un GBM per churn prediction con tracking MLflow completo.
Restituisce il run_id del run MLflow.
"""
with mlflow.start_run(run_name=f"gbm-lr{params['learning_rate']}-depth{params['max_depth']}") as run:
# ---- 1. TAG: metadata del run ----
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"dataset_version": "v2.1",
"git_commit": os.popen("git rev-parse HEAD").read().strip(),
"environment": "dev",
})
# ---- 2. PARAMS: iperparametri e configurazione ----
mlflow.log_params(params)
mlflow.log_params({
"train_size": len(X_train),
"val_size": len(X_val),
"n_features": X_train.shape[1],
"target_positive_rate": float(y_train.mean()),
})
# ---- 3. TRAINING ----
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ---- 4. METRICS: step-by-step durante training ----
# Logga la loss per ogni stage del GBM (equivalente all'epoch loss)
train_scores = list(model.staged_predict(X_train))
val_scores = list(model.staged_predict(X_val))
for step, (tr_pred, val_pred) in enumerate(zip(train_scores, val_scores)):
train_acc = accuracy_score(y_train, tr_pred)
val_acc = accuracy_score(y_val, val_pred)
mlflow.log_metrics({
"train_accuracy_step": train_acc,
"val_accuracy_step": val_acc,
}, step=step)
# ---- 5. METRICS FINALI ----
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)[:, 1]
final_metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"f1_score": f1_score(y_val, y_pred),
"auc_roc": roc_auc_score(y_val, y_prob),
"precision": float(np.mean(y_pred[y_pred == 1] == y_val[y_pred == 1])) if sum(y_pred) > 0 else 0.0,
"recall": float(sum((y_pred == 1) & (y_val == 1)) / sum(y_val == 1)),
}
mlflow.log_metrics(final_metrics)
# ---- 6. ARTIFACTS: file del modello e report ----
# Salva e logga confusion matrix come immagine
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_val, y_pred)
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.set_title('Confusion Matrix - Churn Prediction')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.colorbar(im)
plt.tight_layout()
mlflow.log_figure(fig, "confusion_matrix.png")
plt.close()
# Salva ROC curve
fig2, ax2 = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(y_val, y_prob, ax=ax2)
ax2.set_title('ROC Curve - Churn Model')
mlflow.log_figure(fig2, "roc_curve.png")
plt.close()
# Logga classification report come file di testo
report = classification_report(y_val, y_pred, target_names=["No Churn", "Churn"])
mlflow.log_text(report, "classification_report.txt")
# Logga feature importance come CSV
feature_imp = pd.DataFrame({
"feature": X_train.columns.tolist(),
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_table(feature_imp.to_dict(orient="list"), "feature_importance.json")
# ---- 7. LOG MODEL: salva il modello con firma e input example ----
input_example = X_val.head(3)
signature = mlflow.models.infer_signature(X_val, y_pred)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="churn-gbm-model", # Registra automaticamente nel Registry
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {final_metrics['accuracy']:.4f}")
print(f"AUC-ROC: {final_metrics['auc_roc']:.4f}")
print(f"UI: http://localhost:5000/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}")
return run.info.run_id
# ==================== Esempio di utilizzo ====================
if __name__ == "__main__":
# Dati sintetici per demo
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_informative=10, random_state=42)
X_df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(20)])
X_train, X_val, y_train, y_val = train_test_split(X_df, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 300,
"learning_rate": 0.05,
"max_depth": 4,
"subsample": 0.8,
"min_samples_split": 20,
"random_state": 42,
}
run_id = train_churn_model(X_train, X_val, y_train, y_val, params)
Execuții imbricate și căutare cu hiperparametri
MLflow acceptă i alergări imbricate: Copilul aleargă într-un parinte. Acest model este ideal pentru căutarea cu hiperparametri (Optuna, GridSearchCV) oriunde doriți o rulare părinte reprezentând căutarea generală și multe rulări secundare, câte una pentru fiecare configuratie testata:
import mlflow
import optuna
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
def hyperparameter_search(X_train, y_train, n_trials: int = 50) -> str:
"""
Hyperparameter search con Optuna + MLflow nested runs.
Run padre: contiene il summary della ricerca
Run figli: ogni trial Optuna e un run MLflow figlio
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn-hparam-search")
with mlflow.start_run(run_name="optuna-search-v1") as parent_run:
mlflow.set_tag("search_method", "optuna-tpe")
mlflow.log_param("n_trials", n_trials)
mlflow.log_param("optimization_metric", "auc_roc")
best_auc = 0.0
best_params = {}
def objective(trial) -> float:
"""Funzione obiettivo Optuna - ogni trial e un nested run MLflow."""
params = {
"n_estimators": trial.suggest_int("n_estimators", 100, 1000),
"learning_rate": trial.suggest_float("learning_rate", 0.001, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 8),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"min_samples_split": trial.suggest_int("min_samples_split", 5, 50),
}
# Ogni trial ottiene il suo run MLflow figlio
with mlflow.start_run(
run_name=f"trial-{trial.number}",
nested=True # <-- indica che e un run figlio
) as child_run:
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=3, scoring="roc_auc")
auc_mean = scores.mean()
auc_std = scores.std()
mlflow.log_metrics({
"cv_auc_mean": auc_mean,
"cv_auc_std": auc_std,
"trial_number": trial.number,
})
return auc_mean
# Esegui la ricerca Optuna
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective, n_trials=n_trials, n_jobs=1)
# Logga i risultati della ricerca nel run padre
best_trial = study.best_trial
mlflow.log_params({f"best_{k}": v for k, v in best_trial.params.items()})
mlflow.log_metrics({
"best_auc": best_trial.value,
"n_trials_completed": len(study.trials),
})
print(f"Miglior AUC: {best_trial.value:.4f}")
print(f"Migliori parametri: {best_trial.params}")
return parent_run.info.run_id
Înregistrare automată: Urmărire Zero-Config
Înregistrarea automată este una dintre cele mai convenabile caracteristici ale MLflow: cu o singură linie de cod, MLflow interceptează automat apelurile către principalele cadre ML și parametrii de jurnal, metrici și artefacte fără nicio modificare a codului de antrenament. Și susținut de scikit-learn, XGBoost, LightGBM, PyTorch Lightning, TensorFlow/Keras, Spark MLlib și altele.
import mlflow
import mlflow.sklearn
import mlflow.xgboost
# ==================== Autologging scikit-learn ====================
# Abilita autologging per scikit-learn
# Logga automaticamente: hyperparametri, metriche di training, signature del modello
mlflow.sklearn.autolog(
log_input_examples=True, # Logga esempi di input
log_model_signatures=True, # Inferisce e logga la firma del modello
log_models=True, # Salva il modello come artefatto
log_datasets=False, # Non loggare l'intero dataset (troppo grande)
max_tuning_runs=100, # Per GridSearchCV: max run tracciati
exclusive=False, # Permette log manuali in aggiunta
)
mlflow.set_experiment("autolog-demo")
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Con autolog attivo, fit() logga tutto automaticamente
with mlflow.start_run(run_name="rf-gridsearch-autolog"):
param_grid = {
"n_estimators": [100, 300],
"max_depth": [4, 6, 8],
"min_samples_split": [10, 20],
}
model = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=3,
scoring="roc_auc",
n_jobs=-1
)
model.fit(X_train, y_train)
# MLflow ha loggato automaticamente:
# - Tutti i parametri del RandomForest
# - cv=3, scoring, n_jobs
# - Best params dal GridSearchCV
# - Score di validazione incrociata
# - Il modello come artefatto
# ==================== Autologging XGBoost ====================
mlflow.xgboost.autolog(
importance_types=["gain", "weight"], # Logga feature importance
log_model_signatures=True,
log_input_examples=True,
)
import xgboost as xgb
import numpy as np
with mlflow.start_run(run_name="xgb-autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "auc"],
"learning_rate": 0.05,
"max_depth": 6,
"n_estimators": 500,
"subsample": 0.8,
"seed": 42,
}
# Autolog traccia: tutte le metriche per ogni boosting round
# e il modello finale, la feature importance
booster = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=50,
verbose_eval=False,
)
# ==================== Autologging PyTorch Lightning ====================
mlflow.pytorch.autolog(
every_n_iter=10, # Logga ogni 10 iterazioni
log_models=True,
checkpoint_monitor="val_loss",
)
# Con PyTorch Lightning, chiama semplicemente trainer.fit()
# e MLflow cattura automaticamente tutte le loss e le metriche
Limitările înregistrării automate
Înregistrarea automată este convenabilă pentru prototiparea rapidă, dar are unele limitări în producție:
nu înregistrează valorile personalizate definite de utilizator, nu înregistrează informații despre setul de date
(dimensiune, versiune, distribuție de clasă), nu gestionează dependențele dintre
experimente. Pentru conductele MLOps mature, se recomandă utilizarea înregistrării automate ca bază
și adăugați apeluri manuale la mlflow.log_param(), mlflow.log_metric()
e mlflow.log_artifact() pentru informații specifice domeniului.
MLflow Model Registry: Model Lifecycle
MLflow Model Registry este componenta care transformă MLflow dintr-un instrument simplu urmărirea către o adevărată platformă MLOps. Vă permite să gestionați versiunile modelului prin un ciclu de viață standardizat: de la dezvoltare a punerea în scenă a producție, cu piste de audit, adnotări și notificări.
Registrul este accesibil atât prin interfața de utilizare (glisare și plasare pentru a schimba etapele) cât și prin intermediul API-ul Python, esențial pentru automatizarea CI/CD. Cu MLflow 3, Registrul este stat îmbogățit cu webhook-uri pentru notificări automate la schimbarea etapelor.
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion
import time
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
MODEL_NAME = "churn-gbm-model"
# ==================== 1. REGISTRARE UN MODELLO ====================
# Metodo A: durante il log_model (più comune)
with mlflow.start_run() as run:
# ... training ...
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=MODEL_NAME,
)
# Il modello viene automaticamente registrato come versione 1
# con stage "None" (development)
# Metodo B: da un run esistente tramite URI
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
version = mlflow.register_model(
model_uri=model_uri,
name=MODEL_NAME,
tags={"team": "ml-eng", "algorithm": "gbm"}
)
print(f"Registrato: {MODEL_NAME} versione {version.version}")
# ==================== 2. GESTIRE LE VERSIONI E GLI STAGE ====================
# Aggiungi una descrizione alla versione
client.update_model_version(
name=MODEL_NAME,
version=version.version,
description=(
"GBM per churn prediction v2.1. "
"Accuracy: 0.9423, AUC-ROC: 0.9567. "
"Trainato su dataset 2024-01 to 2025-01, 45k samples."
)
)
# Promuovi a Staging (dopo validazione interna)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Staging",
archive_existing_versions=False, # Mantieni altre versioni staging
)
print(f"Modello v{version.version} promosso a Staging")
# Aggiungi tag per tracciabilita
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validated_by",
value="alice.rossi@company.com"
)
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validation_date",
value="2025-11-15"
)
# Promuovi a Production (dopo approvazione)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Production",
archive_existing_versions=True, # Archivia la versione Production precedente
)
print(f"Modello v{version.version} in produzione!")
# ==================== 3. CARICARE IL MODELLO IN PRODUZIONE ====================
def load_production_model(model_name: str):
"""Carica sempre la versione Production dal registry."""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# In uno script di inference o serving
model = load_production_model(MODEL_NAME)
predictions = model.predict(new_data)
# ==================== 4. INTERROGARE IL REGISTRY ====================
# Lista tutte le versioni di un modello
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
for v in versions:
print(f"v{v.version} | Stage: {v.current_stage} | Run: {v.run_id[:8]}...")
# Cerca solo versioni in Production
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
if prod_versions:
latest_prod = prod_versions[0]
print(f"Versione in produzione: v{latest_prod.version}")
print(f"Run ID: {latest_prod.run_id}")
# Ottieni tutte le metriche associate alla versione in produzione
run_data = client.get_run(latest_prod.run_id).data
print(f"AUC-ROC in produzione: {run_data.metrics.get('auc_roc', 'N/A')}")
# ==================== 5. ROLLBACK IN CASO DI PROBLEMA ====================
def rollback_to_previous_production(model_name: str) -> None:
"""
Rollback: archivia la versione Production attuale e
ripristina la versione Archived più recente.
"""
# Trova versione corrente in Production
current_prod = client.get_latest_versions(model_name, stages=["Production"])
if not current_prod:
print("Nessuna versione in produzione trovata")
return
# Trova la versione Archived più recente (precedente Production)
archived = client.search_model_versions(
f"name='{model_name}'",
filter_string="tags.stage_history LIKE '%production%'",
)
if len(archived) < 2:
print("Nessuna versione archiviata disponibile per rollback")
return
# Archivia la versione problematica
client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived",
)
# Promuovi la versione precedente
prev_version = archived[1].version
client.transition_model_version_stage(
name=model_name,
version=prev_version,
stage="Production",
)
print(f"Rollback completato: ora in produzione v{prev_version}")
Servire model cu MLflow
MLflow include un server de servire încorporat care expune orice model înregistrat ca API REST cu o singură comandă. Și o soluție excelentă pentru prototipare și medii de dezvoltare. Pentru producția la scară, se recomandă integrarea modelului MLflow cu FastAPI (vezi articolul următor din serie).
# ==================== Serving via CLI ====================
# Servi l'ultima versione Production del modello
mlflow models serve \
--model-uri "models:/churn-gbm-model/Production" \
--host 0.0.0.0 \
--port 8080 \
--env-manager conda
# Servi un run specifico
mlflow models serve \
--model-uri "runs:/abc123def456/model" \
--port 8080
# Con Docker (raccomandato per produzione)
mlflow models build-docker \
--model-uri "models:/churn-gbm-model/Production" \
--name "churn-model-server" \
--enable-mlserver # Usa MLServer per performance migliori
docker run -p 8080:8080 churn-model-server
# ==================== Test del Serving ====================
# Il server espone l'endpoint /invocations
import requests
import json
import pandas as pd
# Prepara i dati di input nel formato atteso da MLflow
test_data = pd.DataFrame({
"feature_0": [0.5, -1.2],
"feature_1": [1.3, 0.8],
# ... altri 18 features
})
# MLflow accetta JSON in formato "split" o "records"
payload = {
"dataframe_split": {
"columns": test_data.columns.tolist(),
"data": test_data.values.tolist()
}
}
response = requests.post(
"http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
predictions = response.json()
print(f"Predizioni: {predictions}")
# {"predictions": [0, 1]}
Integrare MLflow + FastAPI pentru producție
Pentru o difuzare robustă de producție, cea mai bună practică este să încărcați modelul de la MLflow Model Registry când porniți aplicația FastAPI și actualizați-o automat atunci când o nouă versiune de producție este promovată:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import mlflow
import pandas as pd
import numpy as np
import threading
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="2.0.0")
# ==================== Model Manager con Auto-Refresh ====================
class MLflowModelManager:
"""
Gestisce il caricamento e il refresh automatico del modello
dal MLflow Model Registry.
"""
def __init__(self, model_name: str, tracking_uri: str, refresh_interval: int = 300):
self.model_name = model_name
self.tracking_uri = tracking_uri
self.refresh_interval = refresh_interval # secondi
self._model = None
self._model_version = None
self._lock = threading.Lock()
mlflow.set_tracking_uri(tracking_uri)
self._load_model()
self._start_refresh_thread()
def _load_model(self) -> None:
"""Carica la versione Production corrente dal registry."""
try:
model_uri = f"models:/{self.model_name}/Production"
new_model = mlflow.sklearn.load_model(model_uri)
# Ottieni il numero di versione
client = mlflow.MlflowClient()
versions = client.get_latest_versions(self.model_name, stages=["Production"])
version = versions[0].version if versions else "unknown"
with self._lock:
self._model = new_model
self._model_version = version
logger.info(f"Modello {self.model_name} v{version} caricato dal registry")
except Exception as e:
logger.error(f"Errore nel caricamento del modello: {e}")
def _start_refresh_thread(self) -> None:
"""Avvia thread background per refresh periodico del modello."""
def refresh_loop():
while True:
time.sleep(self.refresh_interval)
self._load_model()
thread = threading.Thread(target=refresh_loop, daemon=True)
thread.start()
def predict(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict(features)
def predict_proba(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict_proba(features)[:, 1]
@property
def model_version(self) -> str:
return self._model_version or "unknown"
# Istanza globale del manager
model_manager = MLflowModelManager(
model_name="churn-gbm-model",
tracking_uri="http://mlflow-server:5000",
refresh_interval=300, # Controlla nuove versioni ogni 5 minuti
)
# ==================== API Endpoints ====================
class PredictionRequest(BaseModel):
features: List[List[float]]
feature_names: List[str]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[float]
model_version: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Endpoint di predizione churn."""
try:
df = pd.DataFrame(
request.features,
columns=request.feature_names
)
predictions = model_manager.predict(df).tolist()
probabilities = model_manager.predict_proba(df).tolist()
return PredictionResponse(
predictions=predictions,
probabilities=probabilities,
model_version=model_manager.model_version,
)
except Exception as e:
logger.error(f"Errore nella predizione: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_name": "churn-gbm-model",
"model_version": model_manager.model_version,
}
MLflow vs alternative: W&B, Neptune, ClearML
MLflow nu este singura opțiune pentru urmărirea experimentului. Piața oferă diferite alternative valide, fiecare cu puncte forte specifice. Alegerea depinde de buget, dimensiunea echipei, infrastructura existentă și cerințele de guvernare.
Comparație completă a instrumentelor de urmărire a experimentelor
| Dimensiune | MLflow | W&B | Neptun | ClearML |
|---|---|---|---|---|
| Licenţă | Sursă deschisă (Apache 2.0) | SaaS + Enterprise auto-găzduită | SaaS + auto-gazdă | Open source + Enterprise |
| Cost (echipă de 5 persoane) | Gratuit (auto-găzduit) | ~250 USD/lună | ~100 USD/lună | Gratuit (auto-găzduit) |
| Înființat | Docker în ~30 min | Zero config (SaaS) | Zero config (SaaS) | Complex (server ClearML) |
| UI/UX | Funcțional, nu frumos | Excelent, plin de grafică | Bun, foarte personalizabil | Curbă de învățare completă, înaltă |
| Înregistrare automată | Excelent (20+ cadre) | Excelent (W&B SDK) | Bun | Automat prin monkey-patching |
| Registrul modelului | Flux de lucru integrat, în scenă | Registrul modelelor W&B | Model Registry disponibil | Depozitul integrat de modele |
| Sweep hiperparametric | Integrarea Optuna/Hyperopt | Native Sweeps (excelent) | Bun | HPO integrat |
| Guvernare/Conformitate | Pista de audit de bază | Controlul accesului, caracteristicile echipei | Spațiu de lucru în echipă | Avansat (RBAC, audit) |
| Suport GenAI/LLM | MLflow 3: urmărire, evaluare | Solicitari, monitorizare LLM | Urmărire LLM | Urmărirea experimentului LLM |
| Ideal pentru | Echipa cu infrastructura proprie, IMM, auto-gazduita | Echipa care acordă prioritate UX și colaborare | Echipa cu buget moderat | Întreprindere cu nevoi de automatizare |
Când să alegeți MLflow
MLflow este alegerea optimă în aceste scenarii:
- Buget limitat (<5K EUR/an): auto-găzduirea pe o singură VM costă în jur de 180 EUR/an
- Cerințe de rezidență a datelor: Date sensibile care nu pot părăsi infrastructura companiei
- Integrare cu ecosistemul Python existent: MLflow se integrează nativ cu scikit-learn, PyTorch, TensorFlow, XGBoost și peste 20 de cadre
- Conformitate și audit (AI Act EU): Acces complet la baza de date și artefacte, fără blocare SaaS
- Echipe orientate spre DevOps: MLflow este un container Docker ca orice alt serviciu
Interogarea și analizarea experimentelor
Una dintre cele mai valoroase caracteristici ale MLflow este capacitatea de a interoga programatic toate experimentele pentru a găsi cea mai bună rulare, a compara rulări pe mai multe dimensiuni sau a extrage date pentru rapoarte automate:
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
# ==================== Ricerca di Run con Filtri ====================
# Trova tutti i run con AUC-ROC > 0.92 nel tuo experiment
runs = mlflow.search_runs(
experiment_names=["churn-prediction-gbm"],
filter_string="metrics.auc_roc > 0.92 and tags.environment = 'dev'",
order_by=["metrics.auc_roc DESC"],
max_results=20,
)
# Il risultato e un DataFrame pandas
print(runs[["run_id", "metrics.auc_roc", "metrics.f1_score",
"params.learning_rate", "params.max_depth"]].head())
# ==================== Confronto Run su Multiple Metriche ====================
def compare_top_runs(experiment_name: str, n: int = 5) -> pd.DataFrame:
"""Restituisce un DataFrame con i top N run per AUC-ROC."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=["metrics.auc_roc DESC"],
max_results=n,
)
# Seleziona le colonne più rilevanti
cols = [
"run_id",
"metrics.accuracy", "metrics.f1_score", "metrics.auc_roc",
"params.n_estimators", "params.learning_rate", "params.max_depth",
"tags.dataset_version", "start_time",
]
# Filtra solo colonne esistenti
existing_cols = [c for c in cols if c in runs.columns]
return runs[existing_cols].copy()
comparison_df = compare_top_runs("churn-prediction-gbm")
print(comparison_df.to_string(index=False))
# ==================== Trovare il Best Run e Caricarlo ====================
def get_best_run(experiment_name: str, metric: str = "metrics.auc_roc") -> dict:
"""Trova il run con la metrica migliore e restituisce run_id e metriche."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=[f"{metric} DESC"],
max_results=1,
)
if runs.empty:
raise ValueError(f"Nessun run trovato in {experiment_name}")
best = runs.iloc[0]
return {
"run_id": best["run_id"],
"auc_roc": best.get("metrics.auc_roc"),
"accuracy": best.get("metrics.accuracy"),
"f1_score": best.get("metrics.f1_score"),
}
best_run = get_best_run("churn-prediction-gbm")
print(f"Best run: {best_run['run_id']}, AUC-ROC: {best_run['auc_roc']:.4f}")
# Carica il modello dal best run
model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")
# ==================== Export Metriche per Report ====================
# Carica la history di una metrica step-by-step (es. validation loss per epoch)
run_id = "abc123def456"
metric_history = client.get_metric_history(run_id, "val_accuracy_step")
steps = [m.step for m in metric_history]
values = [m.value for m in metric_history]
accuracy_over_time = pd.DataFrame({"step": steps, "val_accuracy": values})
print(f"Training steps loggati: {len(accuracy_over_time)}")
Cele mai bune practici pentru MLflow în producție
1. Convenții de denumire
O convenție consecventă de denumire face ca experimentele să fie căutate și ușor de înțeles luni mai târziu:
# Schema naming raccomandato per experiments e run
# EXPERIMENT: [team]-[progetto]-[tipo]
# Esempi:
"ml-eng-churn-prediction-gbm"
"ml-eng-churn-prediction-neural-net"
"research-recommender-collaborative"
# RUN NAME: [algoritmo]-[key-param]-[data]
# Esempi:
"gbm-lr0.05-depth6-2025-11-15"
"xgb-v2-autofeat-2025-11-20"
"baseline-logistic-regression"
# Usa sempre tag per metadata strutturati
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"environment": "dev", # dev | staging | prod
"dataset_version": "v2.1",
"git_branch": git_branch,
"git_commit": git_commit[:8],
"triggered_by": "ci-cd", # manual | ci-cd | scheduled
"approved_by": "", # Compilato prima della promozione
})
2. Structura Codului MLflow
from contextlib import contextmanager
import mlflow
import functools
# Pattern: decorator per tracking automatico
def mlflow_run(experiment_name: str, run_name: str = None, tags: dict = None):
"""Decorator che wrappa una funzione in un MLflow run."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name or func.__name__) as run:
if tags:
mlflow.set_tags(tags)
result = func(*args, **kwargs)
return result
return wrapper
return decorator
# Uso del decorator
@mlflow_run(
experiment_name="churn-prediction-gbm",
run_name="training-v2",
tags={"team": "ml-eng", "version": "v2"}
)
def train_model(X_train, y_train, params: dict):
mlflow.log_params(params)
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ... metriche e artefatti
return model
# Pattern: context manager per setup/teardown
@contextmanager
def mlflow_experiment(experiment_name: str, run_name: str):
"""Context manager con gestione errori."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name) as run:
try:
mlflow.set_tag("status", "running")
yield run
mlflow.set_tag("status", "success")
except Exception as e:
mlflow.set_tag("status", "failed")
mlflow.set_tag("error", str(e))
raise
# Uso
with mlflow_experiment("churn-prediction", "training-run-v3") as run:
mlflow.log_params({"n_estimators": 300})
# ... training
mlflow.log_metric("accuracy", 0.94)
3. Integrare cu GitHub Actions pentru CI/CD
# .github/workflows/ml-experiment.yml
name: ML Experiment + Model Promotion
on:
push:
branches: [main]
paths:
- 'src/models/**'
- 'src/features/**'
- 'params.yaml'
jobs:
train-and-evaluate:
runs-on: ubuntu-latest
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
id: training
run: |
# Script di training che logga su MLflow e stampa il run_id
RUN_ID=$(python src/models/train.py --output-run-id)
echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
- name: Evaluate and check metrics
id: evaluation
run: |
python scripts/ci_evaluate.py \
--run-id ${{ steps.training.outputs.run_id }} \
--min-auc 0.92 \
--min-accuracy 0.90
- name: Promote to Staging
if: success()
run: |
python scripts/promote_model.py \
--run-id ${{ steps.training.outputs.run_id }} \
--model-name churn-gbm-model \
--target-stage Staging
- name: Notify team
if: success()
uses: slackapi/slack-github-action@v1.24.0
with:
payload: |
{
"text": "Nuovo modello promosso a Staging: run ${{ steps.training.outputs.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
Anti-modele de evitat
-
Utilizați mlflow.log_metric() cu chei diferite pentru aceeași măsurătoare:
accuracyeaccsunt două valori separate pentru MLflow. Definiți un dicționar de nume canonice și folosiți-l întotdeauna. -
Nu închideți cursele active: dacă codul se blochează într-o rulare activă
fără managerul de context
with mlflow.start_run(), alerga rămâne în starea „RUNNING” pe termen nelimitat. Utilizați întotdeauna managerul de context. - Înregistrarea artefactelor uriașe ca CSV: MLflow nu este un lac de date. Pentru seturi de date mari, înregistrați numai metadatele (cale DVC, hash, dimensiune) și utilizați DVC pentru versiunea datelor.
- Utilizați SQLite în producție cu mai mulți utilizatori: SQLite nu acceptă scrieri concurente. Cu două procese de antrenament paralele, apar blocaje erori. Utilizați PostgreSQL sau MySQL pentru orice configurare multi-utilizator.
- Nu înregistrați versiunea setului de date: parametrii modelului fără versiunea datelor nu sunt suficiente pentru reproductibilitate. Conectați-vă întotdeauna Git commit, eticheta DVC și dimensiunea setului de date.
- Promovați în producție fără promovare intermediară: fluxul de lucru intermediar Staging permite teste de integrare și validare a echipei înainte de implementarea în producție. Nu sări peste acest pas.
Ce este nou în MLflow 3: Către GenAI și agenți
Odată cu lansarea MLflow 3 în iunie 2025, platforma a făcut un salt evolutiv semnificativ orientat către lumea GenAI. Cele mai relevante știri pentru cei care lucrează cu modele ML tradițional și cu LLM:
- LoggedModel ca entitate de primă clasă: modelul nu mai exista doar un artefact al unei alergări. LoggedModels persistă în rulări, medii și implementare, cu descendență completă la metrici, parametri, urme și date de evaluare.
- Performanța s-a îmbunătățit cu 25%: MLflow 3.x are interogări optimizate la baza de date și a redus supraîncărcarea în jurnal, rezultând un debit mai mare de înregistrare 25% față de versiunea 2.5 (benchmark 2026).
- Urmărirea GenAI: Urmărire automată pentru LLM, lanțuri, apeluri de instrumente și agenți cu suport pentru LangChain, LlamaIndex, OpenAI SDK, Anthropic și altele.
- API-ul Feedback Collection: colecție structurată de feedback uman asupra rezultatele modelului, integrate cu UI pentru revizuire și evaluare.
-
Cadru de evaluare evoluat:
mlflow.evaluate()acum acceptă valori personalizate, LLM-as-judge și compararea automată a modelelor.
Concluzii și pașii următori
MLflow s-a consolidat ca cel mai răspândit instrument de urmărire experimentală din ecosistem ML open-source, cu o comunitate activă și evoluție constantă. Combinația de urmărirea, modelul de registru și servirea într-o singură platformă auto-găzduită îl fac alegerea natural pentru echipele care doresc control complet asupra infrastructurii lor fără MLOps costurile soluțiilor SaaS.
Fluxul de lucru pe care l-am văzut în acest articol, de la înregistrarea experimentelor pentru promovarea în producție prin Registrul de modele, acoperă 90% din cazurile de utilizare realitatea unei echipe ML. Integrarea acestuia cu DVC pentru versiunea datelor (articolul anterior) iar cu GitHub Actions pentru automatizarea CI/CD, obțineți un sistem MLOps complet și profesionist cu un buget mai mic de 250 EUR/an.
În următorul articol vom aborda una dintre cele mai dificile probleme ale învățării automate în producție: cel Model Drift. Vom vedea cum să detectăm degradarea performanța în timp (derivare a datelor, deriva conceptului, deriva predicției) și modul de implementare sisteme automate de recalificare cu alerte pe Grafana și Prometheus.
Resurse și pașii următori
- Documentație oficială MLflow: mlflow.org/docs/latest
- Note de lansare MLflow 3: mlflow.org/releases/3
- MLflow GitHub: github.com/mlflow/mlflow
- Articolul precedent: Versiune seturi de date și modele cu DVC
- Articolul următor: Detectarea derivei modelului și reinstruire automată
- Legături încrucișate: Advanced Deep Learning - Training avansat
- Legături încrucișate: Computer Vision - Conducta de detectare a obiectelor
Stiva MLOps completă cu MLflow (Buget <5.000 EUR/an)
| Componentă | Instrument | Costul anual estimat |
|---|---|---|
| Urmărirea experimentului | MLflow auto-găzduit | Gratuit (sursă deschisă) |
| Magazin de backend | PostgreSQL pe Docker | Gratuit (același server MLflow) |
| Magazin de artefacte | MinIO (S3-compat.) sau AWS S3 | Gratuit / ~30 EUR/an |
| VM pentru MLflow + PostgreSQL | EC2 t3.small (2 vCPU, 2 GB RAM) | ~180 EUR/an |
| Versiunea setului de date | DVC + DagsHub | Gratuit |
| Conducta CI/CD | Acțiuni GitHub | Gratuit (2000 min/lună) |
| Total estimat | <220 EUR/an |







