Serving ML Models: FastAPI, Uvicorn and Containerization in Production
You've trained a model that beats every baseline, MLflow metrics look outstanding, and your team is excited. Then comes the inevitable question: "When can we use it in production?" This is where many ML engineers struggle: the gap between a Jupyter notebook and a scalable, reliable, monitorable HTTP service is far wider than it appears.
FastAPI has become the de facto standard for Python model serving in 2024-2025, with tens of millions of monthly downloads on PyPI. Its combination of native type hints, automatic validation via Pydantic, auto-generated OpenAPI documentation, and native async support makes it ideal for building production-ready inference APIs. Paired with Uvicorn (a high-performance ASGI server) and Docker containerization best practices, FastAPI lets you deploy a scikit-learn, PyTorch, or Hugging Face model to production in just a few hours.
In this guide, we'll build a complete model serving service from scratch: from a basic prediction endpoint to async inference with batching, from health checks to monitoring with Prometheus and Grafana, all the way to containerized and scalable deployment on Docker and Kubernetes. Every example is tested and ready for use in real production environments.
What You'll Learn
- Structure a FastAPI app for model serving with lifecycle management
- Implement synchronous and asynchronous inference using thread pools for CPU-bound tasks
- Implement dynamic batching to maximize GPU/CPU throughput
- Add health checks, readiness probes, and Prometheus monitoring
- Containerize with Docker multi-stage build and optimize for production
- Compare FastAPI with BentoML, TorchServe, and Triton Inference Server
- Perform load testing with Locust to validate performance under realistic load
Why FastAPI for Model Serving
Before diving into code, it's worth understanding why FastAPI has earned this dominant position in Python model serving. The comparison with Flask, the traditional choice, is illuminating.
Flask uses WSGI (Web Server Gateway Interface), a synchronous blocking architecture. Each request occupies a server thread until it completes. With models that take just 50ms for inference, Flask with 4 workers handles roughly 80 req/sec before degrading. FastAPI uses ASGI (Asynchronous Server Gateway Interface), allowing a single process to handle thousands of concurrent connections in a non-blocking fashion. With Uvicorn and 4 workers, the same hardware easily handles 500+ req/sec for lightweight inference.
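The rough numbers above follow from a back-of-envelope capacity model (an illustrative formula, not a benchmark): a blocking worker can finish at most one request per inference latency.

```python
def max_throughput_rps(workers: int, latency_s: float) -> float:
    # Upper bound for a blocking (WSGI) server: each worker completes
    # at most 1/latency requests per second
    return workers / latency_s

# 4 blocking workers, 50 ms per inference -> ~80 req/sec ceiling,
# matching the Flask figure above
flask_ceiling = max_throughput_rps(workers=4, latency_s=0.050)
```

An ASGI server escapes this ceiling only for the waiting portion of a request; the inference itself still occupies a thread, which is why the thread-pool pattern below matters.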
Warning: Async Does Not Automatically Mean Faster Inference
A common mistake is defining the prediction endpoint as async def and then calling
the model directly. ML inference is CPU-bound (or GPU-bound): running it on the event loop
thread blocks the loop and effectively makes your service single-threaded. The correct
approach is to use asyncio.get_running_loop().run_in_executor() or Starlette's
run_in_threadpool() to run inference in a separate thread.
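A minimal, framework-free sketch of this pattern using only the standard library (the 50 ms heavy_inference stub stands in for a real model.predict call): offloading the blocking call to the default thread pool lets concurrent requests overlap instead of serializing behind the event loop.

```python
import asyncio
import time

def heavy_inference(x: float) -> float:
    time.sleep(0.05)  # simulate a 50 ms blocking model call
    return x * 2

async def handler(x: float) -> float:
    loop = asyncio.get_running_loop()
    # run the blocking call in the default thread pool so the
    # event loop stays free to accept other requests
    return await loop.run_in_executor(None, heavy_inference, x)

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(handler(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
# ten 50 ms calls overlap in the pool; run serially they would take ~0.5 s
```

Starlette's run_in_threadpool, used in the endpoints below, wraps exactly this mechanism.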
Project Setup
Let's start with the project structure. Good code organization is fundamental for production maintainability.
# Project structure
ml-serving/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app and lifecycle
│ ├── models/
│ │ ├── __init__.py
│ │ ├── predictor.py # ML model wrapper
│ │ └── schemas.py # Pydantic schemas
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── predict.py # Prediction endpoints
│ │ └── health.py # Health check endpoints
│ └── middleware/
│ ├── __init__.py
│ └── metrics.py # Prometheus metrics
├── tests/
│ ├── test_predict.py
│ └── test_health.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── locustfile.py
Install the required dependencies:
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.10.0
scikit-learn==1.5.2
numpy==1.26.4
pandas==2.2.3
joblib==1.4.2
prometheus-fastapi-instrumentator==7.0.0
prometheus-client==0.21.0
httpx==0.28.0 # for async tests
python-multipart==0.0.20
# Installation
pip install -r requirements.txt
FastAPI App with Lifecycle Management
The critical point in model serving is loading the model exactly once at application startup, not on every request. FastAPI 0.93+ introduced lifespan context managers, the modern, clean way to manage resources that need to be initialized at startup and released on shutdown.
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
import logging
import time
from app.models.predictor import ModelPredictor
from app.routers import predict, health
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class AppState:
def __init__(self):
self.predictor: ModelPredictor | None = None
self.model_load_time: float = 0.0
self.model_version: str = ""
app_state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle management: load on startup, cleanup on shutdown"""
# Startup
logger.info("Application starting - loading model...")
start_time = time.time()
try:
app_state.predictor = ModelPredictor(
model_path="models/churn_model.pkl",
scaler_path="models/scaler.pkl"
)
app_state.model_load_time = time.time() - start_time
app_state.model_version = app_state.predictor.get_version()
logger.info(
f"Model loaded in {app_state.model_load_time:.2f}s "
f"(version: {app_state.model_version})"
)
except Exception as e:
logger.error(f"Model loading error: {e}")
raise RuntimeError(f"Cannot start service: {e}")
yield # App is running
# Shutdown
logger.info("Application shutdown - cleaning up resources...")
app_state.predictor = None
# FastAPI initialization
app = FastAPI(
title="ML Model Serving API",
description="Production-ready inference API with FastAPI and Uvicorn",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In prod: specify exact domains
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Prometheus instrumentation (auto-exposes /metrics)
Instrumentator().instrument(app).expose(app)
app.include_router(predict.router, prefix="/api/v1", tags=["prediction"])
app.include_router(health.router, tags=["health"])
app.state.app_state = app_state
Model Predictor: ML Model Wrapper
The ModelPredictor is the heart of the service. It encapsulates the ML model
with a clean interface, handles input pre-processing and output post-processing, and provides
metadata useful for monitoring and debugging.
# app/models/predictor.py
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
import logging
from typing import Any
import hashlib
import time
logger = logging.getLogger(__name__)
class ModelPredictor:
"""Production-ready wrapper for scikit-learn models.
Responsibilities:
- Model loading and validation
- Input/output pre/post processing
- Performance metrics collection
"""
def __init__(self, model_path: str, scaler_path: str):
model_file = Path(model_path)
scaler_file = Path(scaler_path)
if not model_file.exists():
raise FileNotFoundError(f"Model not found: {model_path}")
if not scaler_file.exists():
raise FileNotFoundError(f"Scaler not found: {scaler_path}")
self._model = joblib.load(model_file)
self._scaler = joblib.load(scaler_file)
self._model_hash = self._compute_hash(model_file)
self._load_timestamp = time.time()
# Feature names expected (defined at training time)
self._feature_names = [
"tenure_months", "monthly_charges", "total_charges",
"num_products", "has_phone_service", "has_internet",
"contract_type", "payment_method"
]
logger.info(f"ModelPredictor initialized - hash: {self._model_hash[:8]}")
def predict(self, features: dict[str, Any]) -> dict[str, Any]:
"""Single prediction with timing and validation."""
start_time = time.perf_counter()
df = self._preprocess(features)
prediction = self._model.predict(df)[0]
probability = self._model.predict_proba(df)[0].tolist()
inference_time_ms = (time.perf_counter() - start_time) * 1000
return {
"prediction": int(prediction),
"probability": {
"no_churn": round(probability[0], 4),
"churn": round(probability[1], 4)
},
"inference_time_ms": round(inference_time_ms, 2),
"model_version": self.get_version()
}
def predict_batch(
self,
batch: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Optimized batch prediction (single model call for N items)."""
start_time = time.perf_counter()
rows = [self._preprocess(item).iloc[0] for item in batch]
df_batch = pd.DataFrame(rows)
# Single inference call for the entire batch
predictions = self._model.predict(df_batch)
probabilities = self._model.predict_proba(df_batch)
inference_time_ms = (time.perf_counter() - start_time) * 1000
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"prediction": int(pred),
"probability": {
"no_churn": round(float(prob[0]), 4),
"churn": round(float(prob[1]), 4)
},
"batch_index": i
})
logger.info(
f"Batch inference: {len(batch)} items in {inference_time_ms:.1f}ms "
f"({inference_time_ms/len(batch):.2f}ms/item)"
)
return results
def _preprocess(self, features: dict[str, Any]) -> pd.DataFrame:
"""Input preprocessing: validation, encoding, scaling."""
df = pd.DataFrame([features])
contract_map = {"month-to-month": 0, "one-year": 1, "two-year": 2}
payment_map = {"electronic": 0, "mailed": 1, "bank": 2, "credit": 3}
df["contract_type"] = df["contract_type"].map(contract_map).fillna(0)
df["payment_method"] = df["payment_method"].map(payment_map).fillna(0)
df = df[self._feature_names]
df_scaled = self._scaler.transform(df)
return pd.DataFrame(df_scaled, columns=self._feature_names)
def get_version(self) -> str:
return self._model_hash[:12]
def get_metadata(self) -> dict[str, Any]:
return {
"model_hash": self._model_hash[:12],
"load_timestamp": self._load_timestamp,
"feature_names": self._feature_names,
"model_type": type(self._model).__name__
}
@staticmethod
def _compute_hash(file_path: Path) -> str:
with open(file_path, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
Pydantic Schemas: Input Validation
Pydantic v2 (default in FastAPI 0.100+) provides ultra-fast validation thanks to its Rust rewrite. Defining strict schemas protects the model from malformed inputs and provides automatic API documentation at no extra cost.
# app/models/schemas.py
from pydantic import BaseModel, Field, model_validator
from typing import Literal
from enum import Enum
class ContractType(str, Enum):
MONTH_TO_MONTH = "month-to-month"
ONE_YEAR = "one-year"
TWO_YEAR = "two-year"
class PaymentMethod(str, Enum):
ELECTRONIC = "electronic"
MAILED = "mailed"
BANK = "bank"
CREDIT = "credit"
class PredictionRequest(BaseModel):
"""Input schema for single churn prediction."""
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0, le=500)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: ContractType
payment_method: PaymentMethod
@model_validator(mode='after')
def validate_total_charges(self) -> 'PredictionRequest':
if self.total_charges < self.monthly_charges:
raise ValueError(
f"total_charges ({self.total_charges}) cannot be less than "
f"monthly_charges ({self.monthly_charges})"
)
return self
model_config = {
"json_schema_extra": {
"example": {
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
}
}
}
class PredictionResponse(BaseModel):
prediction: Literal[0, 1]
probability: dict[str, float]
inference_time_ms: float
model_version: str
class BatchPredictionRequest(BaseModel):
items: list[PredictionRequest] = Field(
..., min_length=1, max_length=100
)
class BatchPredictionResponse(BaseModel):
results: list[dict]
batch_size: int
total_inference_time_ms: float
Prediction Endpoints: Sync and Async
We implement prediction endpoints following the correct pattern for CPU-bound tasks: inference runs in a separate thread pool so it doesn't block the async event loop.
# app/routers/predict.py
from fastapi import APIRouter, Depends, HTTPException, Request
from starlette.concurrency import run_in_threadpool
import logging
import time
from app.models.predictor import ModelPredictor
from app.models.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse
)
from app.middleware.metrics import (
PREDICTION_COUNTER, PREDICTION_LATENCY,
BATCH_SIZE_HISTOGRAM, ERROR_COUNTER
)
logger = logging.getLogger(__name__)
router = APIRouter()
def get_predictor(request: Request) -> ModelPredictor:
"""Dependency injection for the predictor."""
predictor = request.app.state.app_state.predictor
if predictor is None:
raise HTTPException(
status_code=503,
detail="Model unavailable - service is starting up"
)
return predictor
@router.post(
"/predict",
response_model=PredictionResponse,
summary="Single prediction"
)
async def predict_single(
request: PredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> PredictionResponse:
"""
Single prediction endpoint.
Uses run_in_threadpool to execute CPU-bound inference
without blocking the async event loop.
"""
try:
# CORRECT: run CPU-bound task in threadpool
result = await run_in_threadpool(
predictor.predict,
request.model_dump()
)
PREDICTION_COUNTER.labels(
model_version=result["model_version"],
outcome="success"
).inc()
PREDICTION_LATENCY.observe(result["inference_time_ms"] / 1000)
return PredictionResponse(**result)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict",
error_type=type(e).__name__
).inc()
logger.error(f"Prediction error: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Inference error: {str(e)}"
)
@router.post(
"/predict/batch",
response_model=BatchPredictionResponse,
summary="Batch prediction (max 100 items)"
)
async def predict_batch(
batch_request: BatchPredictionRequest,
predictor: ModelPredictor = Depends(get_predictor)
) -> BatchPredictionResponse:
"""
Batch endpoint: single model call for N items.
3-5x higher throughput compared to N individual calls.
"""
start_time = time.perf_counter()
batch_size = len(batch_request.items)
try:
items_dicts = [item.model_dump() for item in batch_request.items]
results = await run_in_threadpool(
predictor.predict_batch,
items_dicts
)
total_time_ms = (time.perf_counter() - start_time) * 1000
BATCH_SIZE_HISTOGRAM.observe(batch_size)
return BatchPredictionResponse(
results=results,
batch_size=batch_size,
total_inference_time_ms=round(total_time_ms, 2)
)
except Exception as e:
ERROR_COUNTER.labels(
endpoint="predict_batch",
error_type=type(e).__name__
).inc()
raise HTTPException(status_code=500, detail=str(e))
Health Checks: Liveness and Readiness
In a Kubernetes deployment, distinguishing between the liveness probe (is the process alive?) and the readiness probe (is the service ready to receive traffic?) is fundamental for correct routing and zero-downtime rolling deployments.
# app/routers/health.py
from fastapi import APIRouter, Request
from pydantic import BaseModel
import time
import psutil
import os
router = APIRouter()
class HealthResponse(BaseModel):
status: str
timestamp: float
uptime_seconds: float
class ReadinessResponse(BaseModel):
status: str
model_loaded: bool
model_version: str
model_load_time_seconds: float
memory_usage_mb: float
cpu_percent: float
_start_time = time.time()
@router.get("/health", response_model=HealthResponse)
async def liveness() -> HealthResponse:
"""
Liveness probe: verifies the process is alive.
Kubernetes uses this to decide whether to restart the pod.
Always returns 200 if the process is running.
"""
return HealthResponse(
status="alive",
timestamp=time.time(),
uptime_seconds=round(time.time() - _start_time, 1)
)
@router.get("/health/ready", response_model=ReadinessResponse)
async def readiness(request: Request) -> ReadinessResponse:
"""
Readiness probe: verifies the service is ready to handle traffic.
Returns 503 if the model is not yet loaded.
Kubernetes uses this for load balancing decisions.
"""
from fastapi import HTTPException
app_state = request.app.state.app_state
model_loaded = app_state.predictor is not None
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=0.1)
response = ReadinessResponse(
status="ready" if model_loaded else "not_ready",
model_loaded=model_loaded,
model_version=app_state.model_version if model_loaded else "",
model_load_time_seconds=round(app_state.model_load_time, 3),
memory_usage_mb=round(memory_mb, 1),
cpu_percent=round(cpu_percent, 1)
)
if not model_loaded:
raise HTTPException(status_code=503, detail=response.model_dump())
return response
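For reference, a hypothetical Kubernetes container spec fragment wiring these two endpoints into probes (the port and timings below are illustrative, not prescribed by the service):

```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 30
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 3
```

Because the readiness endpoint returns 503 until the model loads, new pods receive no traffic during cold start, which is what makes zero-downtime rolling deployments work.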
Prometheus and Grafana Monitoring
Monitoring an ML service in production goes well beyond standard HTTP metrics. We want to
track inference latency, prediction distribution, error rates, and resource utilization.
The prometheus-fastapi-instrumentator library provides baseline HTTP metrics;
we add custom ML-specific metrics on top.
# app/middleware/metrics.py
from prometheus_client import Counter, Histogram, Gauge
PREDICTION_COUNTER = Counter(
"ml_predictions_total",
"Total number of predictions executed",
["model_version", "outcome"]
)
PREDICTION_LATENCY = Histogram(
"ml_inference_duration_seconds",
"Inference duration in seconds",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)
BATCH_SIZE_HISTOGRAM = Histogram(
"ml_batch_size",
"Batch request sizes",
buckets=[1, 5, 10, 25, 50, 100]
)
ERROR_COUNTER = Counter(
"ml_errors_total",
"Total number of errors",
["endpoint", "error_type"]
)
CHURN_RATE_GAUGE = Gauge(
"ml_churn_rate_rolling",
"Predicted churn rate (rolling window of 1000 predictions)"
)
MODEL_MEMORY_GAUGE = Gauge(
"ml_model_memory_bytes",
"Memory used by the ML model"
)
# docker-compose.yml
# (the top-level "version" key is obsolete in Compose V2 and is omitted here)
services:
ml-api:
build: .
ports:
- "8000:8000"
environment:
- MODEL_PATH=/app/models/churn_model.pkl
- SCALER_PATH=/app/models/scaler.pkl
- LOG_LEVEL=INFO
volumes:
- ./models:/app/models:ro
healthcheck:
test: ["CMD", "python", "-c", "import httpx; exit(0 if httpx.get('http://localhost:8000/health/ready').status_code == 200 else 1)"]  # the slim base image ships without curl
interval: 30s
timeout: 10s
retries: 3
start_period: 20s
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: "2.0"
prometheus:
image: prom/prometheus:v2.55.0
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
grafana:
image: grafana/grafana:11.3.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=mlops2025
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- prometheus
volumes:
prometheus-data:
grafana-data:
Optimized Dockerfile with Multi-Stage Build
A production-optimized Dockerfile uses multi-stage build to separate build dependencies from runtime dependencies, significantly reducing the final image size (from ~2GB to ~400MB for scikit-learn workloads).
# Dockerfile
# Stage 1: Builder - install dependencies
FROM python:3.12-slim AS builder
WORKDIR /build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime - minimal final image
FROM python:3.12-slim AS runtime
# Non-root user for security
RUN useradd --create-home --shell /bin/bash mlserving
WORKDIR /app
# Copy dependencies from builder
COPY --from=builder /install /usr/local
# Copy application code
COPY --chown=mlserving:mlserving app/ ./app/
# Create models directory (models are mounted as volumes)
RUN mkdir -p /app/models && chown mlserving:mlserving /app/models
USER mlserving
HEALTHCHECK --interval=30s --timeout=10s --start-period=20s --retries=3 \
CMD python -c "import httpx; r = httpx.get('http://localhost:8000/health'); exit(0 if r.status_code == 200 else 1)"
EXPOSE 8000
# Production Uvicorn: 4 workers, production timeouts
CMD ["uvicorn", "app.main:app", \
     "--host", "0.0.0.0", \
     "--port", "8000", \
     "--workers", "4", \
     "--timeout-keep-alive", "30", \
     "--access-log", \
     "--log-level", "info", \
     "--timeout-graceful-shutdown", "30"]
How Many Uvicorn Workers in Production?
The rule of thumb is 2 x CPU cores + 1. For a pod with 2 vCPUs, use 5 workers. Important caveat: each worker loads a separate copy of the model into memory. With a 500MB model and 4 workers, the container needs roughly 2GB of RAM. For large models (LLMs), 1 worker with dynamic batching is often the better choice - memory efficiency beats concurrency.
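The sizing arithmetic from this callout as a quick calculator (the formulas are the rules of thumb stated above; the 100 MB per-worker overhead is an illustrative assumption):

```python
def recommended_workers(cpu_cores: int) -> int:
    # rule of thumb: 2 x cores + 1
    return 2 * cpu_cores + 1

def required_memory_gb(model_mb: float, workers: int,
                       overhead_mb: float = 100) -> float:
    # every Uvicorn worker is a separate process holding its own model copy
    return workers * (model_mb + overhead_mb) / 1024

workers_for_2_vcpu = recommended_workers(2)          # 5, as in the callout
ram_4_workers = required_memory_gb(500, workers=4)   # ~2.3 GB for a 500 MB model
```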
BentoML: The Specialized Model Serving Framework
While FastAPI excels as a general-purpose framework, BentoML was designed specifically for model serving and automatically solves many problems you'd manage manually in FastAPI: dynamic batching, integrated model versioning, a runner abstraction for independent inference scaling, and automatic generation of Dockerfiles and Kubernetes manifests. (The example below uses the classic 1.x Service/Runner API; BentoML 1.2+ also offers a newer @bentoml.service decorator style.)
# bentoml_service.py
import bentoml
import numpy as np
from bentoml.io import JSON
from pydantic import BaseModel, Field
# 1. Save the model to BentoML Model Store
# (run once after training)
bento_model = bentoml.sklearn.save_model(
"churn_classifier",
sklearn_model,
signatures={
"predict": {"batchable": True, "batch_dim": 0},
"predict_proba": {"batchable": True, "batch_dim": 0},
},
custom_objects={"scaler": scaler},
metadata={
"framework": "scikit-learn",
"task": "churn_prediction",
"metrics": {"auc_roc": 0.89, "f1": 0.82}
}
)
# 2. Define the Runner (scalable inference layer)
churn_runner = bentoml.sklearn.get("churn_classifier:latest").to_runner()
# 3. Pydantic schemas
class ChurnRequest(BaseModel):
tenure_months: int = Field(..., ge=0, le=120)
monthly_charges: float = Field(..., ge=0)
total_charges: float = Field(..., ge=0)
num_products: int = Field(..., ge=1, le=10)
has_phone_service: bool
has_internet: bool
contract_type: str
payment_method: str
class ChurnResponse(BaseModel):
churn_prediction: int
churn_probability: float
model_tag: str
# 4. Define the BentoML Service
svc = bentoml.Service(
name="churn-prediction-service",
runners=[churn_runner]
)
@svc.api(
input=JSON(pydantic_model=ChurnRequest),
output=JSON(pydantic_model=ChurnResponse),
route="/predict"
)
async def predict(request: ChurnRequest) -> ChurnResponse:
"""Churn prediction with BentoML - automatic batching."""
features = preprocess(request)
# BentoML handles threading and batching automatically
prediction = await churn_runner.predict.async_run(features)
probability = await churn_runner.predict_proba.async_run(features)
return ChurnResponse(
churn_prediction=int(prediction[0]),
churn_probability=round(float(probability[0][1]), 4),
model_tag=str(bentoml.sklearn.get("churn_classifier:latest").tag)
)
Deploy with BentoML in three commands:
# 1. Build the Bento (deployable artifact)
bentoml build
# Output: Successfully built Bento(tag="churn-prediction-service:a1b2c3d4")
# 2. Automatically generate Docker image
bentoml containerize churn-prediction-service:latest
# 3. Run the container
docker run -p 3000:3000 churn-prediction-service:latest
# Or: deploy to BentoCloud (managed hosting)
# bentoml deploy churn-prediction-service:latest --name prod-churn
Dynamic Batching: Maximizing Throughput
Dynamic batching collects multiple incoming requests and processes them together in a single model call. On GPU, this is particularly effective because GPUs are designed for parallel operations on large batches. On CPU, the benefit is smaller but still meaningful for models with high fixed overhead per call.
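The "high fixed overhead per call" intuition can be made concrete with a toy cost model (the 5 ms overhead and 0.5 ms per-item figures are illustrative, not measured):

```python
def call_latency_ms(n_items: int, overhead_ms: float = 5.0,
                    per_item_ms: float = 0.5) -> float:
    # one model call pays the fixed overhead once, then a per-item cost
    return overhead_ms + per_item_ms * n_items

def unbatched_ms(n_items: int) -> float:
    # n separate calls pay the fixed overhead n times over
    return n_items * call_latency_ms(1)

batched = call_latency_ms(32)   # 5 + 16 = 21 ms for the whole batch
serial = unbatched_ms(32)       # 32 * 5.5 = 176 ms
speedup = serial / batched      # ~8.4x under these assumptions
```

The larger the fixed overhead relative to per-item cost (GPUs are the extreme case), the bigger the win, which is exactly the tradeoff the batcher below exploits.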
# app/batching/dynamic_batcher.py
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Any
@dataclass
class PendingRequest:
data: dict[str, Any]
future: asyncio.Future
arrival_time: float
class DynamicBatcher:
"""
Dynamic batcher for ML inference.
Collects requests for max_wait_ms milliseconds (or until
max_batch_size requests accumulate) then processes them together.
Tuning guidelines:
- max_batch_size: limited by GPU/CPU memory
- max_wait_ms: tradeoff between single latency and throughput
"""
def __init__(
self,
predictor,
max_batch_size: int = 32,
max_wait_ms: float = 10.0
):
self._predictor = predictor
self._max_batch_size = max_batch_size
self._max_wait_ms = max_wait_ms
self._queue: deque[PendingRequest] = deque()
self._lock = asyncio.Lock()
self._batch_task: asyncio.Task | None = None
async def predict(self, data: dict[str, Any]) -> dict[str, Any]:
"""Add request to queue and await result. Thread-safe."""
        loop = asyncio.get_running_loop()
future = loop.create_future()
pending = PendingRequest(
data=data,
future=future,
arrival_time=time.perf_counter()
)
async with self._lock:
self._queue.append(pending)
if self._batch_task is None or self._batch_task.done():
self._batch_task = asyncio.create_task(
self._process_batch()
)
return await future
    async def _process_batch(self) -> None:
        """Process a batch of queued requests."""
        await asyncio.sleep(self._max_wait_ms / 1000)
        async with self._lock:
            batch = []
            while self._queue and len(batch) < self._max_batch_size:
                batch.append(self._queue.popleft())
            # Requests may still be queued (arrivals beyond max_batch_size):
            # schedule another run so they are not stranded
            if self._queue:
                self._batch_task = asyncio.create_task(self._process_batch())
        if not batch:
            return
try:
from starlette.concurrency import run_in_threadpool
items = [req.data for req in batch]
results = await run_in_threadpool(
self._predictor.predict_batch,
items
)
for pending_req, result in zip(batch, results):
if not pending_req.future.done():
pending_req.future.set_result(result)
except Exception as e:
for pending_req in batch:
if not pending_req.future.done():
pending_req.future.set_exception(e)
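The batcher above depends on the predictor and Starlette; to see the pattern in isolation, here is a self-contained, simplified variant (the MiniBatcher name and parameters are illustrative). Since everything runs on one event loop, no lock is needed; note the subtlety that a flush must reschedule itself when more requests are queued than max_size, so nothing is stranded.

```python
import asyncio

class MiniBatcher:
    """Toy dynamic batcher: gathers awaiting callers, runs one batch call."""

    def __init__(self, batch_fn, max_wait_s: float = 0.01, max_size: int = 8):
        self.batch_fn = batch_fn      # stand-in for predictor.predict_batch
        self.max_wait_s = max_wait_s
        self.max_size = max_size
        self._pending = []            # list of (item, Future) pairs
        self._task = None

    async def predict(self, item):
        fut = asyncio.get_running_loop().create_future()
        self._pending.append((item, fut))
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._flush())
        return await fut

    async def _flush(self):
        await asyncio.sleep(self.max_wait_s)  # collection window
        batch = self._pending[: self.max_size]
        self._pending = self._pending[self.max_size:]
        # one call for the whole batch, then fan results back out
        for (_, fut), result in zip(batch, self.batch_fn([i for i, _ in batch])):
            fut.set_result(result)
        if self._pending:                     # leftovers: run another flush
            self._task = asyncio.create_task(self._flush())

def fake_model_batch(items):
    # stand-in for a single vectorized model call
    return [x * 2 for x in items]

async def demo():
    batcher = MiniBatcher(fake_model_batch, max_size=3)
    return await asyncio.gather(*(batcher.predict(i) for i in range(5)))

results = asyncio.run(demo())  # [0, 2, 4, 6, 8] via two batched calls (3 + 2)
```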
Load Testing with Locust
Before going live, it's essential to validate service performance under realistic load. Locust is the standard Python load testing tool, with an intuitive DSL for simulating complex user behaviors.
# locustfile.py
from locust import HttpUser, task, between
import random
SAMPLE_REQUESTS = [
{
"tenure_months": 24,
"monthly_charges": 65.5,
"total_charges": 1572.0,
"num_products": 3,
"has_phone_service": True,
"has_internet": True,
"contract_type": "month-to-month",
"payment_method": "electronic"
},
{
"tenure_months": 60,
"monthly_charges": 45.0,
"total_charges": 2700.0,
"num_products": 2,
"has_phone_service": True,
"has_internet": False,
"contract_type": "two-year",
"payment_method": "bank"
},
]
class MLApiUser(HttpUser):
wait_time = between(0.1, 0.5)
@task(weight=8)
def predict_single(self):
"""80% of traffic: single predictions."""
payload = random.choice(SAMPLE_REQUESTS)
with self.client.post(
"/api/v1/predict",
json=payload,
catch_response=True
) as response:
if response.status_code == 200:
data = response.json()
if "prediction" not in data:
response.failure("Missing 'prediction' field")
else:
response.failure(f"Status: {response.status_code}")
@task(weight=2)
def predict_batch(self):
"""20% of traffic: batch predictions."""
batch_size = random.randint(5, 20)
payload = {
"items": [
random.choice(SAMPLE_REQUESTS)
for _ in range(batch_size)
]
}
with self.client.post(
"/api/v1/predict/batch",
json=payload,
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Batch failed: {response.status_code}")
@task(weight=1)
def health_check(self):
self.client.get("/health/ready")
# Run load test:
# locust --headless --users 100 --spawn-rate 10 \
# --host http://localhost:8000 --run-time 2m \
# --html report.html
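When reading the Locust report, be precise about which percentile definition is in play; a nearest-rank sketch (one of several common definitions) over raw latency samples:

```python
def percentile(samples, p):
    """Nearest-rank percentile: smallest sample >= p% of all samples."""
    ordered = sorted(samples)
    rank = -(-p * len(ordered) // 100)  # ceiling division for the rank
    return ordered[max(0, min(len(ordered), rank) - 1)]

latencies_ms = [12, 15, 11, 240, 14, 13, 16, 12, 18, 14]
p50 = percentile(latencies_ms, 50)  # the median-ish value
p99 = percentile(latencies_ms, 99)  # tail latency, dominated by the 240 ms outlier
```

The gap between p50 and p99 is the number to watch in load tests: a healthy median with a blown-out tail usually points to thread-pool saturation or GC pauses, not model speed.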
Framework Comparison: When to Use What
The choice of serving framework depends on your context. Here's a practical decision guide:
| Framework | Ideal Use Case | Pros | Cons | p99 Latency |
|---|---|---|---|---|
| FastAPI + Uvicorn | Custom APIs, microservices, Python teams | Maximum flexibility, rich ecosystem, excellent docs | Manual batching, manual monitoring setup | 5-20ms |
| BentoML | Model packaging, ML-focused teams | Auto batching, built-in model store, Docker/K8s generation | Framework overhead, learning curve | 8-30ms |
| TorchServe | PyTorch models in production | Optimized for PyTorch, TorchScript, multi-model | PyTorch only, Java-based internals | 3-15ms |
| Triton Inference Server | High-throughput GPU serving | Maximum GPU performance, TensorRT, multi-framework | High complexity, NVIDIA GPU required | 1-5ms (GPU) |
| MLflow Models | Rapid prototyping, MLflow teams | Native MLflow integration, zero configuration | Not suitable for high traffic, limited customization | 20-100ms |
Recommendation for Small Teams (Budget under 5K EUR/year)
For most small to medium teams starting with model serving, the FastAPI + Uvicorn + Docker + Prometheus + Grafana stack is the optimal choice: it is 100% open-source, requires no specialized ML framework expertise, scales smoothly with Kubernetes when needed, and has a massive community for support. BentoML is worth exploring when your team manages multiple models and wants to automate packaging. Triton and TorchServe become relevant only when you have dedicated GPUs and sub-5ms latency requirements.
Best Practices and Anti-Patterns
Anti-Patterns to Absolutely Avoid
- Loading the model on every request: model loading takes 1-10 seconds and destroys performance. Always use the lifespan context manager.
- Calling the model in async def without run_in_threadpool: this blocks the event loop and effectively makes your service single-threaded.
- No input validation: an anomalous value can cause obscure exceptions deep in the model. Always use Pydantic with strict constraints.
- No readiness health check: Kubernetes will start routing traffic before the model is loaded, causing 500 errors during cold start.
- Overly verbose logging in the hot path: logging every prediction at INFO level can become a bottleneck itself under high traffic. Use DEBUG for individual predictions, INFO for aggregated statistics.
Key Best Practices
- API versioning: always use the /api/v1/ prefix. When updating the model with breaking changes to the input schema, increment to /api/v2/ while keeping v1 active for backward compatibility.
- Explicit timeouts: configure inference timeouts (e.g., 5 seconds) with asyncio.wait_for() to prevent slow requests from saturating the thread pool.
- Circuit breaker: implement a circuit breaker to stop sending requests to the model when the error rate exceeds a threshold (e.g., 50% over 60 seconds). The pybreaker library is a solid option.
- Graceful shutdown: configure Uvicorn with --timeout-graceful-shutdown 30 to complete in-flight requests before the container shuts down.
- Structured logging: use structlog or JSON format for production logs. This enables seamless integration with Elasticsearch or Grafana Loki.
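A stripped-down sketch of the circuit-breaker idea, using consecutive failures rather than an error-rate window (pybreaker implements the production-grade version; the class name and thresholds here are illustrative):

```python
import time

class SimpleCircuitBreaker:
    """Opens after `max_failures` consecutive failures, rejects calls for
    `reset_after` seconds, then allows one trial call (half-open state)."""

    def __init__(self, max_failures: int = 5, reset_after: float = 60.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._failures = 0
        self._opened_at = None

    def call(self, fn, *args, **kwargs):
        if self._opened_at is not None:
            if time.monotonic() - self._opened_at < self.reset_after:
                raise RuntimeError("circuit open: request rejected")
            # half-open: let one call through to probe recovery
            self._opened_at = None
            self._failures = 0
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = time.monotonic()
            raise
        self._failures = 0
        return result
```

Wrapping predictor.predict in breaker.call(...) inside the endpoint turns a stream of slow model failures into fast 503 rejections instead of a saturated thread pool.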
Running the Service
# Local development (with hot reload)
uvicorn app.main:app --reload --port 8000
# Production with direct Uvicorn (no Docker)
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--timeout-keep-alive 30 \
--access-log \
--log-level info \
--timeout-graceful-shutdown 30
# With Docker Compose (recommended for production)
docker compose up -d
# Verify the service
curl http://localhost:8000/health/ready
curl -X POST http://localhost:8000/api/v1/predict \
-H "Content-Type: application/json" \
-d '{"tenure_months": 24, "monthly_charges": 65.5, "total_charges": 1572.0, "num_products": 3, "has_phone_service": true, "has_internet": true, "contract_type": "month-to-month", "payment_method": "electronic"}'
# Available endpoints:
# http://localhost:8000/docs (Swagger UI)
# http://localhost:8000/redoc (ReDoc)
# http://localhost:8000/metrics (Prometheus metrics)
# http://localhost:3000 (Grafana dashboard)
Conclusion and Next Steps
In this guide, we built a production-ready model serving service with FastAPI and Uvicorn: from lifecycle management to Prometheus monitoring, from dynamic batching to an optimized multi-stage Dockerfile. We also explored BentoML as a specialized alternative and compared the main frameworks available in 2025.
The complete code for this guide, including tests, a pre-configured Grafana dashboard, and Kubernetes manifests, is available in the MLOps series GitHub repository. The FastAPI + Uvicorn + Docker + Prometheus stack covers the vast majority of model serving use cases for teams of up to 20-30 ML engineers, with low infrastructure costs and maximum flexibility.
The natural next step after mastering model serving is scaling on Kubernetes: deployment with Horizontal Pod Autoscaler, managing multiple model versions with canary releases, and orchestrating complex ML pipelines with KubeFlow. We'll cover all of this in the next article in the series.
MLOps Series: Related Articles
- MLOps 101: From Experiment to Production - The fundamentals of the ML lifecycle
- ML Pipelines with CI/CD: GitHub Actions and Docker - Automating training and deployment
- Experiment Tracking with MLflow - Managing experiments and model registry
- Scaling ML on Kubernetes - Next in the series: scaling and orchestration