Sledování experimentu s MLflow: Kompletní průvodce
Už jste někdy ztráceli hodiny hledáním, která kombinace hyperparametrů přinesla tento výsledek? vynikající před třemi týdny? Nebo jste se přistihli, že jste se divili, proč je model ve výrobě Chová se odlišně od toho, co jste testovali lokálně? Tyto problémy jsou velmi časté v životním cyklu strojového učení mají společný kořen: nedostatek systému strukturováno podle experimentální sledování.
MLflow a nejoblíbenější open-source odpověď na tento problém. Narozen v Databricks v roce 2018 se stal projektem Apache v roce 2019, MLflow se etabloval jako de facto standard pro sledování experimentů ML v ekosystému Python. s vydání MLflow 3 v červnu 2025 během Databricks Data + AI Summit, platforma udělala významný evoluční skok: od sledovacího nástroje k jednotná platforma pro vývoj, hodnocení a nasazení modelů ML a GenAI, s LoggedModel jako prvotřídní entitou a výkon protokolování se zlepšil o 25 % oproti verzi 2.5.
V této příručce uvidíme MLflow end-to-end: od instalace po pokročilé sledování, od automatického protokolování do registru modelů až po poskytování modelů a integraci Dockeru. Každý příklad je otestován a připraven k produkčnímu použití.
Co se dozvíte v tomto článku
- Architektura MLflow: sledovací server, backendový obchod, úložiště artefaktů a co je nového v MLflow 3
- Lokální nastavení a produkce: SQLite, PostgreSQL, S3 jako úložiště artefaktů
- Kompletní sledování experimentu: parametry protokolu, metriky, artefakty, značky a vnořené běhy
- Autologging: nulová konfigurace integrace se scikit-learn, XGBoost, PyTorch, TensorFlow
- Registr modelů: příprava, výroba, archiv a správa životního cyklu modelu
- Model Serving s MLflow: REST API, kontejner Docker, integrace FastAPI
- MLflow s Docker Compose pro produkční nasazení
- Srovnání s alternativami: W&B, Neptune, ClearML - kdy si vybrat co
- Osvědčené postupy a anti-vzory pro týmy ML všech velikostí
Řada MLOps a strojové učení ve výrobě
| # | Položka | Soustředit |
|---|---|---|
| 1 | MLOps: Od experimentu k produkci | Základy a kompletní životní cyklus |
| 2 | ML potrubí s CI/CD | Akce GitHub a Docker pro ML |
| 3 | Verze DVC vs LakeFS | Datové sady a verzování modelu |
| 4 | Jste zde – Sledování experimentů s MLflow | Sledování, registr, podávání |
| 5 | Detekce driftu modelu | Automatické sledování a rekvalifikace |
| 6 | Podáváme s FastAPI + Uvicorn | Nasazení modelů do výroby |
| 7 | Škálování ML na Kubernetes | KubeFlow a Seldon Core |
| 8 | A/B testování ML modelů | Metodika a implementace |
| 9 | ML řízení | Compliance, AI Act EU, etika |
| 10 | Případová studie: Předpověď odchodu | End-to-end potrubí ve výrobě |
Architektura MLflow: Čtyři základní komponenty
Před napsáním řádku kódu je důležité porozumět stavebním blokům MLflow. Platforma se skládá ze čtyř hlavních součástí, z nichž každá má specifickou roli v životním cyklu strojového učení:
- MLflow Tracking: API a uživatelské rozhraní pro protokolování a dotazování experimentů. Zaznamenávejte parametry, metriky, značky, artefakty a poznámky pro každý tréninkový běh.
- Projekty MLflow: formát pro balení ML kódu do reprodukovatelných běhů, se správou závislostí a prostředí přes Conda nebo Docker.
- Modely MLflow: Standardní formát pro ukládání šablon, aby mohly být obsluhováno více frameworky (funkce Python, REST API, Spark UDF atd.).
- Registr modelu MLflow: centralizovaný obchod pro správu životního cyklu modelů: verzování, staging, výroba, archivace a audit trail.
S MLflow 3 (2025) je přidán pátý základní prvek: koncept LoggedModel jako prvotřídní entita. Místo předchozího přístupu run-centric (kde byl model pouze artefaktem běhu), LoggedModels přetrvávají napříč více běhy, prostředími a nasazeními, s kompletní řadou parametrů, metriky, trasování a vyhodnocovací data.
Architektura úložiště MLflow
| Komponent | Co obsahuje | Backend Doporučeno |
|---|---|---|
| Backend Store | Spustit parametry, metriky, značky, metadata | PostgreSQL/MySQL (produkce), SQLite (místní) |
| Obchod s artefakty | Modelové soubory, obrázky, CSV, vyhodnocovací data | S3, GCS, Azure Blob (produkce), místní souborový systém |
| Sledovací server | REST API pro protokolování a webové uživatelské rozhraní | Kontejnery pod Docker/EC2/Kubernetes |
| Registr modelů | Vydání šablon, fáze, anotace, webhooky | Vyžaduje databázi (nefunguje se systémy souborů) |
Nastavení MLflow: Od místního po produkční
Místní instalace a nastavení
Základní instalace MLflow vyžaduje jeden příkaz pip. Pro místní rozvoj, MLflow používá SQLite jako backendové úložiště a místní souborový systém jako úložiště artefaktů: není potřeba žádný další server.
# Installazione MLflow (versione 2.x/3.x)
pip install mlflow
# Con extras per integrazioni specifiche
pip install mlflow[extras] # scikit-learn, XGBoost, LightGBM
pip install mlflow[databricks] # integrazione Databricks
pip install mlflow[genai] # tools per GenAI e LLM (MLflow 3+)
# Verifica installazione
mlflow --version
# mlflow, version 2.x.x o 3.x.x
# Avvia la UI locale (usa ./mlruns come storage)
mlflow ui
# UI disponibile su http://localhost:5000
# Avvia con database SQLite
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 \
--port 5000
Nastavení výroby: PostgreSQL + S3
Pro produkční prostředí s více uživateli a vysokou souběžností je nezbytné používat relační databáze jako backend a objektové úložiště jako úložiště artefaktů. Model MLflow Registry vyžaduje databázi (nepracuje se systémy souborů):
# ==================== docker-compose.yml ====================
version: '3.8'
services:
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mlflow"]
interval: 10s
timeout: 5s
retries: 5
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: ${MINIO_ACCESS_KEY}
MINIO_ROOT_PASSWORD: ${MINIO_SECRET_KEY}
volumes:
- minio_data:/data
ports:
- "9000:9000"
- "9001:9001"
mlflow:
image: ghcr.io/mlflow/mlflow:v2.19.0
depends_on:
postgres:
condition: service_healthy
environment:
MLFLOW_BACKEND_STORE_URI: postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
MLFLOW_ARTIFACT_ROOT: s3://mlflow-artifacts/
AWS_ACCESS_KEY_ID: ${MINIO_ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${MINIO_SECRET_KEY}
MLFLOW_S3_ENDPOINT_URL: http://minio:9000
command: >
mlflow server
--backend-store-uri postgresql://mlflow:${POSTGRES_PASSWORD}@postgres:5432/mlflow
--default-artifact-root s3://mlflow-artifacts/
--host 0.0.0.0
--port 5000
ports:
- "5000:5000"
volumes:
postgres_data:
minio_data:
# .env file (NON commitare in Git!)
POSTGRES_PASSWORD=sicura_password_123
MINIO_ACCESS_KEY=minio_admin
MINIO_SECRET_KEY=minio_password_sicura
# Avvio dello stack completo
docker-compose up -d
# Verifica che MLflow sia in ascolto
curl http://localhost:5000/health
# {"status": "OK"}
# Crea il bucket MinIO per gli artefatti
docker exec -it minio_container mc alias set local http://localhost:9000 admin password
docker exec -it minio_container mc mb local/mlflow-artifacts
Rozpočet <5 000 EUR/rok pro malé a střední podniky
Pro týmy s omezeným rozpočtem je toto nastavení s Docker Compose na jediném virtuálním počítači náklady kolem 180 EUR/rok (EC2 t3.small nebo ekvivalent). MinIO lokálně nahrazuje S3 s plně kompatibilními API. Pro trvalé úložiště můžete také použít AWS S3 (kolem 2-5 EUR/měsíc za pár GB artefaktů). Úspory oproti platformám SaaS jako W&B Teams (50+ USD/uživatel/měsíc) a významné: tým 5 lidí ušetříte přes 2500 EUR/rok.
Sledování experimentu: Zaznamenat parametry, metriky a artefakty
Srdce MLflow a sledovací API. Každé volání mlflow.start_run()
vytvořit nový běh uvnitř a experimentovat. A
experiment groups related runs (e.g. all runs for the churn prediction model).
Spustí protokol čtyř typů dat:
- Parametry: pevné hodnoty pro běh (hyperparametry, konfigurace)
- Metriky: číselné hodnoty, které se mohou v průběhu času měnit (ztráta, přesnost na epochu)
- Artefakty: libovolné soubory (šablony, obrázky, datové sady, HTML sestavy)
- Tagy: Metadata párů klíč–hodnota pro anotování a filtrování běhů
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
accuracy_score, f1_score, roc_auc_score,
confusion_matrix, classification_report, RocCurveDisplay
)
from sklearn.model_selection import train_test_split
import os
# ==================== Configurazione MLflow ====================
# Connessione al tracking server (locale o remoto)
mlflow.set_tracking_uri("http://localhost:5000")
# Crea o usa un experiment esistente
experiment_name = "churn-prediction-gbm"
mlflow.set_experiment(experiment_name)
# Ottieni informazioni sull'experiment
experiment = mlflow.get_experiment_by_name(experiment_name)
print(f"Experiment ID: {experiment.experiment_id}")
# ==================== Training con Tracking Completo ====================
def train_churn_model(X_train, X_val, y_train, y_val, params: dict) -> str:
"""
Allena un GBM per churn prediction con tracking MLflow completo.
Restituisce il run_id del run MLflow.
"""
with mlflow.start_run(run_name=f"gbm-lr{params['learning_rate']}-depth{params['max_depth']}") as run:
# ---- 1. TAG: metadata del run ----
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"dataset_version": "v2.1",
"git_commit": os.popen("git rev-parse HEAD").read().strip(),
"environment": "dev",
})
# ---- 2. PARAMS: iperparametri e configurazione ----
mlflow.log_params(params)
mlflow.log_params({
"train_size": len(X_train),
"val_size": len(X_val),
"n_features": X_train.shape[1],
"target_positive_rate": float(y_train.mean()),
})
# ---- 3. TRAINING ----
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ---- 4. METRICS: step-by-step durante training ----
# Logga la loss per ogni stage del GBM (equivalente all'epoch loss)
train_scores = list(model.staged_predict(X_train))
val_scores = list(model.staged_predict(X_val))
for step, (tr_pred, val_pred) in enumerate(zip(train_scores, val_scores)):
train_acc = accuracy_score(y_train, tr_pred)
val_acc = accuracy_score(y_val, val_pred)
mlflow.log_metrics({
"train_accuracy_step": train_acc,
"val_accuracy_step": val_acc,
}, step=step)
# ---- 5. METRICS FINALI ----
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)[:, 1]
final_metrics = {
"accuracy": accuracy_score(y_val, y_pred),
"f1_score": f1_score(y_val, y_pred),
"auc_roc": roc_auc_score(y_val, y_prob),
"precision": float(np.mean(y_pred[y_pred == 1] == y_val[y_pred == 1])) if sum(y_pred) > 0 else 0.0,
"recall": float(sum((y_pred == 1) & (y_val == 1)) / sum(y_val == 1)),
}
mlflow.log_metrics(final_metrics)
# ---- 6. ARTIFACTS: file del modello e report ----
# Salva e logga confusion matrix come immagine
fig, ax = plt.subplots(figsize=(6, 5))
cm = confusion_matrix(y_val, y_pred)
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.set_title('Confusion Matrix - Churn Prediction')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
plt.colorbar(im)
plt.tight_layout()
mlflow.log_figure(fig, "confusion_matrix.png")
plt.close()
# Salva ROC curve
fig2, ax2 = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(y_val, y_prob, ax=ax2)
ax2.set_title('ROC Curve - Churn Model')
mlflow.log_figure(fig2, "roc_curve.png")
plt.close()
# Logga classification report come file di testo
report = classification_report(y_val, y_pred, target_names=["No Churn", "Churn"])
mlflow.log_text(report, "classification_report.txt")
# Logga feature importance come CSV
feature_imp = pd.DataFrame({
"feature": X_train.columns.tolist(),
"importance": model.feature_importances_
}).sort_values("importance", ascending=False)
mlflow.log_table(feature_imp.to_dict(orient="list"), "feature_importance.json")
# ---- 7. LOG MODEL: salva il modello con firma e input example ----
input_example = X_val.head(3)
signature = mlflow.models.infer_signature(X_val, y_pred)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="churn-gbm-model", # Registra automaticamente nel Registry
)
print(f"Run ID: {run.info.run_id}")
print(f"Accuracy: {final_metrics['accuracy']:.4f}")
print(f"AUC-ROC: {final_metrics['auc_roc']:.4f}")
print(f"UI: http://localhost:5000/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}")
return run.info.run_id
# ==================== Esempio di utilizzo ====================
if __name__ == "__main__":
# Dati sintetici per demo
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=10000, n_features=20, n_informative=10, random_state=42)
X_df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(20)])
X_train, X_val, y_train, y_val = train_test_split(X_df, y, test_size=0.2, random_state=42)
params = {
"n_estimators": 300,
"learning_rate": 0.05,
"max_depth": 4,
"subsample": 0.8,
"min_samples_split": 20,
"random_state": 42,
}
run_id = train_churn_model(X_train, X_val, y_train, y_val, params)
Vnořené běhy a hledání hyperparametrů
MLflow podporuje i vnořené běhy: Podřízený běh v rámci rodičovského běhu. Tento vzor je ideální pro vyhledávání hyperparametrů (Optuna, GridSearchCV), kdekoli chcete rodičovský běh představující celkové vyhledávání a mnoho podřízených běhů, jeden pro každý testovaná konfigurace:
import mlflow
import optuna
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
def hyperparameter_search(X_train, y_train, n_trials: int = 50) -> str:
"""
Hyperparameter search con Optuna + MLflow nested runs.
Run padre: contiene il summary della ricerca
Run figli: ogni trial Optuna e un run MLflow figlio
"""
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("churn-hparam-search")
with mlflow.start_run(run_name="optuna-search-v1") as parent_run:
mlflow.set_tag("search_method", "optuna-tpe")
mlflow.log_param("n_trials", n_trials)
mlflow.log_param("optimization_metric", "auc_roc")
best_auc = 0.0
best_params = {}
def objective(trial) -> float:
"""Funzione obiettivo Optuna - ogni trial e un nested run MLflow."""
params = {
"n_estimators": trial.suggest_int("n_estimators", 100, 1000),
"learning_rate": trial.suggest_float("learning_rate", 0.001, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 2, 8),
"subsample": trial.suggest_float("subsample", 0.5, 1.0),
"min_samples_split": trial.suggest_int("min_samples_split", 5, 50),
}
# Ogni trial ottiene il suo run MLflow figlio
with mlflow.start_run(
run_name=f"trial-{trial.number}",
nested=True # <-- indica che e un run figlio
) as child_run:
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
scores = cross_val_score(model, X_train, y_train, cv=3, scoring="roc_auc")
auc_mean = scores.mean()
auc_std = scores.std()
mlflow.log_metrics({
"cv_auc_mean": auc_mean,
"cv_auc_std": auc_std,
"trial_number": trial.number,
})
return auc_mean
# Esegui la ricerca Optuna
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective, n_trials=n_trials, n_jobs=1)
# Logga i risultati della ricerca nel run padre
best_trial = study.best_trial
mlflow.log_params({f"best_{k}": v for k, v in best_trial.params.items()})
mlflow.log_metrics({
"best_auc": best_trial.value,
"n_trials_completed": len(study.trials),
})
print(f"Miglior AUC: {best_trial.value:.4f}")
print(f"Migliori parametri: {best_trial.params}")
return parent_run.info.run_id
Autologging: Zero-Config Tracking
Automatické protokolování je jednou z nejpohodlnějších funkcí MLflow: s jediným řádkem kódu, MLflow automaticky zachycuje volání hlavních rámců ML a protokoluje parametry, metriky a artefakty bez jakýchkoli změn v tréninkovém kódu. A podporováno scikit-learn, XGBoost, LightGBM, PyTorch Lightning, TensorFlow/Keras, Spark MLlib a další.
import mlflow
import mlflow.sklearn
import mlflow.xgboost
# ==================== Autologging scikit-learn ====================
# Abilita autologging per scikit-learn
# Logga automaticamente: hyperparametri, metriche di training, signature del modello
mlflow.sklearn.autolog(
log_input_examples=True, # Logga esempi di input
log_model_signatures=True, # Inferisce e logga la firma del modello
log_models=True, # Salva il modello come artefatto
log_datasets=False, # Non loggare l'intero dataset (troppo grande)
max_tuning_runs=100, # Per GridSearchCV: max run tracciati
exclusive=False, # Permette log manuali in aggiunta
)
mlflow.set_experiment("autolog-demo")
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
# Con autolog attivo, fit() logga tutto automaticamente
with mlflow.start_run(run_name="rf-gridsearch-autolog"):
param_grid = {
"n_estimators": [100, 300],
"max_depth": [4, 6, 8],
"min_samples_split": [10, 20],
}
model = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=3,
scoring="roc_auc",
n_jobs=-1
)
model.fit(X_train, y_train)
# MLflow ha loggato automaticamente:
# - Tutti i parametri del RandomForest
# - cv=3, scoring, n_jobs
# - Best params dal GridSearchCV
# - Score di validazione incrociata
# - Il modello come artefatto
# ==================== Autologging XGBoost ====================
mlflow.xgboost.autolog(
importance_types=["gain", "weight"], # Logga feature importance
log_model_signatures=True,
log_input_examples=True,
)
import xgboost as xgb
import numpy as np
with mlflow.start_run(run_name="xgb-autolog"):
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
params = {
"objective": "binary:logistic",
"eval_metric": ["logloss", "auc"],
"learning_rate": 0.05,
"max_depth": 6,
"n_estimators": 500,
"subsample": 0.8,
"seed": 42,
}
# Autolog traccia: tutte le metriche per ogni boosting round
# e il modello finale, la feature importance
booster = xgb.train(
params,
dtrain,
num_boost_round=500,
evals=[(dtrain, "train"), (dval, "val")],
early_stopping_rounds=50,
verbose_eval=False,
)
# ==================== Autologging PyTorch Lightning ====================
mlflow.pytorch.autolog(
every_n_iter=10, # Logga ogni 10 iterazioni
log_models=True,
checkpoint_monitor="val_loss",
)
# Con PyTorch Lightning, chiama semplicemente trainer.fit()
# e MLflow cattura automaticamente tutte le loss e le metriche
Omezení autologování
Autologging je vhodný pro rychlé prototypování, ale má určitá omezení ve výrobě:
nezaznamenává uživatelem definované vlastní metriky, nezaznamenává informace o sadě dat
(velikost, verze, distribuce tříd), nezvládá závislosti mezi
experimenty. U vyspělých potrubí MLOps se doporučuje použít jako základ automatické protokolování
a přidejte ruční volání mlflow.log_param(), mlflow.log_metric()
e mlflow.log_artifact() pro informace specifické pro doménu.
Registr modelu MLflow: Životní cyklus modelu
Registr modelu MLflow je komponenta, která transformuje MLflow z jednoduchého nástroje sledování na skutečnou platformu MLOps. Umožňuje spravovat verze modelu prostřednictvím standardizovaný životní cyklus: od rozvoj a inscenování a výroba, s auditními záznamy, anotacemi a upozorněními.
Registr je přístupný jak přes uživatelské rozhraní (přetažením přetažením změníte fáze), tak přes Python API, nezbytné pro automatizaci CI/CD. S MLflow 3 je registr ve stavu obohacený o webhooky pro automatická upozornění při přepínání fází.
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion
import time
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
MODEL_NAME = "churn-gbm-model"
# ==================== 1. REGISTRARE UN MODELLO ====================
# Metodo A: durante il log_model (più comune)
with mlflow.start_run() as run:
# ... training ...
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
registered_model_name=MODEL_NAME,
)
# Il modello viene automaticamente registrato come versione 1
# con stage "None" (development)
# Metodo B: da un run esistente tramite URI
run_id = "abc123def456"
model_uri = f"runs:/{run_id}/model"
version = mlflow.register_model(
model_uri=model_uri,
name=MODEL_NAME,
tags={"team": "ml-eng", "algorithm": "gbm"}
)
print(f"Registrato: {MODEL_NAME} versione {version.version}")
# ==================== 2. GESTIRE LE VERSIONI E GLI STAGE ====================
# Aggiungi una descrizione alla versione
client.update_model_version(
name=MODEL_NAME,
version=version.version,
description=(
"GBM per churn prediction v2.1. "
"Accuracy: 0.9423, AUC-ROC: 0.9567. "
"Trainato su dataset 2024-01 to 2025-01, 45k samples."
)
)
# Promuovi a Staging (dopo validazione interna)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Staging",
archive_existing_versions=False, # Mantieni altre versioni staging
)
print(f"Modello v{version.version} promosso a Staging")
# Aggiungi tag per tracciabilita
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validated_by",
value="alice.rossi@company.com"
)
client.set_model_version_tag(
name=MODEL_NAME,
version=version.version,
key="validation_date",
value="2025-11-15"
)
# Promuovi a Production (dopo approvazione)
client.transition_model_version_stage(
name=MODEL_NAME,
version=version.version,
stage="Production",
archive_existing_versions=True, # Archivia la versione Production precedente
)
print(f"Modello v{version.version} in produzione!")
# ==================== 3. CARICARE IL MODELLO IN PRODUZIONE ====================
def load_production_model(model_name: str):
"""Carica sempre la versione Production dal registry."""
model_uri = f"models:/{model_name}/Production"
model = mlflow.sklearn.load_model(model_uri)
return model
# In uno script di inference o serving
model = load_production_model(MODEL_NAME)
predictions = model.predict(new_data)
# ==================== 4. INTERROGARE IL REGISTRY ====================
# Lista tutte le versioni di un modello
versions = client.search_model_versions(f"name='{MODEL_NAME}'")
for v in versions:
print(f"v{v.version} | Stage: {v.current_stage} | Run: {v.run_id[:8]}...")
# Cerca solo versioni in Production
prod_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
if prod_versions:
latest_prod = prod_versions[0]
print(f"Versione in produzione: v{latest_prod.version}")
print(f"Run ID: {latest_prod.run_id}")
# Ottieni tutte le metriche associate alla versione in produzione
run_data = client.get_run(latest_prod.run_id).data
print(f"AUC-ROC in produzione: {run_data.metrics.get('auc_roc', 'N/A')}")
# ==================== 5. ROLLBACK IN CASO DI PROBLEMA ====================
def rollback_to_previous_production(model_name: str) -> None:
"""
Rollback: archivia la versione Production attuale e
ripristina la versione Archived più recente.
"""
# Trova versione corrente in Production
current_prod = client.get_latest_versions(model_name, stages=["Production"])
if not current_prod:
print("Nessuna versione in produzione trovata")
return
# Trova la versione Archived più recente (precedente Production)
archived = client.search_model_versions(
f"name='{model_name}'",
filter_string="tags.stage_history LIKE '%production%'",
)
if len(archived) < 2:
print("Nessuna versione archiviata disponibile per rollback")
return
# Archivia la versione problematica
client.transition_model_version_stage(
name=model_name,
version=current_prod[0].version,
stage="Archived",
)
# Promuovi la versione precedente
prev_version = archived[1].version
client.transition_model_version_stage(
name=model_name,
version=prev_version,
stage="Production",
)
print(f"Rollback completato: ora in produzione v{prev_version}")
Model Serving s MLflow
MLflow obsahuje vestavěný obslužný server, který odhaluje všechny registrované modely jako REST API s jediným příkazem. A vynikající řešení pro prototypování a vývojová prostředí. Pro výrobu v měřítku se doporučuje integrovat model MLflow s FastAPI (viz další článek v sérii).
# ==================== Serving via CLI ====================
# Servi l'ultima versione Production del modello
mlflow models serve \
--model-uri "models:/churn-gbm-model/Production" \
--host 0.0.0.0 \
--port 8080 \
--env-manager conda
# Servi un run specifico
mlflow models serve \
--model-uri "runs:/abc123def456/model" \
--port 8080
# Con Docker (raccomandato per produzione)
mlflow models build-docker \
--model-uri "models:/churn-gbm-model/Production" \
--name "churn-model-server" \
--enable-mlserver # Usa MLServer per performance migliori
docker run -p 8080:8080 churn-model-server
# ==================== Test del Serving ====================
# Il server espone l'endpoint /invocations
import requests
import json
import pandas as pd
# Prepara i dati di input nel formato atteso da MLflow
test_data = pd.DataFrame({
"feature_0": [0.5, -1.2],
"feature_1": [1.3, 0.8],
# ... altri 18 features
})
# MLflow accetta JSON in formato "split" o "records"
payload = {
"dataframe_split": {
"columns": test_data.columns.tolist(),
"data": test_data.values.tolist()
}
}
response = requests.post(
"http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
predictions = response.json()
print(f"Predizioni: {predictions}")
# {"predictions": [0, 1]}
Integrace MLflow + FastAPI pro produkci
Pro robustní produkční obsluhu je osvědčeným postupem načíst model z Registr modelu MLflow při spouštění aplikace FastAPI a aktualizujte jej automaticky při propagaci nové produkční verze:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import mlflow
import pandas as pd
import numpy as np
import threading
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
app = FastAPI(title="Churn Prediction API", version="2.0.0")
# ==================== Model Manager con Auto-Refresh ====================
class MLflowModelManager:
"""
Gestisce il caricamento e il refresh automatico del modello
dal MLflow Model Registry.
"""
def __init__(self, model_name: str, tracking_uri: str, refresh_interval: int = 300):
self.model_name = model_name
self.tracking_uri = tracking_uri
self.refresh_interval = refresh_interval # secondi
self._model = None
self._model_version = None
self._lock = threading.Lock()
mlflow.set_tracking_uri(tracking_uri)
self._load_model()
self._start_refresh_thread()
def _load_model(self) -> None:
"""Carica la versione Production corrente dal registry."""
try:
model_uri = f"models:/{self.model_name}/Production"
new_model = mlflow.sklearn.load_model(model_uri)
# Ottieni il numero di versione
client = mlflow.MlflowClient()
versions = client.get_latest_versions(self.model_name, stages=["Production"])
version = versions[0].version if versions else "unknown"
with self._lock:
self._model = new_model
self._model_version = version
logger.info(f"Modello {self.model_name} v{version} caricato dal registry")
except Exception as e:
logger.error(f"Errore nel caricamento del modello: {e}")
def _start_refresh_thread(self) -> None:
"""Avvia thread background per refresh periodico del modello."""
def refresh_loop():
while True:
time.sleep(self.refresh_interval)
self._load_model()
thread = threading.Thread(target=refresh_loop, daemon=True)
thread.start()
def predict(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict(features)
def predict_proba(self, features: pd.DataFrame) -> np.ndarray:
with self._lock:
if self._model is None:
raise RuntimeError("Modello non disponibile")
return self._model.predict_proba(features)[:, 1]
@property
def model_version(self) -> str:
return self._model_version or "unknown"
# Istanza globale del manager
model_manager = MLflowModelManager(
model_name="churn-gbm-model",
tracking_uri="http://mlflow-server:5000",
refresh_interval=300, # Controlla nuove versioni ogni 5 minuti
)
# ==================== API Endpoints ====================
class PredictionRequest(BaseModel):
features: List[List[float]]
feature_names: List[str]
class PredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[float]
model_version: str
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Endpoint di predizione churn."""
try:
df = pd.DataFrame(
request.features,
columns=request.feature_names
)
predictions = model_manager.predict(df).tolist()
probabilities = model_manager.predict_proba(df).tolist()
return PredictionResponse(
predictions=predictions,
probabilities=probabilities,
model_version=model_manager.model_version,
)
except Exception as e:
logger.error(f"Errore nella predizione: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"model_name": "churn-gbm-model",
"model_version": model_manager.model_version,
}
MLflow vs alternativy: W&B, Neptun, ClearML
MLflow není jedinou možností pro sledování experimentů. Trh nabízí různé platné alternativy, každá se specifickými silnými stránkami. Výběr závisí na rozpočtu, velikost týmu, stávající infrastruktura a požadavky na správu.
Kompletní srovnání nástrojů pro sledování experimentů
| Velikost | MLflow | W&B | Neptune | ClearML |
|---|---|---|---|---|
| Licence | Open source (Apache 2.0) | SaaS + podnik s vlastním hostitelem | SaaS + vlastní hostitel | Open source + Enterprise |
| Cena (tým 5 lidí) | Zdarma (vlastně hostované) | ~250 USD/měsíc | ~100 USD/měsíc | Zdarma (vlastně hostované) |
| Nastavení | Docker za ~30 min | Nulová konfigurace (SaaS) | Nulová konfigurace (SaaS) | Komplexní (ClearML Server) |
| UI/UX | Funkční, ne krásné | Skvělé, plné grafiky | Dobré, velmi přizpůsobitelné | Kompletní, vysoká křivka učení |
| Autologování | Vynikající (20+ rámců) | Vynikající (W&B SDK) | Dobrý | Automaticky přes opičí záplatování |
| Registr modelů | Integrovaný, pracovní postup | Registr modelů W&B | Registr modelů k dispozici | Integrované úložiště modelů |
| Hyperparameter Sweep | Integrace Optuna/Hyperopt | Native Sweeps (výborné) | Dobrý | Integrovaný HPO |
| Governance/Compliance | Základní auditní stopa | Řízení přístupu, týmové funkce | Týmový pracovní prostor | Pokročilé (RBAC, audit) |
| Podpora GenAI/LLM | MLflow 3: sledování, vyhodnocení | Výzvy, monitorování LLM | Sledování LLM | Sledování experimentů LLM |
| Ideální pro | Tým s vlastní infrastrukturou, SME, self-hosted | Tým, který upřednostňuje UX a spolupráci | Mírný rozpočtový tým | Podnik s potřebami automatizace |
Kdy zvolit MLflow
MLflow je optimální volbou v těchto scénářích:
- Omezený rozpočet (<5 000 EUR/rok): self-hosting na jednom VM stojí kolem 180 EUR/rok
- Požadavky na datovou rezidenci: Citlivá data, která nemohou opustit firemní infrastrukturu
- Integrace se stávajícím ekosystémem Pythonu: MLflow se nativně integruje s frameworky scikit-learn, PyTorch, TensorFlow, XGBoost a 20+
- Soulad a audit (AI Act EU): Plný přístup k databázi a artefaktům, žádný zámek SaaS
- Týmy zaměřené na DevOps: MLflow je kontejner Dockeru jako každá jiná služba
Dotazování a analýza experimentů
Jednou z nejcennějších funkcí MLflow je schopnost programově dotazovat všechny experimenty, abyste našli nejlepší spuštění, porovnali běhy napříč více dimenzemi nebo extrahovali data pro automatické reporty:
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
mlflow.set_tracking_uri("http://localhost:5000")
client = MlflowClient()
# ==================== Ricerca di Run con Filtri ====================
# Trova tutti i run con AUC-ROC > 0.92 nel tuo experiment
runs = mlflow.search_runs(
experiment_names=["churn-prediction-gbm"],
filter_string="metrics.auc_roc > 0.92 and tags.environment = 'dev'",
order_by=["metrics.auc_roc DESC"],
max_results=20,
)
# Il risultato e un DataFrame pandas
print(runs[["run_id", "metrics.auc_roc", "metrics.f1_score",
"params.learning_rate", "params.max_depth"]].head())
# ==================== Confronto Run su Multiple Metriche ====================
def compare_top_runs(experiment_name: str, n: int = 5) -> pd.DataFrame:
"""Restituisce un DataFrame con i top N run per AUC-ROC."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=["metrics.auc_roc DESC"],
max_results=n,
)
# Seleziona le colonne più rilevanti
cols = [
"run_id",
"metrics.accuracy", "metrics.f1_score", "metrics.auc_roc",
"params.n_estimators", "params.learning_rate", "params.max_depth",
"tags.dataset_version", "start_time",
]
# Filtra solo colonne esistenti
existing_cols = [c for c in cols if c in runs.columns]
return runs[existing_cols].copy()
comparison_df = compare_top_runs("churn-prediction-gbm")
print(comparison_df.to_string(index=False))
# ==================== Trovare il Best Run e Caricarlo ====================
def get_best_run(experiment_name: str, metric: str = "metrics.auc_roc") -> dict:
"""Trova il run con la metrica migliore e restituisce run_id e metriche."""
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string="status = 'FINISHED'",
order_by=[f"{metric} DESC"],
max_results=1,
)
if runs.empty:
raise ValueError(f"Nessun run trovato in {experiment_name}")
best = runs.iloc[0]
return {
"run_id": best["run_id"],
"auc_roc": best.get("metrics.auc_roc"),
"accuracy": best.get("metrics.accuracy"),
"f1_score": best.get("metrics.f1_score"),
}
best_run = get_best_run("churn-prediction-gbm")
print(f"Best run: {best_run['run_id']}, AUC-ROC: {best_run['auc_roc']:.4f}")
# Carica il modello dal best run
model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")
# ==================== Export Metriche per Report ====================
# Carica la history di una metrica step-by-step (es. validation loss per epoch)
run_id = "abc123def456"
metric_history = client.get_metric_history(run_id, "val_accuracy_step")
steps = [m.step for m in metric_history]
values = [m.value for m in metric_history]
accuracy_over_time = pd.DataFrame({"step": steps, "val_accuracy": values})
print(f"Training steps loggati: {len(accuracy_over_time)}")
Osvědčené postupy pro MLflow ve výrobě
1. Konvence pojmenování
Důsledná konvence pojmenování činí experimenty prohledatelnými a srozumitelnými o měsíce později:
# Schema naming raccomandato per experiments e run
# EXPERIMENT: [team]-[progetto]-[tipo]
# Esempi:
"ml-eng-churn-prediction-gbm"
"ml-eng-churn-prediction-neural-net"
"research-recommender-collaborative"
# RUN NAME: [algoritmo]-[key-param]-[data]
# Esempi:
"gbm-lr0.05-depth6-2025-11-15"
"xgb-v2-autofeat-2025-11-20"
"baseline-logistic-regression"
# Usa sempre tag per metadata strutturati
mlflow.set_tags({
"team": "ml-engineering",
"project": "churn-prediction",
"environment": "dev", # dev | staging | prod
"dataset_version": "v2.1",
"git_branch": git_branch,
"git_commit": git_commit[:8],
"triggered_by": "ci-cd", # manual | ci-cd | scheduled
"approved_by": "", # Compilato prima della promozione
})
2. Struktura MLflow Code
from contextlib import contextmanager
import mlflow
import functools
# Pattern: decorator per tracking automatico
def mlflow_run(experiment_name: str, run_name: str = None, tags: dict = None):
"""Decorator che wrappa una funzione in un MLflow run."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name or func.__name__) as run:
if tags:
mlflow.set_tags(tags)
result = func(*args, **kwargs)
return result
return wrapper
return decorator
# Uso del decorator
@mlflow_run(
experiment_name="churn-prediction-gbm",
run_name="training-v2",
tags={"team": "ml-eng", "version": "v2"}
)
def train_model(X_train, y_train, params: dict):
mlflow.log_params(params)
model = GradientBoostingClassifier(**params)
model.fit(X_train, y_train)
# ... metriche e artefatti
return model
# Pattern: context manager per setup/teardown
@contextmanager
def mlflow_experiment(experiment_name: str, run_name: str):
"""Context manager con gestione errori."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name) as run:
try:
mlflow.set_tag("status", "running")
yield run
mlflow.set_tag("status", "success")
except Exception as e:
mlflow.set_tag("status", "failed")
mlflow.set_tag("error", str(e))
raise
# Uso
with mlflow_experiment("churn-prediction", "training-run-v3") as run:
mlflow.log_params({"n_estimators": 300})
# ... training
mlflow.log_metric("accuracy", 0.94)
3. Integrace s GitHub Actions pro CI/CD
# .github/workflows/ml-experiment.yml
name: ML Experiment + Model Promotion
on:
push:
branches: [main]
paths:
- 'src/models/**'
- 'src/features/**'
- 'params.yaml'
jobs:
train-and-evaluate:
runs-on: ubuntu-latest
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
id: training
run: |
# Script di training che logga su MLflow e stampa il run_id
RUN_ID=$(python src/models/train.py --output-run-id)
echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
- name: Evaluate and check metrics
id: evaluation
run: |
python scripts/ci_evaluate.py \
--run-id ${{ steps.training.outputs.run_id }} \
--min-auc 0.92 \
--min-accuracy 0.90
- name: Promote to Staging
if: success()
run: |
python scripts/promote_model.py \
--run-id ${{ steps.training.outputs.run_id }} \
--model-name churn-gbm-model \
--target-stage Staging
- name: Notify team
if: success()
uses: slackapi/slack-github-action@v1.24.0
with:
payload: |
{
"text": "Nuovo modello promosso a Staging: run ${{ steps.training.outputs.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
Anti-vzory, kterým je třeba se vyhnout
-
Pro stejnou metriku použijte mlflow.log_metric() s různými klíči:
accuracyeaccjsou to dvě samostatné metriky pro MLflow. Definujte slovník kanonických jmen a vždy jej používejte. -
Neuzavírejte aktivní běhy: pokud se kód zhroutí během aktivního běhu
bez kontextového manažera
with mlflow.start_run(), běh zůstává ve stavu "PROVOZ" na dobu neurčitou. Vždy používejte kontextového manažera. - Protokolování obrovských artefaktů jako CSV: MLflow není datové jezero. U velkých datových sad zaprotokolujte pouze metadata (cestu DVC, hash, velikost) a použijte DVC pro verzování dat.
- Použijte SQLite v produkci s více uživateli: SQLite nepodporuje souběžné spisy. U dvou paralelních tréninkových procesů dochází k zablokování chyby. Použijte PostgreSQL nebo MySQL pro jakékoli nastavení pro více uživatelů.
- Neprotokolujte verzi datové sady: parametry modelu bez verzování data nestačí pro reprodukovatelnost. Vždy se přihlaste Potvrzení Git, značka DVC a velikost datové sady.
- Povýšit do produkce bez přechodné propagace Staging: zprostředkující pracovní postup Staging umožňuje integrační a ověřovací testy týmu před nasazením do výroby. Tento krok nepřeskakujte.
Co je nového v MLflow 3: Směrem ke GenAI a agentům
S vydáním MLflow 3 v červnu 2025 platforma udělala evoluční skok významný orientovaný na svět GenAI. Nejrelevantnější zpráva pro ty, kteří pracují s modelkami Tradiční ML a s LLM:
- LoggedModel jako prvotřídní entita: model již neexistuje jen artefakt běhu. LoggedModels přetrvávají napříč běhy, prostředími a nasazení s kompletní linií metrik, parametrů, trasování a vyhodnocovacích dat.
- Výkon se zlepšil o 25 %: MLflow 3.x má optimalizované dotazy do databáze a snížení režie protokolování, což má za následek vyšší propustnost protokolování 25 % ve srovnání s verzí 2.5 (benchmark 2026).
- Sledování GenAI: Automatické sledování LLM, řetězců, volání nástrojů a agentů s podporou LangChain, LlamaIndex, OpenAI SDK, Anthropic a dalších.
- Feedback Collection API: strukturovaný sběr lidské zpětné vazby na výsledky modelu, integrované s uživatelským rozhraním pro kontrolu a hodnocení.
-
Vyvinutý rámec hodnocení:
mlflow.evaluate()teď podporuje vlastní metriky, LLM-as-judge a automatické porovnání modelů.
Závěry a další kroky
MLflow se upevnil jako nejrozšířenější experimentální sledovací nástroj v ekosystému Open-source ML s aktivní komunitou a neustálým vývojem. Kombinace sledování, registr modelů a poskytování na jediné vlastní hostované platformě z něj činí volbu přirozené pro týmy, které chtějí úplnou kontrolu nad svou infrastrukturou bez MLOps náklady na řešení SaaS.
Pracovní postup, který jsme viděli v tomto článku, z registrace experimentů k propagaci do výroby prostřednictvím registru modelů, pokrývá 90 % případů použití realita ML týmu. Integrace s DVC pro verzování dat (předchozí článek) a s GitHub Actions pro automatizaci CI/CD získáte kompletní systém MLOps a profesionál s rozpočtem nižším než 250 EUR/rok.
V příštím článku se budeme zabývat jedním z nejzáludnějších problémů strojového učení ve výrobě: Model Drift. Uvidíme, jak zjistit degradaci výkon v čase (posunutí dat, posun konceptu, posun predikce) a způsob implementace automatické rekvalifikační systémy s upozorněním na Grafana a Prometheus.
Zdroje a další kroky
- Oficiální dokumentace MLflow: mlflow.org/docs/latest
- Poznámky k vydání MLflow 3: mlflow.org/releases/3
- MLflow GitHub: github.com/mlflow/mlflow
- Předchozí článek: Verzování datových sad a modelů pomocí DVC
- Další článek: Detekce driftu modelu a automatické přeškolení
- Křížový odkaz: Pokročilé hluboké učení – pokročilý výcvik
- Křížový odkaz: Počítačové vidění – potrubí detekce objektů
Plný zásobník MLOps s MLflow (rozpočet <5 000 EUR/rok)
| Komponent | Nástroj | Odhadované roční náklady |
|---|---|---|
| Sledování experimentu | Samoobslužný MLflow | Zdarma (open source) |
| Backend Store | PostgreSQL na Dockeru | Zdarma (stejný server MLflow) |
| Obchod s artefakty | MinIO (kompatibilita s S3) nebo AWS S3 | Zdarma / ~30 EUR/rok |
| VM pro MLflow + PostgreSQL | EC2 t3.small (2 vCPU, 2 GB RAM) | ~180 EUR/rok |
| Verze datové sady | DVC + DagsHub | Uvolnit |
| CI/CD potrubí | Akce GitHubu | Zdarma (2000 min/měsíc) |
| Odhadovaný součet | <220 EUR/rok |







