Obsługa modeli ML: FastAPI + Uvicorn w produkcji
Wytrenowałeś model, który przewyższa wszystkie wartości bazowe, wskaźniki MLflow są doskonałe, a Twój zespół i entuzjastyczny. Następnie pojawia się nieuniknione pytanie: „Kiedy będziemy mogli wykorzystać go w produkcji?”. I tu właśnie pojawia się problem wielu inżynierów ML: luka pomiędzy notebookiem Jupyter a usługą HTTP jest skalowalny, niezawodny i możliwy do monitorowania oraz znacznie szerszy, niż się wydaje.
FastAPI i stał się de facto standardem obsługi modelu Pythona w latach 2024-2025, z ponad 80 milionami pobrań miesięcznie na PyPI. Kombinacja wskazówek typu natywnego, walidacja automatyczne poprzez Pydantic, automatycznie generowana dokumentacja OpenAPI i natywna obsługa asynchronii lo sprawia, że idealnie nadaje się do tworzenia gotowych do produkcji interfejsów API wnioskowania. Otoczony przez Uvicorn (wysokowydajny serwer ASGI) i najlepsze praktyki konteneryzacji Docker, na które pozwala FastAPI aby w ciągu zaledwie kilku godzin wprowadzić do produkcji model scikit-learn, PyTorch lub Hugging Face.
W tym przewodniku zbudujemy kompletną usługę obsługującą model: od podstawowego punktu końcowego predykcji po wnioskowanie asynchroniczne z przetwarzaniem wsadowym, od kontroli stanu po monitorowanie za pomocą Prometheusa i Grafany, aż do do kontenerowego i skalowalnego wdrożenia w Dockerze i Kubernetesie. Każdy egzemplarz jest przetestowany i gotowy do użytku w rzeczywistych środowiskach.
Czego się nauczysz
- Zbuduj aplikację FastAPI do obsługi modeli z zarządzaniem cyklem życia
- Implementuj wnioskowanie synchroniczne i asynchroniczne za pomocą pul wątków dla zadań związanych z procesorem
- Zarządzaj dynamicznym przetwarzaniem wsadowym, aby zmaksymalizować przepustowość procesora graficznego/procesora
- Dodaj kontrole stanu, sondy gotowości i monitorowanie Prometheus
- Konteneryzuj za pomocą wieloetapowych kompilacji platformy Docker i optymalizuj pod kątem produkcji
- Porównaj FastAPI z BentoML, TorchServe i Triton Inference Server
- Zaimplementuj testy obciążenia za pomocą Locust, aby sprawdzić wydajność
dlaczego FastAPI do obsługi modeli
Zanim przejdziemy do kodu, warto zrozumieć, dlaczego FastAPI wypracowało sobie tę dominującą pozycję w obsłudze modelu Pythona. Porównanie z tradycyjnym wyborem Flask jest pouczające.
Flask wykorzystuje WSGI (Web Server Gateway Interface), synchroniczną architekturę blokującą. Każde żądanie zajmuje wątek serwera aż do jego zakończenia. W przypadku modeli wymagających zaledwie 50 ms wnioskując, Kolba z 4 pracownikami radzi sobie z około 80 wymaganiami na sekundę, zanim zacznie się degradować. FastAPI wykorzystuje ASGI (Asynchronous Server Gateway Interface), umożliwiając pojedynczemu procesowi zarządzaj tysiącami jednoczesnych połączeń w sposób nieblokujący. Z Uvicornem i 4 pracownikami, ten sam sprzęt z łatwością obsługuje ponad 500 żądań na sekundę w przypadku wnioskowania o świetle.
Ostrzeżenie: Async nie oznacza automatycznie szybszego wnioskowania
Częstym błędem jest definiowanie punktu końcowego predykcji jako async def i wtedy zadzwoń
bezpośrednio model. Wnioskowanie ML jest powiązane z procesorem (lub procesorem graficznym): uruchom je w wątku asynchronicznym
main blokuje pętlę zdarzeń i pogarsza wydajność. Prawidłowym rozwiązaniem jest użycie
asyncio.get_event_loop().run_in_executor() lub Starlette
run_in_threadpool() aby przeprowadzić wnioskowanie w osobnym wątku.
Konfiguracja projektu
Zacznijmy od struktury projektu. Dobra organizacja kodu jest niezbędna łatwość konserwacji w produkcji.
# Struttura del progetto
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app e lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # Wrapper del modello ML
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Endpoint predizione
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
Zainstalujmy niezbędne zależności:
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # per test async
python-multipart==0.0.20
# Installazione
pip install -r requirements.txt
Aplikacja FastAPI z zarządzaniem cyklem życia
Istotą udostępniania modelu jest ładowanie modelu tylko raz podczas uruchamiania aplikacji, nie na każde żądanie. FastAPI 0.93+ wprowadza m.in menedżerowie kontekstu życia, nowoczesny i czysty sposób zarządzania zasobami, które należy inicjować przy uruchomieniu, np zwolniony przy wyłączeniu.
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Stato globale dell'applicazione (immutabile dopo init)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Gestione lifecycle: load all'avvio, cleanup allo shutdown"""
# Startup
logger.info("Avvio applicazione - caricamento modello...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Modello caricato in {app_state.model_load_time:.2f}s "
f"(versione: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Errore caricamento modello: {e}")
raise RuntimeError(f"Impossibile avviare il servizio: {e}")
yield # L'app e in esecuzione
# Shutdown
logger.info("Shutdown applicazione - cleanup risorse...")
app_state.predictor = None
# Inizializzazione FastAPI
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API con FastAPI e Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
# CORS (configura per il tuo ambiente)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specifica i domini
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation
Instrumentator().instrument(app).expose(app)
# Router
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
# Dependency injection dello stato
app.state.app_state = app_state
Predyktor modelu: opakowanie modelu ML
Il ModelPredictor i serce służby. Hermetyzuj model ML za pomocą a
Przejrzysty interfejs, obsługuje wstępne przetwarzanie danych wejściowych i przetwarzanie końcowe danych wejściowych
wyników i zapewnia przydatne metadane do monitorowania.
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Wrapper produzione-ready per modelli scikit-learn.
Responsabilità:
- Caricamento e validazione del modello
- Pre/post processing degli input/output
- Raccolta metriche di performance
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Modello non trovato: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler non trovato: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names attese (definite al training)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor inizializzato - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Predizione singola con timing e validazione."""
start_time = time.perf_counter()
# Pre-processing
df = self._preprocess(features)
# Inference
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Predizione batch ottimizzata (una sola chiamata al modello)."""
start_time = time.perf_counter()
# Costruisci DataFrame dall'intero batch
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Inference batch (una sola chiamata)
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Preprocessing input: validazione, encoding, scaling."""
df = pd.DataFrame([features])
# Encoding categorico
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
# Seleziona features nell'ordine corretto
df = df[self._feature_names]
# Scaling
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Schematy pydantyczne: weryfikacja danych wejściowych
Pydantic v2 (domyślnie w FastAPI 0.100+) oferuje ultraszybką walidację dzięki przepisywania w Ruście. Zdefiniowanie ścisłych schematów chroni model przed zniekształconymi danymi wejściowymi i zapewnia automatyczna dokumentacja API.
# app/models/schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Schema input per predizione churn singola."""
tenure_months: int = Field(
..., ge=0, le=120,
description="Mesi di tenure del cliente"
)
monthly_charges: float = Field(
..., ge=0, le=500,
description="Addebito mensile in EUR"
)
total_charges: float = Field(
..., ge=0,
description="Addebito totale storico in EUR"
)
num_products: int = Field(
..., ge=1, le=10,
description="Numero di prodotti sottoscritti"
)
has_phone_service: bool = Field(
..., description="Il cliente ha servizio telefonico"
)
has_internet: bool = Field(
..., description="Il cliente ha servizio internet"
)
contract_type: ContractType = Field(
..., description="Tipo di contratto"
)
payment_method: PaymentMethod = Field(
..., description="Metodo di pagamento"
)
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
"""total_charges non può essere minore di monthly_charges."""
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) non può essere "
f"minore di monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
"""Schema output per predizione singola."""
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
"""Schema input per batch prediction (max 100 items)."""
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100,
description="Lista di richieste da processare in batch"
)
class BatchPredictionResponse(BaseModel):
"""Schema output per batch prediction."""
results: list[dict]
batch_size: int
total_inference_time_ms: float
Punkty końcowe przewidywania: synchroniczne i asynchroniczne
Implementujemy przewidywane punkty końcowe według prawidłowego wzorca dla zadań związanych z procesorem: wnioskowanie jest wykonywane w oddzielnej puli wątków, aby nie blokować pętli zdarzeń asynchronicznych.
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection del predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Modello non disponibile - servizio in fase di avvio"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Predizione singola",
description="Inferenza su un singolo cliente per churn prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Endpoint di predizione singola.
Usa run_in_threadpool per eseguire l'inference CPU-bound
senza bloccare l'event loop async.
"""
try:
# CORRETTO: esegui task CPU-bound in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
# Aggiorna metriche Prometheus
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(endpoint="predict", error_type=type(e).__name__).inc()
logger.error(f"Errore predizione: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Errore durante l'inference: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Predizione batch",
description="Inferenza batch ottimizzata (max 100 items per request)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Endpoint batch: una sola chiamata al modello per N items.
Throughput 3-5x superiore rispetto a N chiamate singole.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
PREDICTION_COUNTER.labels(
model_version=predictor.get_version(),
outcome="success_batch"
).inc(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
logger.error(f"Errore batch inference: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
Kontrole stanu zdrowia: żywotność i gotowość
We wdrożeniu Kubernetes (lub Docker Compose z kontrolą stanu) należy rozróżnić sonda żywotności (czy proces jest żywy?) e sonda gotowości (czy usługa jest gotowa do odbioru ruchu?) i jest niezbędna do prawidłowego działania routingu i wdrażania stopniowego.
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get(
"/health",
response_model=HealthResponse,
summary="Liveness probe",
tags=["health"]
)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifica che il processo sia attivo.
Kubernetes usa questo per decidere se riavviare il pod.
Risponde sempre 200 se il processo e in esecuzione.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get(
"/health/ready",
response_model=ReadinessResponse,
summary="Readiness probe",
tags=["health"]
)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifica che il servizio sia pronto a ricevere traffico.
Kubernetes usa questo per il load balancing.
Risponde 503 se il modello non e ancora caricato.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
# Metriche sistema
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
@router.get("/metrics/model", tags=["health"])
async def model_metrics(request: Request):
"""Metadata e statistiche del modello caricato."""
app_state = request.app.state.app_state
if app_state.predictor is None:
from fastapi import HTTPException
raise HTTPException(status_code=503, detail="Modello non disponibile")
return app_state.predictor.get_metadata()
Monitoring za pomocą Prometeusza i Grafany
Monitorowanie usługi ML w środowisku produkcyjnym wykracza daleko poza standardowe metryki HTTP.
Chcemy śledzić opóźnienia wnioskowania, rozkład przewidywań, poziom błędów
i wykorzystanie zasobów. Biblioteka prometheus-fastapi-instrumentator
zapewnia podstawowe metryki HTTP; dodajemy niestandardowe metryki specyficzne dla ML.
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary
import time
# Contatore predizioni per versione modello e outcome
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Numero totale di predizioni eseguite",
["model_version", "outcome"]
)
# Latenza inference (in secondi)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Durata inference in secondi",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
# Dimensione batch
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Dimensione delle richieste batch",
buckets=[1, 5, 10, 25, 50, 100]
)
# Contatore errori
ERROR_COUNTER = Counter(
"ml_errors_total",
"Numero totale di errori",
["endpoint", "error_type"]
)
# Distribuzione predizioni (gauge aggiornato periodicamente)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Tasso di churn predetto (finestra mobile 1000 predizioni)"
)
# Utilizzo memoria modello
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memoria utilizzata dal modello ML"
)
class PredictionTracker:
"""Tracker per statistiche rolling delle predizioni."""
def __init__(self, window_size: int = 1000):
self._window_size = window_size
self._predictions: list[int] = []
def record(self, prediction: int) -> None:
self._predictions.append(prediction)
if len(self._predictions) > self._window_size:
self._predictions = self._predictions[-self._window_size:]
# Aggiorna gauge churn rate
if self._predictions:
churn_rate = sum(self._predictions) / len(self._predictions)
CHURN_RATE_GAUGE.set(churn_rate)
# Istanza globale
prediction_tracker = PredictionTracker()
Konfiguracja Docker Compose dla pełnego stosu monitorowania:
# docker-compose.yml
version: "3.9"
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
- ./monitoring/dashboards:/etc/grafana/provisioning/dashboards:ro
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "ml-api"
static_configs:
- targets: ["ml-api:8000"]
metrics_path: "/metrics"
scrape_interval: 10s
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
Wieloetapowa kompilacja pliku Dockerfile
Zoptymalizowany pod kątem produkcji plik Dockerfile używa budowa wieloetapowa aby oddzielić zależności kompilacji od zależności wykonawczych, co znacznie zmniejsza rozmiar końcowego obrazu (od ~2 GB do ~400 MB dla scikit-learn).
# Dockerfile
# Stage 1: Builder - installa dipendenze
FROM python:3.12-slim AS builder
WORKDIR /build
# Installa tool di build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Copia requirements e installa in directory separata
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - immagine finale minimale
FROM python:3.12-slim AS runtime
# Utente non-root per sicurezza
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copia dipendenze dal builder
COPY --from=builder /install /usr/local
# Copia il codice applicativo
COPY --chown=mlserving:mlserving app/ ./app/
# Crea directory modelli (i modelli sono montati come volume)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
# Healthcheck integrato
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
# Esposizione porta
EXPOSE 8000
# Avvio con Uvicorn: 4 worker, timeouts produzione
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "4",
"--timeout-keep-alive", "30",
"--access-log",
"--log-level", "info"]
Ilu pracowników Uvicorn pracuje na produkcji?
Praktyczna zasada E 2 x rdzenie procesora + 1. W przypadku podu z 2 procesorami wirtualnymi użyj 5 pracowników. Uwaga: każdy proces roboczy ładuje kopię modelu do pamięci. W przypadku modelu 500 MB i 4 pracowników, kontener wymaga około 2 GB pamięci RAM. Zrównoważ liczbę pracowników z pamięcią dostępne. W przypadku dużych modeli (LLM) często najlepszym wyborem jest 1 pracownik zajmujący się dozowaniem.
BentoML: Specjalistyczne środowisko udostępniania modeli
Chociaż FastAPI doskonale nadaje się do usług ogólnych, BentoML i został zaprojektowany specjalnie do obsługi modeli i automatycznie rozwiązuje wiele problemów występujących w FastAPI musisz zarządzać ręcznie: dynamicznym grupowaniem, zintegrowanym wersjonowaniem modelu, abstrakcją modułu runner do niezależnego skalowania wnioskowania i automatycznego generowania pliku Dockerfile np Manifest Kubernetesa.
# bentoml_service.py
import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON
from pydantic import BaseModel, Field
from typing import Annotated
# 1. Salva il modello nel BentoML Model Store
# (esegui una sola volta dopo il training)
def save_model_to_store(sklearn_model, scaler):
"""Salva modello e scaler nel BentoML registry locale."""
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"training_date": "2025-01-15",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
print(f"Modello salvato: {bento_model.tag}")
return bento_model
# 2. Definisci il Runner (inferenza scalabile)
# Il runner e l'astrazione che gestisce il modello ML
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Definisci i Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Definisci il Service BentoML
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Predizione churn con BentoML - batching automatico."""
# BentoML gestisce il batching automaticamente
# quando batchable=True e configurato nel runner
# Prepara features (stesso preprocessing del training)
features = preprocess(request)
# Chiamata async al runner (BentoML gestisce threading/batching)
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
def preprocess(request: ChurnRequest) -> np.ndarray:
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
return np.array([[
request.tenure_months,
request.monthly_charges,
request.total_charges,
request.num_products,
int(request.has_phone_service),
int(request.has_internet),
contract_map.get(request.contract_type, 0),
payment_map.get(request.payment_method, 0)
]])
Wdróż za pomocą BentoML za pomocą trzech poleceń:
# 1. Build del Bento (artifact deployabile)
bentoml build
# Output:
# Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# Bento size: 245MB (modello + codice + deps)
# 2. Genera immagine Docker automaticamente
bentoml containerize churn-prediction-service:latest
# 3. Avvia il container
docker run -p 3000:3000 churn-prediction-service:latest
# Oppure: deploy su BentoCloud (managed)
# bentoml deploy churn-prediction-service:latest --name prod-churn
Dynamiczne dozowanie: maksymalizacja przepustowości
Il dynamiczne dozowanie oraz technikę zbierającą wiele przychodzących żądań i przetwarza je razem w jednym wywołaniu modelu. Jest to szczególnie prawdziwe w przypadku procesorów graficznych efektywne, ponieważ procesor graficzny jest przeznaczony do operacji równoległych na dużych partiach. W przypadku procesora korzyść jest mniejsza, ale nadal znacząca w przypadku modeli o wysokim stałym obciążeniu.
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class PendingRequest:
"""Una singola richiesta in attesa di essere processata in batch."""
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher per ML inference.
Raccoglie richieste per max_wait_ms millisecondi (o fino a
max_batch_size richieste) e poi le processa insieme.
Parametri da tuner per il proprio use case:
- max_batch_size: dipende dalla memoria GPU/CPU disponibile
- max_wait_ms: tradeoff tra latenza singola e throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Aggiunge la richiesta alla coda e attende il risultato.
Chiamata concorrente - sicura per uso multi-thread.
"""
loop = asyncio.get_event_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
# Avvia il task di batch processing se non e già in esecuzione
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
async def _process_batch(self) -> None:
"""Processa un batch di richieste."""
# Attendi max_wait_ms o fino a max_batch_size richieste
await asyncio.sleep(self._max_wait_ms / 1000)
async with self._lock:
if not self._queue:
return
# Prendi fino a max_batch_size richieste dalla coda
batch = []
while self._queue and len(batch) < self._max_batch_size:
batch.append(self._queue.popleft())
if not batch:
return
# Processa il batch (CPU-bound in threadpool)
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
# Distribuisci i risultati ai rispettivi futures
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
wait_times = [
(time.perf_counter() - req.arrival_time) * 1000
for req in batch
]
logger.info(
f"Batch processato: {len(batch)} items, "
f"attesa media {sum(wait_times)/len(wait_times):.1f}ms"
)
except Exception as e:
logger.error(f"Errore batch processing: {e}")
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
Testowanie obciążenia za pomocą Locust
Przed przystąpieniem do produkcji należy zweryfikować działanie usługi pod rzeczywistym obciążeniem. Szarańcza oraz standardowe narzędzie Pythona do testowania obciążenia, z intuicyjnym DSL do symulacji złożonych zachowań użytkowników.
# locustfile.py
from locust import HttpUser, task, between
import json
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
"""Simula un utente che chiama l'API di predizione."""
wait_time = between(0.1, 0.5) # Attesa tra richieste: 100-500ms
@task(weight=8)
def predict_single(self):
"""80% delle richieste: predizione singola."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Response mancante campo 'prediction'")
else:
response.failure(f"Status code: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% delle richieste: batch prediction (10 items)."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
"""Health check periodico."""
self.client.get("/health/ready")
# Avvio load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
Porównanie ram: kiedy czego używać
Wybór platformy obsługi zależy od kontekstu. Oto praktyczny przewodnik:
| Ramy | Idealny przypadek użycia | Zawodowiec | Przeciwko | Opóźnienie (p99) |
|---|---|---|---|---|
| FastAPI + Uvicorn | Niestandardowe API, mikroserwisy, zespół Pythona | Maksymalna elastyczność, bogaty ekosystem, doskonała dokumentacja | Brak automatycznego dozowania, ręczne monitorowanie | 5-20 ms |
| BentoML | Opakowanie modelowe, zespół skupiony na uczeniu maszynowym | Automatyczne grupowanie, zintegrowany magazyn modeli, automatyczny Docker/K8s gen | Ramy ogólne, krzywa uczenia się | 8-30 ms |
| Serwis Pochodni | Modele PyTorch w produkcji | Zoptymalizowany dla PyTorch, obsługa TorchScript, wiele modeli | Tylko PyTorch, elementy wewnętrzne oparte na Javie | 3-15 ms |
| Serwer wnioskowania Triton | Obsługa procesora graficznego o wysokiej przepustowości | Maksymalna wydajność procesora graficznego, TensorRT, obsługa wielu platform | Wysoka złożoność wymaga procesora graficznego NVIDIA | 1-5 ms (GPU) |
| Modele MLflow | Szybkie prototypowanie, zespół MLflow | Natywna integracja z MLflow, konfiguracja zerowa | Nie nadaje się do dużego ruchu, ograniczone dostosowywanie | 20-100 ms |
Rekomendacja dla MŚP (Budżet < 5 tys. EUR/rok)
Dla większości włoskich MŚP, począwszy od serwowania modeli, stos FastAPI + Uvicorn + Docker + Prometheus + Grafana i optymalny wybór: i w 100% open-source, nie wymaga specjalistycznych umiejętności w zakresie frameworków ML, skala w razie potrzeby z łatwością korzysta z Kubernetes i ma ogromną społeczność, która zapewnia wsparcie. BentoML warto poznać, gdy zespół ma więcej modeli do zarządzania i chce zautomatyzować pakowanie. Triton i TorchServe są istotne tylko z dedykowanymi procesorami graficznymi i wymagania dotyczące opóźnienia poniżej 5 ms.
Najlepsze praktyki i anty-wzorce
Po obejrzeniu pełnej implementacji podsumujmy najważniejsze najlepsze praktyki oraz najczęstsze antywzorce w modelach obsługujących FastAPI.
Anty-wzorce, których należy bezwzględnie unikać
- Załaduj model przy każdym żądaniu: ładowanie trwa 1-10 sekund i niszczy występy. Zawsze używaj menedżera kontekstu żywotności.
- Wywołaj model w trybie asynchronicznym bez run_in_threadpool: blok pętlę zdarzeń i sprawia, że usługa jest de facto jednowątkowa.
- Brak sprawdzania poprawności danych wejściowych: może spowodować nietypowa wartość niejasne wyjątki w modelu. Zawsze używaj Pydantic ze ścisłymi ograniczeniami.
- Brak gotowości do kontroli stanu: Kubernetes rozpocznie wysyłanie ruchu przed załadowaniem modelu, powodując 500 błędów przy zimnym starcie.
- Zbyt szczegółowe logi w gorącej ścieżce: rejestruj każdą prognozę na poziomie INFO może samo w sobie stać się wąskim gardłem przy dużym natężeniu ruchu. Użyj DEBUGUJ dla pojedynczych przewidywań, INFO dla statystyk zagregowanych.
Podstawowe najlepsze praktyki
-
Wersjonowanie API: zawsze używaj przedrostka
/api/v1/. Kiedy aktualizujesz model, wprowadzając istotne zmiany w danych wejściowych, zwiększ a/api/v2/utrzymywanie wersji 1 aktywnej w celu zapewnienia zgodności z istniejącymi klientami. -
Wyraźne limity czasu: skonfiguruj limit czasu wnioskowania (np. 5 sekund)
z
asyncio.wait_for()aby zapobiec nasyceniu puli wątków przez powolne żądania. -
Wyłączniki automatyczne: zastosować wyłącznik automatyczny, aby zatrzymać
wysyłaj żądania do modelu, gdy poziom błędów przekroczy próg (np. 50%
w 60 sekund). Biblioteka
pybreakeri dobra opcja. -
Eleganckie zamknięcie: skonfiguruj Uvicorn za pomocą
--timeout-graceful-shutdown 30aby zakończyć bieżące żądania przed zamknięciem kontenera. -
Logowanie strukturalne: USA
structloglub w formacie JSON dla dzienników produkcyjnych. Ułatwia integrację z Elasticsearch/Loki.
Uruchom usługę
Gdy wszystko zostanie wdrożone, uruchom usługę w fazie rozwoju i produkcji i po prostu:
# Sviluppo locale (hot reload)
uvicorn app.main:app --reload --port 8000
# Produzione con Uvicorn diretto (senza Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# Con Docker Compose (raccomandato per produzione locale)
docker compose up -d
# Verifica il servizio
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Documentazione API interattiva
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
Wnioski i dalsze kroki
W tym przewodniku stworzyliśmy gotowy do produkcji model obsługujący usługę FastAPI i Uvicorn: od zarządzania cyklem życia po monitorowanie za pomocą Prometheusa i grupowanie Dynamiczny plik Dockerfile zoptymalizowany pod kątem wieloetapowej kompilacji. Widzieliśmy także BentoML jako wyspecjalizowaną alternatywę i porównał główne dostępne frameworki w 2025 roku.
Kompletny kod w tym przewodniku, łącznie z testami, wstępnie skonfigurowanym dashboardem Grafana i manifesty Kubernetes i są dostępne w repozytorium GitHub serii MLOps. Stos FastAPI + Uvicorn + Docker + Prometheus obejmuje zdecydowaną większość modeli obsługujących przypadki użycia dla zespołów liczących do 20-30 inżynierów ML, przy kosztach: infrastrukturę treści i maksymalną elastyczność.
Następny naturalny krok po opanowaniu serwowania i lo skalowanie na Kubernetesie: Wdróż za pomocą automatycznego skalowania podów poziomych i zarządzaj więcej wydania modelu z wydaniami kanaryjskimi i orkiestracja złożonych potoków uczenia maszynowego z KubeFlowem. Zobaczymy to w kolejnym artykule z serii.
Seria MLOps: powiązane artykuły
- MLOps: od eksperymentu do produkcji - Podstawy cyklu życia uczenia maszynowego
- Potok ML z CI/CD: GitHub Actions + Docker - Zautomatyzuj szkolenie i wdrażanie
- Śledzenie eksperymentów za pomocą MLflow - Zarządzanie eksperymentami i rejestrami modeli
- Skalowanie ML na Kubernetesie - Następny w serii: skalowanie i orkiestracja







