Obsluhování ML modelů: FastAPI + Uvicorn ve výrobě
Vycvičili jste model, který překonává všechny základní linie, metriky MLflow jsou vynikající a váš tým a nadšený. Pak přichází nevyhnutelná otázka: "Kdy to můžeme použít ve výrobě?". A to je místo, kde se mnoho inženýrů ML dostává do potíží: mezera mezi notebookem Jupyter a službou HTTP je škálovatelné, spolehlivé a monitorovatelné a mnohem širší, než se zdá.
FastAPI a stal se de facto standardem pro model Python sloužící v letech 2024–2025, s více než 80 miliony stažení měsíčně na PyPI. Jeho kombinace tipů nativního typu, ověření automatické přes Pydantic, automaticky generovanou dokumentaci OpenAPI a nativní podporu async lo je ideální pro vytváření produkčních inferenčních API. Obklopený Uvicorn (vysoce výkonný server ASGI) a osvědčené postupy kontejnerizace Docker, FastAPI umožňuje přinést model scikit-learn, PyTorch nebo Hugging Face do výroby během několika hodin.
V této příručce vytvoříme kompletní službu poskytování modelů: od základního koncového bodu predikce až po asynchronní odvození s dávkováním, od zdravotních kontrol po monitorování pomocí Prometheus a Grafana, až po na kontejnerizované a škálovatelné nasazení na Docker a Kubernetes. Každý příklad je otestován a připraven pro použití v reálném prostředí.
Co se naučíte
- Strukturujte aplikaci FastAPI pro obsluhu modelů se správou životního cyklu
- Implementujte synchronní a asynchronní odvození s fondy vláken pro úlohy vázané na CPU
- Spravujte dynamické dávkování, abyste maximalizovali propustnost GPU/CPU
- Přidejte kontroly stavu, sondy připravenosti a monitorování Prometheus
- Kontejnerujte s vícestupňovými sestavami Docker a optimalizujte je pro produkci
- Porovnejte FastAPI s BentoML, TorchServe a Triton Inference Server
- Implementujte zátěžové testování s Locustem pro ověření výkonu
proč FastAPI for Model Serving
Než se pustíte do kódu, stojí za to pochopit, proč FastAPI získalo toto dominantní postavení v poskytování modelu Python. Srovnání s Flask, tradiční volbou, je poučné.
Flask používá WSGI (Web Server Gateway Interface), synchronní, blokující architekturu. Každá žádost zabírá vlákno serveru, dokud nebude dokončeno. U modelů, které vyžadují pouhých 50 ms z inference, baňka se 4 pracovníky zvládá asi 80 req/s, než začne degradovat. FastAPI používá ASGI (Asynchronous Server Gateway Interface), což umožňuje jedinému procesu spravovat tisíce souběžných připojení neblokujícím způsobem. S Uvicornem a 4 dělníky, stejný hardware snadno zvládá 500+ req/s pro lehké odvození.
Upozornění: Async automaticky neznamená rychlejší pro odvození
Častou chybou je definování koncového bodu predikce jako async def a pak zavolat
přímo model. Odvození ML je vázáno na CPU (nebo GPU): spusťte jej v asynchronním vláknu
main blokuje smyčku událostí a zhoršuje výkon. Správným řešením je použití
asyncio.get_event_loop().run_in_executor() nebo Starlette's
run_in_threadpool() spustit odvození v samostatném vlákně.
Nastavení projektu
Začněme strukturou projektu. Dobrá organizace kódu je nezbytná udržovatelnost ve výrobě.
# Struttura del progetto
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app e lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # Wrapper del modello ML
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Endpoint predizione
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
Pojďme nainstalovat potřebné závislosti:
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # per test async
python-multipart==0.0.20
# Installazione
pip install -r requirements.txt
Aplikace FastAPI se správou životního cyklu
Jádrem obsluhy modelu je načíst model pouze jednou při spuštění aplikace, ne na každou žádost. FastAPI 0.93+ zavádí i kontextové manažery životnosti, moderní a čistý způsob správy zdrojů, které je třeba inicializovat při spuštění e uvolněna při odstávce.
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Stato globale dell'applicazione (immutabile dopo init)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gestione lifecycle: load all'avvio, cleanup allo shutdown"""
# Startup
logger.info("Avvio applicazione - caricamento modello...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Modello caricato in {app_state.model_load_time:.2f}s "
f"(versione: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise RuntimeError(f"Impossibile avviare il servizio: {e}")
yield # L'app e in esecuzione
# Shutdown
logger.info("Shutdown applicazione - cleanup risorse...")
app_state.predictor = None
# Inizializzazione FastAPI
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API con FastAPI e Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
# CORS (configura per il tuo ambiente)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specifica i domini
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
# Router
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
# Dependency injection dello stato
app.state.app_state = app_state
Model Predictor: ML Model Wrapper
Il ModelPredictor a srdcem služby. Zapouzdřte model ML pomocí a
Čisté rozhraní, zvládá předběžné zpracování vstupů a následné zpracování vstupů
výstup a poskytuje užitečná metadata pro monitorování.
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Wrapper produzione-ready per modelli scikit-learn.
Responsabilità:
- Caricamento e validazione del modello
- Pre/post processing degli input/output
- Raccolta metriche di performance
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Modello non trovato: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler non trovato: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names attese (definite al training)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor inizializzato - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Predizione singola con timing e validazione."""
start_time = time.perf_counter()
# Pre-processing
df = self._preprocess(features)
# Inference
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Predizione batch ottimizzata (una sola chiamata al modello)."""
start_time = time.perf_counter()
# Costruisci DataFrame dall'intero batch
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Inference batch (una sola chiamata)
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Preprocessing input: validazione, encoding, scaling."""
df = pd.DataFrame([features])
# Encoding categorico
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
# Seleziona features nell'ordine corretto
df = df[self._feature_names]
# Scaling
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Pydantická schémata: Ověření vstupu
Pydantic v2 (výchozí ve FastAPI 0.100+) nabízí ultra rychlé ověření díky přepisování v Rustu. Definování přísných schémat chrání model před nesprávně tvarovanými vstupy a poskytuje automatická dokumentace API.
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Schema input per predizione churn singola."""
tenure_months: int = Field(
..., ge=0, le=120,
description="Mesi di tenure del cliente"
)
monthly_charges: float = Field(
..., ge=0, le=500,
description="Addebito mensile in EUR"
)
total_charges: float = Field(
..., ge=0,
description="Addebito totale storico in EUR"
)
num_products: int = Field(
..., ge=1, le=10,
description="Numero di prodotti sottoscritti"
)
has_phone_service: bool = Field(
..., description="Il cliente ha servizio telefonico"
)
has_internet: bool = Field(
..., description="Il cliente ha servizio internet"
)
contract_type: ContractType = Field(
..., description="Tipo di contratto"
)
payment_method: PaymentMethod = Field(
..., description="Metodo di pagamento"
)
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
"""total_charges non può essere minore di monthly_charges."""
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) non può essere "
f"minore di monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
"""Schema output per predizione singola."""
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
"""Schema input per batch prediction (max 100 items)."""
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100,
description="Lista di richieste da processare in batch"
)
class BatchPredictionResponse(BaseModel):
"""Schema output per batch prediction."""
results: list[dict]
batch_size: int
total_inference_time_ms: float
Koncové body predikce: Synchronní a asynchronní
Implementujeme koncové body predikce podle správného vzoru pro úlohy vázané na CPU: odvození se provádí v samostatném fondu vláken, aby neblokovala smyčku asynchronních událostí.
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection del predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Modello non disponibile - servizio in fase di avvio"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Predizione singola",
description="Inferenza su un singolo cliente per churn prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Endpoint di predizione singola.
Usa run_in_threadpool per eseguire l'inference CPU-bound
senza bloccare l'event loop async.
"""
try:
# CORRETTO: esegui task CPU-bound in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
# Aggiorna metriche Prometheus
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(endpoint="predict", error_type=type(e).__name__).inc()
logger.error(f"Errore predizione: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Errore durante l'inference: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Predizione batch",
description="Inferenza batch ottimizzata (max 100 items per request)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Endpoint batch: una sola chiamata al modello per N items.
Throughput 3-5x superiore rispetto a N chiamate singole.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
PREDICTION_COUNTER.labels(
model_version=predictor.get_version(),
outcome="success_batch"
).inc(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
logger.error(f"Errore batch inference: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
Zdravotní prohlídky: živost a připravenost
V nasazení Kubernetes (nebo Docker Compose s kontrolou stavu) rozlišujte mezi sonda živosti (je proces živý?) e sonda připravenosti (je služba připravena přijímat provoz?) a nezbytné pro správný provoz směrování a postupné nasazení.
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get(
"/health",
response_model=HealthResponse,
summary="Liveness probe",
tags=["health"]
)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifica che il processo sia attivo.
Kubernetes usa questo per decidere se riavviare il pod.
Risponde sempre 200 se il processo e in esecuzione.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get(
"/health/ready",
response_model=ReadinessResponse,
summary="Readiness probe",
tags=["health"]
)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifica che il servizio sia pronto a ricevere traffico.
Kubernetes usa questo per il load balancing.
Risponde 503 se il modello non e ancora caricato.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
# Metriche sistema
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
@router.get("/metrics/model", tags=["health"])
async def model_metrics(request: Request):
"""Metadata e statistiche del modello caricato."""
app_state = request.app.state.app_state
if app_state.predictor is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Modello non disponibile")
return app_state.predictor.get_metadata()
Monitoring s Prometheus a Grafana
Monitorování služby ML v produkci daleko přesahuje standardní metriky HTTP.
Chceme sledovat inferenční latenci, distribuci predikcí, chybovost
a využití zdrojů. Knihovna prometheus-fastapi-instrumentator
poskytuje základní HTTP metriky; přidáváme vlastní metriky specifické pro ML.
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
# Contatore predizioni per versione modello e outcome
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Numero totale di predizioni eseguite",
["model_version", "outcome"]
)
# Latenza inference (in secondi)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Durata inference in secondi",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Dimensione batch
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Dimensione delle richieste batch",
buckets=[1, 5, 10, 25, 50, 100]
)
# Contatore errori
ERROR_COUNTER = Counter(
"ml_errors_total",
"Numero totale di errori",
["endpoint", "error_type"]
)
# Distribuzione predizioni (gauge aggiornato periodicamente)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Tasso di churn predetto (finestra mobile 1000 predizioni)"
)
# Utilizzo memoria modello
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memoria utilizzata dal modello ML"
)
class PredictionTracker:
"""Tracker per statistiche rolling delle predizioni."""
def __init__(self, window_size: int = 1000):
self._window_size = window_size
self._predictions: list[int] = []
def record(self, prediction: int) -> None:
self._predictions.append(prediction)
if len(self._predictions) > self._window_size:
self._predictions = self._predictions[-self._window_size:]
# Aggiorna gauge churn rate
if self._predictions:
churn_rate = sum(self._predictions) / len(self._predictions)
CHURN_RATE_GAUGE.set(churn_rate)
# Istanza globale
prediction_tracker = PredictionTracker()
Nastavení Docker Compose pro celý zásobník monitorování:
# docker-compose.yml
version: "3.9"
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards:ro
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "ml-api"
static_configs:
- targets: ["ml-api:8000"]
metrics_path: "/metrics"
scrape_interval: 10s
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
Vícefázové sestavení Dockerfile
Produkčně optimalizovaný Dockerfile používá vícestupňová stavba oddělit závislosti sestavení od runtime, což výrazně snižuje velikost výsledného obrázku (od ~2GB do ~400MB pro scikit-learn).
# Dockerfile
# Stage 1: Builder - installa dipendenze
FROM python:3.12-slim AS builder
WORKDIR /build
# Installa tool di build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Copia requirements e installa in directory separata
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - immagine finale minimale
FROM python:3.12-slim AS runtime
# Utente non-root per sicurezza
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copia dipendenze dal builder
COPY --from=builder /install /usr/local
# Copia il codice applicativo
COPY --chown=mlserving:mlserving app/ ./app/
# Crea directory modelli (i modelli sono montati come volume)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
# Healthcheck integrato
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
# Esposizione porta
EXPOSE 8000
# Avvio con Uvicorn: 4 worker, timeouts produzione
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--timeout-keep-alive", "30",
"--access-log",
"--log-level", "info"]
Kolik Uvicorn Workers ve výrobě?
Základní pravidlo e 2 x CPU jádra + 1. Pro modul se 2 vCPU použijte 5 pracovníků. Upozornění: každý pracovník si nahraje kopii modelu do paměti. S modelem 500 MB a 4 pracovníci, kontejner vyžaduje přibližně 2 GB RAM. Vyrovnejte počet pracovníků s pamětí k dispozici. U velkých modelů (LLM) je často nejlepší volbou 1 pracovník s dávkováním.
BentoML: Specializovaný rámec pro poskytování modelů
Zatímco FastAPI je vynikající pro obecné služby, BentoML a byl navržen speciálně pro modelování a automaticky řeší mnoho problémů, které ve FastAPI musíte řídit ručně: dynamické dávkování, integrované verzování modelu, abstrakce běžců pro nezávislé škálování odvození a automatické generování souborů Dockerfile e Kubernetes manifestuje.
# bentoml_service.py
import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON
from pydantic import BaseModel, Field
from typing import Annotated
# 1. Salva il modello nel BentoML Model Store
# (esegui una sola volta dopo il training)
def save_model_to_store(sklearn_model, scaler):
"""Salva modello e scaler nel BentoML registry locale."""
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"training_date": "2025-01-15",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
print(f"Modello salvato: {bento_model.tag}")
return bento_model
# 2. Definisci il Runner (inferenza scalabile)
# Il runner e l'astrazione che gestisce il modello ML
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Definisci i Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Definisci il Service BentoML
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Predizione churn con BentoML - batching automatico."""
# BentoML gestisce il batching automaticamente
# quando batchable=True e configurato nel runner
# Prepara features (stesso preprocessing del training)
features = preprocess(request)
# Chiamata async al runner (BentoML gestisce threading/batching)
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
def preprocess(request: ChurnRequest) -> np.ndarray:
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
return np.array([[
request.tenure_months,
request.monthly_charges,
request.total_charges,
request.num_products,
int(request.has_phone_service),
int(request.has_internet),
contract_map.get(request.contract_type, 0),
payment_map.get(request.payment_method, 0)
]])
Nasazení s BentoML ve třech příkazech:
# 1. Build del Bento (artifact deployabile)
bentoml build
# Output:
# Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# Bento size: 245MB (modello + codice + deps)
# 2. Genera immagine Docker automaticamente
bentoml containerize churn-prediction-service:latest
# 3. Avvia il container
docker run -p 3000:3000 churn-prediction-service:latest
# Oppure: deploy su BentoCloud (managed)
# bentoml deploy churn-prediction-service:latest --name prod-churn
Dynamické dávkování: Maximalizace propustnosti
Il dynamické dávkování a technika, která shromažďuje více příchozích požadavků a zpracuje je společně v jediném volání modelu. U GPU to platí obzvlášť efektivní, protože GPU je navržen pro paralelní operace na velkých dávkách. Na CPU je výhoda menší, ale stále významná pro modely s vysokou fixní režií.
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PendingRequest:
"""Una singola richiesta in attesa di essere processata in batch."""
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher per ML inference.
Raccoglie richieste per max_wait_ms millisecondi (o fino a
max_batch_size richieste) e poi le processa insieme.
Parametri da tuner per il proprio use case:
- max_batch_size: dipende dalla memoria GPU/CPU disponibile
- max_wait_ms: tradeoff tra latenza singola e throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Aggiunge la richiesta alla coda e attende il risultato.
Chiamata concorrente - sicura per uso multi-thread.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
# Avvia il task di batch processing se non e già in esecuzione
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
async def _process_batch(self) -> None:
"""Processa un batch di richieste."""
# Attendi max_wait_ms o fino a max_batch_size richieste
await asyncio.sleep(self._max_wait_ms / 1000)
async with self._lock:
if not self._queue:
return
# Prendi fino a max_batch_size richieste dalla coda
batch = []
while self._queue and len(batch) < self._max_batch_size:
batch.append(self._queue.popleft())
if not batch:
return
# Processa il batch (CPU-bound in threadpool)
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
# Distribuisci i risultati ai rispettivi futures
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
wait_times = [
(time.perf_counter() - req.arrival_time) * 1000
for req in batch
]
logger.info(
f"Batch processato: {len(batch)} items, "
f"attesa media {sum(wait_times)/len(wait_times):.1f}ms"
)
except Exception as e:
logger.error(f"Errore batch processing: {e}")
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
Zátěžové testování s Locustem
Před zahájením výroby je nezbytné ověřit výkon služby při skutečné zátěži. Locust a standardní nástroj Python pro zátěžové testování, s intuitivním DSL pro simulaci komplexního uživatelského chování.
# locustfile.py
from locust import HttpUser, task, between
import json
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
"""Simula un utente che chiama l'API di predizione."""
wait_time = between(0.1, 0.5) # Attesa tra richieste: 100-500ms
@task(weight=8)
def predict_single(self):
"""80% delle richieste: predizione singola."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Response mancante campo 'prediction'")
else:
response.failure(f"Status code: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% delle richieste: batch prediction (10 items)."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
"""Health check periodico."""
self.client.get("/health/ready")
# Avvio load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
Srovnání rámců: Kdy použít co
Výběr rámce pro poskytování závisí na kontextu. Zde je praktický návod:
| Rámce | Ideální případ použití | Pro | Proti | Latence (p99) |
|---|---|---|---|---|
| FastAPI + Uvicorn | Vlastní rozhraní API, mikroslužby, tým Pythonu | Maximální flexibilita, bohatý ekosystém, výborná dokumentace | Žádné automatické dávkování, manuální sledování | 5-20 ms |
| BentoML | Balení modelů, tým zaměřený na ML | Automatické dávkování, integrovaný model úložiště, automatický Docker/K8s gen | Nadzemní rámec, křivka učení | 8-30 ms |
| TorchServe | Modely PyTorch ve výrobě | Optimalizováno pro PyTorch, TorchScript podporu, multi-model | Pouze PyTorch, interní prvky založené na Javě | 3-15 ms |
| Triton Inference Server | Vysoká propustnost obsluhy GPU | Maximální výkon GPU, TensorRT, multi-framework | Vysoká složitost, vyžaduje GPU NVIDIA | 1–5 ms (GPU) |
| Modely MLflow | Rychlé prototypování, tým MLflow | Nativní integrace MLflow, nulová konfigurace | Nevhodné pro vysoký provoz, omezené přizpůsobení | 20-100 ms |
Doporučení pro malé a střední podniky (rozpočet < 5 000 EUR/rok)
Pro většinu italských malých a středních podniků počínaje modelovým podáváním je zásobník FastAPI + Uvicorn + Docker + Prometheus + Grafana a optimální volba: a 100% open-source, nevyžaduje specializované dovednosti v rámcích ML, škálování snadno s Kubernetes v případě potřeby a má obrovskou komunitu pro podporu. BentoML stojí za to prozkoumat, když má tým více modelů ke správě a chce automatizovat balení. Triton a TorchServe jsou relevantní pouze s vyhrazenými GPU a požadavky na latenci pod 5 ms.
Osvědčené postupy a anti-vzorce
Po zhlédnutí úplné implementace shrňme kritické osvědčené postupy a nejběžnější anti-vzory v modelovém provozu s FastAPI.
Anti-vzorce, kterým se absolutně vyhnout
- Načtěte model s každým požadavkem: načítání trvá 1-10 sekund a ničí výkony. Vždy používejte kontextový manažer životnosti.
- Zavolejte model v async def bez run_in_threadpool: blokovat smyčka událostí a dělá službu de facto jednovláknovou.
- Žádné ověření vstupu: může způsobit anomální hodnota nejasné výjimky v modelu. Vždy používejte Pydantic s přísnými omezeními.
- Žádná zdravotní kontrola: Kubernetes začne odesílat provozu před načtením modelu, což způsobí 500 chyb při studeném startu.
- Příliš podrobné protokoly v horké cestě: zaznamenat každou předpověď na úrovni INFO se může stát sám o sobě úzkým hrdlem při vysokém provozu. Použijte DEBUG pro jednotlivé předpovědi, INFO pro souhrnné statistiky.
Základní doporučené postupy
-
Verze API: vždy používejte předponu
/api/v1/. Když aktualizujete model s přerušením změn ve vstupech, zvyšte a/api/v2/udržování v1 aktivní pro kompatibilitu se stávajícími klienty. -
Explicitní časové limity: nakonfigurovat časový limit odvození (např. 5 sekund)
s
asyncio.wait_for()abyste zabránili zahlcení fondu vláken pomalými požadavky. -
Jističe: implementovat jistič k zastavení
odesílat požadavky na model, když chybovost překročí prahovou hodnotu (např. 50 %
za 60 sekund). Knihovna
pybreakera dobrá volba. -
Elegantní vypnutí: nakonfigurovat Uvicorn pomocí
--timeout-graceful-shutdown 30k dokončení probíhajících požadavků, než se kontejner vypne. -
Strukturované protokolování: USA
structlognebo formátu JSON pro výrobní deníky. Usnadňuje integraci s Elasticsearch/Loki.
Spusťte službu
Jakmile je vše implementováno, spusťte službu ve vývoji a výrobě a jednoduše:
# Sviluppo locale (hot reload)
uvicorn app.main:app --reload --port 8000
# Produzione con Uvicorn diretto (senza Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# Con Docker Compose (raccomandato per produzione locale)
docker compose up -d
# Verifica il servizio
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Documentazione API interattiva
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
Závěry a další kroky
V této příručce jsme vytvořili službu modelového servisu připraveného pro výrobu FastAPI a Uvicorn: od správy životního cyklu po monitorování pomocí Prometheus, od dávkování Dynamický Dockerfile optimalizovaný s vícestupňovým sestavováním. Také jsme viděli BentoML jako specializovanou alternativu a porovnal hlavní dostupné rámce v roce 2025.
Kompletní kód v této příručce, včetně testů, předkonfigurovaného řídicího panelu Grafana a manifesty Kubernetes a jsou dostupné v úložišti GitHub řady MLOps. Stoh FastAPI + Uvicorn + Docker + Prometheus pokrývá drtivou většinu modelových případů použití pro týmy až 20-30 ML inženýrů s náklady na obsahová infrastruktura a maximální flexibilita.
Další přirozený krok poté, co si osvojíte podávání a hle škálování na Kubernetes: Nasazení s Horizontal Pod Autoscaler, spravujte více modelová vydání s kanárkovými verzemi a orchestrace složitých ML potrubí s KubeFlow. To uvidíme v dalším článku seriálu.
Řada MLOps: Související články
- MLOps: Od experimentu k produkci - Základy životního cyklu ML
- ML kanál s CI/CD: GitHub Actions + Docker - Automatizujte školení a nasazení
- Sledování experimentů s MLflow - Správa experimentů a registrů modelů
- Škálování ML na Kubernetes - Další v řadě: škálování a orchestrace







