Servirea modelelor ML: FastAPI + Uvicorn în producție
Ați antrenat un model care depășește toate liniile de bază, valorile pentru MLflow sunt excelente, iar echipa dvs. si entuziast. Apoi vine întrebarea inevitabilă: „Când îl putem folosi în producție?”. Și aici mulți ingineri ML au probleme: decalajul dintre un notebook Jupyter și un serviciu HTTP este scalabil, fiabil și monitorizat și mult mai larg decât pare.
FastAPI și a devenit standardul de facto pentru modelul Python care servește în 2024-2025, cu peste 80 de milioane de descărcări lunare pe PyPI. Combinația sa de indicii de tip nativ, validare automat prin Pydantic, documentație OpenAPI generată automat și suport nativ asincron îl face ideal pentru construirea de API-uri de inferență pregătite pentru producție. Flancat de Uvicorn (server ASGI de înaltă performanță) și cele mai bune practici de containerizare Docker, FastAPI permite pentru a aduce în producție un model scikit-learn, PyTorch sau Hugging Face în doar câteva ore.
În acest ghid vom construi un serviciu complet de servire a modelelor: de la punctul final de predicție de bază la inferență asincronă cu loturi, de la verificări de sănătate la monitorizare cu Prometheus și Grafana, până la la implementare containerizată și scalabilă pe Docker și Kubernetes. Fiecare exemplu este testat și gata pentru utilizare în medii reale.
Ce vei învăța
- Structurați o aplicație FastAPI pentru difuzarea modelelor cu managementul ciclului de viață
- Implementați inferența sincronă și asincronă cu pool-uri de fire pentru sarcini legate de CPU
- Gestionați loturile dinamice pentru a maximiza debitul GPU/CPU
- Adăugați verificări de sănătate, sonde de pregătire și monitorizare Prometheus
- Containerizați cu build-uri în mai multe etape Docker și optimizați pentru producție
- Comparați FastAPI cu BentoML, TorchServe și Triton Inference Server
- Implementați testarea de încărcare cu Locust pentru a valida performanța
de ce FastAPI pentru Model Serving
Înainte de a intra în cod, merită să înțelegeți de ce FastAPI și-a câștigat această poziție dominantă în servirea modelului Python. Comparația cu Flask, alegerea tradițională, este lămuritoare.
Flask folosește WSGI (Web Server Gateway Interface), o arhitectură sincronă, de blocare. Fiecare cerere preia un fir de execuție de server până se finalizează. Cu modele care necesită doar 50 ms de inferență, Flask cu 4 lucrători se ocupă de aproximativ 80 de solicitări/sec înainte de a începe să se degradeze. FastAPI folosește ASGI (Asynchronous Server Gateway Interface), permițând unui singur proces gestionați mii de conexiuni simultane într-un mod neblocant. Cu Uvicorn și 4 muncitori, Același hardware gestionează cu ușurință peste 500 de solicitări/sec pentru inferențe ușoare.
Avertisment: Async nu înseamnă automat mai rapid pentru inferență
O greșeală comună este definirea punctului final de predicție ca async def și apoi sună
direct modelul. Inferența ML este legată de CPU (sau de GPU): rulați-o în firul asincron
principal blochează bucla de evenimente și înrăutățește performanța. Soluția corectă este utilizarea
asyncio.get_event_loop().run_in_executor() sau a lui Starlette
run_in_threadpool() pentru a rula inferența într-un fir separat.
Configurarea proiectului
Să începem cu structura proiectului. O bună organizare a codului este esențială pentru menținerea în producție.
# Struttura del progetto
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app e lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # Wrapper del modello ML
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Endpoint predizione
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
Să instalăm dependențele necesare:
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # per test async
python-multipart==0.0.20
# Installazione
pip install -r requirements.txt
Aplicația FastAPI cu managementul ciclului de viață
Esența servirii modelului este să încărcați modelul o singură dată când pornește aplicația, nu la orice cerere. FastAPI 0.93+ introduce i managerii de context al duratei de viață, modalitatea modernă și curată de a gestiona resursele care trebuie inițializate la pornire e eliberat la oprire.
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Stato globale dell'applicazione (immutabile dopo init)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gestione lifecycle: load all'avvio, cleanup allo shutdown"""
# Startup
logger.info("Avvio applicazione - caricamento modello...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Modello caricato in {app_state.model_load_time:.2f}s "
f"(versione: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise RuntimeError(f"Impossibile avviare il servizio: {e}")
yield # L'app e in esecuzione
# Shutdown
logger.info("Shutdown applicazione - cleanup risorse...")
app_state.predictor = None
# Inizializzazione FastAPI
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API con FastAPI e Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
# CORS (configura per il tuo ambiente)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specifica i domini
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
# Router
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
# Dependency injection dello stato
app.state.app_state = app_state
Model Predictor: ML Model Wrapper
Il ModelPredictor și inima serviciului. Încapsulați modelul ML cu a
Interfață curată, se ocupă de pre-procesarea intrărilor și post-procesarea intrărilor
ieșire și oferă metadate utile pentru monitorizare.
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Wrapper produzione-ready per modelli scikit-learn.
Responsabilità:
- Caricamento e validazione del modello
- Pre/post processing degli input/output
- Raccolta metriche di performance
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Modello non trovato: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler non trovato: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names attese (definite al training)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor inizializzato - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Predizione singola con timing e validazione."""
start_time = time.perf_counter()
# Pre-processing
df = self._preprocess(features)
# Inference
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Predizione batch ottimizzata (una sola chiamata al modello)."""
start_time = time.perf_counter()
# Costruisci DataFrame dall'intero batch
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Inference batch (una sola chiamata)
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Preprocessing input: validazione, encoding, scaling."""
df = pd.DataFrame([features])
# Encoding categorico
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
# Seleziona features nell'ordine corretto
df = df[self._feature_names]
# Scaling
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Scheme pidantice: validarea intrărilor
Pydantic v2 (implicit în FastAPI 0.100+) oferă validare ultra-rapidă datorită rescrierii în Rust. Definirea unor scheme stricte protejează modelul de intrări incorecte și oferă documentație automată API.
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Schema input per predizione churn singola."""
tenure_months: int = Field(
..., ge=0, le=120,
description="Mesi di tenure del cliente"
)
monthly_charges: float = Field(
..., ge=0, le=500,
description="Addebito mensile in EUR"
)
total_charges: float = Field(
..., ge=0,
description="Addebito totale storico in EUR"
)
num_products: int = Field(
..., ge=1, le=10,
description="Numero di prodotti sottoscritti"
)
has_phone_service: bool = Field(
..., description="Il cliente ha servizio telefonico"
)
has_internet: bool = Field(
..., description="Il cliente ha servizio internet"
)
contract_type: ContractType = Field(
..., description="Tipo di contratto"
)
payment_method: PaymentMethod = Field(
..., description="Metodo di pagamento"
)
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
"""total_charges non può essere minore di monthly_charges."""
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) non può essere "
f"minore di monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
"""Schema output per predizione singola."""
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
"""Schema input per batch prediction (max 100 items)."""
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100,
description="Lista di richieste da processare in batch"
)
class BatchPredictionResponse(BaseModel):
"""Schema output per batch prediction."""
results: list[dict]
batch_size: int
total_inference_time_ms: float
Puncte finale de predicție: sincron și asincron
Implementăm punctele finale de predicție urmând modelul corect pentru sarcinile legate de CPU: inferența este efectuată într-un pool de fire separat pentru a nu bloca bucla de evenimente asincrone.
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection del predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Modello non disponibile - servizio in fase di avvio"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Predizione singola",
description="Inferenza su un singolo cliente per churn prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Endpoint di predizione singola.
Usa run_in_threadpool per eseguire l'inference CPU-bound
senza bloccare l'event loop async.
"""
try:
# CORRETTO: esegui task CPU-bound in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
# Aggiorna metriche Prometheus
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(endpoint="predict", error_type=type(e).__name__).inc()
logger.error(f"Errore predizione: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Errore durante l'inference: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Predizione batch",
description="Inferenza batch ottimizzata (max 100 items per request)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Endpoint batch: una sola chiamata al modello per N items.
Throughput 3-5x superiore rispetto a N chiamate singole.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
PREDICTION_COUNTER.labels(
model_version=predictor.get_version(),
outcome="success_batch"
).inc(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
logger.error(f"Errore batch inference: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
Controale de sănătate: Vioitate și disponibilitate
Într-o implementare Kubernetes (sau Docker Compose cu verificarea stării de sănătate), distinge între sondă de viaţă (procesul este viu?) e sonda de pregătire (serviciul este pregătit să primească trafic?) și esențial pentru funcționarea corectă de rutare și implementare rulantă.
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get(
"/health",
response_model=HealthResponse,
summary="Liveness probe",
tags=["health"]
)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifica che il processo sia attivo.
Kubernetes usa questo per decidere se riavviare il pod.
Risponde sempre 200 se il processo e in esecuzione.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get(
"/health/ready",
response_model=ReadinessResponse,
summary="Readiness probe",
tags=["health"]
)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifica che il servizio sia pronto a ricevere traffico.
Kubernetes usa questo per il load balancing.
Risponde 503 se il modello non e ancora caricato.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
# Metriche sistema
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
@router.get("/metrics/model", tags=["health"])
async def model_metrics(request: Request):
"""Metadata e statistiche del modello caricato."""
app_state = request.app.state.app_state
if app_state.predictor is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Modello non disponibile")
return app_state.predictor.get_metadata()
Monitorizare cu Prometheus și Grafana
Monitorizarea unui serviciu ML în producție depășește cu mult valorile HTTP standard.
Dorim să urmărim latența de inferență, distribuția predicțiilor, rata de eroare
și utilizarea resurselor. Biblioteca prometheus-fastapi-instrumentator
furnizează valori HTTP de bază; adăugăm valori personalizate specifice ML.
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
# Contatore predizioni per versione modello e outcome
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Numero totale di predizioni eseguite",
["model_version", "outcome"]
)
# Latenza inference (in secondi)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Durata inference in secondi",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Dimensione batch
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Dimensione delle richieste batch",
buckets=[1, 5, 10, 25, 50, 100]
)
# Contatore errori
ERROR_COUNTER = Counter(
"ml_errors_total",
"Numero totale di errori",
["endpoint", "error_type"]
)
# Distribuzione predizioni (gauge aggiornato periodicamente)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Tasso di churn predetto (finestra mobile 1000 predizioni)"
)
# Utilizzo memoria modello
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memoria utilizzata dal modello ML"
)
class PredictionTracker:
"""Tracker per statistiche rolling delle predizioni."""
def __init__(self, window_size: int = 1000):
self._window_size = window_size
self._predictions: list[int] = []
def record(self, prediction: int) -> None:
self._predictions.append(prediction)
if len(self._predictions) > self._window_size:
self._predictions = self._predictions[-self._window_size:]
# Aggiorna gauge churn rate
if self._predictions:
churn_rate = sum(self._predictions) / len(self._predictions)
CHURN_RATE_GAUGE.set(churn_rate)
# Istanza globale
prediction_tracker = PredictionTracker()
Configurarea Docker Compose pentru stiva completă de monitorizare:
# docker-compose.yml
version: "3.9"
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards:ro
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "ml-api"
static_configs:
- targets: ["ml-api:8000"]
metrics_path: "/metrics"
scrape_interval: 10s
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
Construire în mai multe etape Dockerfile
Un Dockerfile optimizat pentru producție utilizează construcție în mai multe etape pentru a separa dependențele de construcție de cele de rulare, reducând semnificativ dimensiunea imaginii finale (de la ~2GB la ~400MB pentru scikit-learn).
# Dockerfile
# Stage 1: Builder - installa dipendenze
FROM python:3.12-slim AS builder
WORKDIR /build
# Installa tool di build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Copia requirements e installa in directory separata
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - immagine finale minimale
FROM python:3.12-slim AS runtime
# Utente non-root per sicurezza
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copia dipendenze dal builder
COPY --from=builder /install /usr/local
# Copia il codice applicativo
COPY --chown=mlserving:mlserving app/ ./app/
# Crea directory modelli (i modelli sono montati come volume)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
# Healthcheck integrato
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
# Esposizione porta
EXPOSE 8000
# Avvio con Uvicorn: 4 worker, timeouts produzione
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--timeout-keep-alive", "30",
"--access-log",
"--log-level", "info"]
Câți lucrători Uvicorn în producție?
Regula generală e 2 x nuclee CPU + 1. Pentru un pod cu 2 vCPU, utilizați 5 lucrători. Atenție: fiecare lucrător încarcă o copie a modelului în memorie. Cu un model de 500 MB și 4 muncitori, containerul necesită aproximativ 2 GB de RAM. Echilibrați numărul de lucrători cu memorie disponibile. Pentru modelele mari (LLM), adesea 1 lucrător cu dozare este cea mai bună alegere.
BentoML: Cadrul specializat pentru servirea modelelor
În timp ce FastAPI este excelent pentru serviciile generale, BentoML și a fost proiectat special pentru servirea modelelor și rezolvă automat multe probleme care în FastAPI trebuie să gestionați manual: loturi dinamice, versiunea integrată a modelului, abstractizarea runnerului pentru scalarea inferenței independente și generarea automată a fișierelor Docker e Kubernetes se manifestă.
# bentoml_service.py
import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON
from pydantic import BaseModel, Field
from typing import Annotated
# 1. Salva il modello nel BentoML Model Store
# (esegui una sola volta dopo il training)
def save_model_to_store(sklearn_model, scaler):
"""Salva modello e scaler nel BentoML registry locale."""
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"training_date": "2025-01-15",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
print(f"Modello salvato: {bento_model.tag}")
return bento_model
# 2. Definisci il Runner (inferenza scalabile)
# Il runner e l'astrazione che gestisce il modello ML
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Definisci i Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Definisci il Service BentoML
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Predizione churn con BentoML - batching automatico."""
# BentoML gestisce il batching automaticamente
# quando batchable=True e configurato nel runner
# Prepara features (stesso preprocessing del training)
features = preprocess(request)
# Chiamata async al runner (BentoML gestisce threading/batching)
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
def preprocess(request: ChurnRequest) -> np.ndarray:
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
return np.array([[
request.tenure_months,
request.monthly_charges,
request.total_charges,
request.num_products,
int(request.has_phone_service),
int(request.has_internet),
contract_map.get(request.contract_type, 0),
payment_map.get(request.payment_method, 0)
]])
Implementați cu BentoML în trei comenzi:
# 1. Build del Bento (artifact deployabile)
bentoml build
# Output:
# Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# Bento size: 245MB (modello + codice + deps)
# 2. Genera immagine Docker automaticamente
bentoml containerize churn-prediction-service:latest
# 3. Avvia il container
docker run -p 3000:3000 churn-prediction-service:latest
# Oppure: deploy su BentoCloud (managed)
# bentoml deploy churn-prediction-service:latest --name prod-churn
Loturi dinamice: Maximizarea debitului
Il loturi dinamice și o tehnică care colectează mai multe cereri primite și le procesează împreună într-un singur model de apel. Pe GPU, acest lucru este valabil mai ales eficient deoarece GPU-ul este proiectat pentru operațiuni paralele pe loturi mari. Pe CPU, beneficiul este mai mic, dar totuși semnificativ pentru modelele cu supraîncărcare fixă mare.
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PendingRequest:
"""Una singola richiesta in attesa di essere processata in batch."""
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher per ML inference.
Raccoglie richieste per max_wait_ms millisecondi (o fino a
max_batch_size richieste) e poi le processa insieme.
Parametri da tuner per il proprio use case:
- max_batch_size: dipende dalla memoria GPU/CPU disponibile
- max_wait_ms: tradeoff tra latenza singola e throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Aggiunge la richiesta alla coda e attende il risultato.
Chiamata concorrente - sicura per uso multi-thread.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
# Avvia il task di batch processing se non e già in esecuzione
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
async def _process_batch(self) -> None:
"""Processa un batch di richieste."""
# Attendi max_wait_ms o fino a max_batch_size richieste
await asyncio.sleep(self._max_wait_ms / 1000)
async with self._lock:
if not self._queue:
return
# Prendi fino a max_batch_size richieste dalla coda
batch = []
while self._queue and len(batch) < self._max_batch_size:
batch.append(self._queue.popleft())
if not batch:
return
# Processa il batch (CPU-bound in threadpool)
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
# Distribuisci i risultati ai rispettivi futures
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
wait_times = [
(time.perf_counter() - req.arrival_time) * 1000
for req in batch
]
logger.info(
f"Batch processato: {len(batch)} items, "
f"attesa media {sum(wait_times)/len(wait_times):.1f}ms"
)
except Exception as e:
logger.error(f"Errore batch processing: {e}")
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
Testare de încărcare cu Locust
Înainte de a intra în producție, este esențial să validați performanța serviciului sub sarcină reală. Lăcustă și instrumentul standard Python pentru testarea încărcării, cu un DSL intuitiv pentru a simula comportamentele complexe ale utilizatorilor.
# locustfile.py
from locust import HttpUser, task, between
import json
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
"""Simula un utente che chiama l'API di predizione."""
wait_time = between(0.1, 0.5) # Attesa tra richieste: 100-500ms
@task(weight=8)
def predict_single(self):
"""80% delle richieste: predizione singola."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Response mancante campo 'prediction'")
else:
response.failure(f"Status code: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% delle richieste: batch prediction (10 items)."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
"""Health check periodico."""
self.client.get("/health/ready")
# Avvio load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
Comparație cadru: Când să folosiți Ce
Alegerea cadrului de servire depinde de context. Iată un ghid practic:
| Cadre | Caz de utilizare ideal | Pro | Împotriva | Latență (p99) |
|---|---|---|---|---|
| FastAPI + Uvicorn | API personalizate, microservicii, echipa Python | Flexibilitate maximă, ecosistem bogat, documentare excelentă | Fără dozare automată, monitorizare manuală | 5-20 ms |
| BentoML | Ambalare model, echipă axată pe ML | Loturi automate, magazin de modele integrat, Docker automat/K8s gen | Cadru general, curba de învățare | 8-30 ms |
| TorchServe | Modele PyTorch în producție | Optimizat pentru PyTorch, suport TorchScript, multi-model | Numai PyTorch, elemente interne bazate pe Java | 3-15 ms |
| Triton Inference Server | Servire GPU cu randament ridicat | Performanță maximă GPU, TensorRT, multi-cadru | Complexitate ridicată, necesită GPU NVIDIA | 1-5 ms (GPU) |
| Modele MLflow | Rapid Prototyping, echipa MLflow | Integrare nativă MLflow, configurație zero | Nu este potrivit pentru trafic mare, personalizare limitată | 20-100 ms |
Recomandare pentru IMM-uri (Buget < 5K EUR/an)
Pentru majoritatea IMM-urilor italiene care încep cu modelul de servire, stiva FastAPI + Uvicorn + Docker + Prometheus + Grafana și alegerea optimă: și 100% open-source, nu necesită abilități specializate în cadre ML, scalare cu ușurință cu Kubernetes atunci când este necesar și are o comunitate imensă pentru asistență. BentoML merită explorat atunci când echipa are mai multe modele de gestionat și dorește ambalarea automată. Triton și TorchServe sunt relevante doar cu GPU-uri dedicate și cerințe de latență sub 5 ms.
Cele mai bune practici și anti-modele
După ce am văzut implementarea completă, să rezumăm cele mai bune practici critice și cele mai comune anti-modele în servirea modelelor cu FastAPI.
Anti-modele de evitat absolut
- Încărcați modelul cu fiecare solicitare: încărcarea durează 1-10 secunde si distruge performantele. Utilizați întotdeauna managerul de context al duratei de viață.
- Apelați modelul în def asincron fără run_in_threadpool: bloc bucla de evenimente și face serviciul de facto cu un singur thread.
- Fără validare de intrare: o valoare anormală poate provoca excepții obscure din model. Utilizați întotdeauna Pydantic cu constrângeri stricte.
- Fără pregătire pentru verificarea sănătății: Kubernetes va începe să trimită trafic înainte ca modelul să fie încărcat, provocând 500 de erori la pornirea la rece.
- Jurnalele prea detaliate în calea caldă: înregistrează fiecare predicție la nivel INFO poate deveni un blocaj de la sine în cazul traficului mare. Utilizați DEBUG pentru predicții unice, INFO pentru statistici agregate.
Cele mai bune practici fundamentale
-
Versiune API: folosiți întotdeauna prefixul
/api/v1/. Când actualizați modelul cu modificări nerespective ale intrărilor, incrementați a/api/v2/menținerea v1 activă pentru compatibilitate cu clienții existenți. -
Timeout-uri explicite: configurați timpul de expirare a inferenței (de exemplu, 5 secunde)
cu
asyncio.wait_for()pentru a preveni ca cererile lente să sature pool-ul de fire. -
Întrerupătoare: implementați un întrerupător pentru a opri
trimiteți cereri către model atunci când rata de eroare depășește un prag (de exemplu, 50%
în 60 de secunde). Biblioteca
pybreakerși o opțiune bună. -
Oprire grațioasă: configura Uvicorn cu
--timeout-graceful-shutdown 30pentru a finaliza cererile în curs înainte ca containerul să se închidă. -
Înregistrare structurată: STATELE UNITE ALE AMERICII
structlogsau format JSON pentru bușteni de producție. Ușurează integrarea cu Elasticsearch/Loki.
Porniți Serviciul
Odată ce totul este implementat, porniți serviciul în dezvoltare și producție și simplu:
# Sviluppo locale (hot reload)
uvicorn app.main:app --reload --port 8000
# Produzione con Uvicorn diretto (senza Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# Con Docker Compose (raccomandato per produzione locale)
docker compose up -d
# Verifica il servizio
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Documentazione API interattiva
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
Concluzii și pașii următori
În acest ghid, am construit un serviciu de servire model gata de producție cu FastAPI și Uvicorn: de la managementul ciclului de viață la monitorizarea cu Prometheus, de la loturi Dockerfile dinamic optimizat cu construirea în mai multe etape. Am văzut și noi BentoML ca alternativă specializată și a comparat principalele cadre disponibile în 2025.
Codul complet din acest ghid, inclusiv teste, tabloul de bord Grafana preconfigurat și manifestele Kubernetes și sunt disponibile în depozitul GitHub din seria MLOps. Stiva FastAPI + Uvicorn + Docker + Prometheus acoperă marea majoritate de model care servește cazuri de utilizare pentru echipe de până la 20-30 ML ingineri, cu costuri de infrastructură de conținut și flexibilitate maximă.
Următorul pas natural după ce ai stăpânit servirea și iată scalare pe Kubernetes: Implementați cu Horizontal Pod Autoscaler, gestionați mai multe lansări de model cu versiuni canare și orchestrare de conducte ML complexe cu KubeFlow. O vom vedea în următorul articol al seriei.
Seria MLOps: Articole similare
- MLOps: de la experiment la producție - Fundamentele ciclului de viață ML
- Conductă ML cu CI/CD: GitHub Actions + Docker - Automatizați instruirea și implementarea
- Urmărirea experimentelor cu MLflow - Gestionați experimentele și registrele de modele
- Scalare ML pe Kubernetes - Următorul în serie: scalare și orchestrare







