Async/Await in Python: Capire l'Event Loop, Coroutine e Concorrenza I/O-Bound
Spiegazione pratica di come funziona asyncio in Python: event loop, coroutine, task e la differenza cruciale tra I/O-bound e CPU-bound workload — con benchmark reali su FastAPI.
Il Problema: Python è Single-Threaded
Python ha il Global Interpreter Lock (GIL): solo un thread Python esegue bytecode alla volta. Questo è spesso frainteso. Il GIL non impedisce la concorrenza — impedisce il parallelismo CPU dei thread. Per workload I/O-bound (le API di solito lo sono), asyncio supera i thread in efficienza perché elimina l'overhead del context switching dell'OS.
Cosa Imparerai
- Event loop: il ciclo che governa l'esecuzione asincrona
- Coroutine: funzioni che possono sospendersi e riprendere
- async/await: la sintassi che rende le coroutine leggibili
- Task vs Future: le primitive di asyncio
- I/O-bound vs CPU-bound: quando async aiuta e quando no
- asyncio.gather e asyncio.TaskGroup per concorrenza strutturata
- Benchmark FastAPI sync vs async su workload reali
L'Event Loop: il Cuore di asyncio
L'event loop è un ciclo infinito che gestisce una coda di callback e
coroutine. Quando una coroutine incontra un'operazione I/O (await),
si sospende e restituisce il controllo all'event loop, che può eseguire
altre coroutine nel frattempo. Quando l'I/O completa, la coroutine viene rimessa
in coda per essere ripresa.
# Visualizzazione concettuale dell'event loop (pseudocodice)
#
# Event Loop Iteration:
# 1. Guarda la coda delle callback pronte
# 2. Esegui la prima callback/coroutine
# 3. Se incontra un await su I/O:
# - Registra l'operazione I/O con il sistema operativo (epoll/kqueue/IOCP)
# - Metti la coroutine in "sospensione" (waiting)
# - Torna al passo 1 (esegui la prossima callback disponibile)
# 4. Quando l'I/O completa (notifica OS):
# - Rimetti la coroutine nella coda "pronta"
# 5. Ripeti
# In Python reale:
import asyncio
async def fetch_data(url: str) -> str:
# Simulazione di una richiesta HTTP asincrona
# await sospende questa coroutine finche la risposta non arriva
# L'event loop nel frattempo puo eseguire altre coroutine
await asyncio.sleep(1) # Simula latenza di rete
return f"Data from {url}"
async def main():
# Esecuzione sequenziale: 3 secondi totali
result1 = await fetch_data("https://api1.example.com")
result2 = await fetch_data("https://api2.example.com")
result3 = await fetch_data("https://api3.example.com")
return [result1, result2, result3]
# asyncio.run() crea l'event loop e lo esegue
asyncio.run(main())
Coroutine vs Funzioni Normali
Una funzione normale (def) esegue dall'inizio alla fine senza
interruzioni. Una coroutine (async def) è una funzione che
può sospendersi in punti specifici (await) e cedere il
controllo all'event loop.
import asyncio
import time
# --- FUNZIONE SINCRONA ---
def fetch_sync(url: str) -> str:
time.sleep(1) # BLOCCA l'intero thread per 1 secondo
return f"Data from {url}"
def main_sync():
start = time.time()
results = [
fetch_sync("https://api1.example.com"), # aspetta 1s
fetch_sync("https://api2.example.com"), # aspetta 1s
fetch_sync("https://api3.example.com"), # aspetta 1s
]
print(f"Sync: {time.time() - start:.2f}s") # ~3.00s
return results
# --- COROUTINE ASINCRONA ---
async def fetch_async(url: str) -> str:
await asyncio.sleep(1) # SOSPENDE la coroutine, NON il thread
return f"Data from {url}"
async def main_async():
start = time.time()
# gather esegue le tre coroutine CONCORRENTEMENTE
results = await asyncio.gather(
fetch_async("https://api1.example.com"),
fetch_async("https://api2.example.com"),
fetch_async("https://api3.example.com"),
)
print(f"Async: {time.time() - start:.2f}s") # ~1.00s
return results
# La differenza: 3s vs 1s per lo stesso workload I/O
Task e asyncio.gather
asyncio.gather() è il modo più comune per eseguire
coroutine concorrentemente. Restituisce quando tutte le coroutine completano
(o quando una fallisce, per default).
import asyncio
from typing import Any
# asyncio.gather: esecuzione concorrente di piu coroutine
async def concurrent_fetches():
# Tutte e tre iniziano quasi simultaneamente
results = await asyncio.gather(
fetch_async("https://api1.example.com"),
fetch_async("https://api2.example.com"),
fetch_async("https://api3.example.com"),
return_exceptions=True, # Errori restituiti come valori invece di eccezioni
)
for url, result in zip(["api1", "api2", "api3"], results):
if isinstance(result, Exception):
print(f"{url}: Error - {result}")
else:
print(f"{url}: {result}")
# asyncio.create_task: esecuzione in background
async def background_tasks():
# Task 1 inizia subito
task1 = asyncio.create_task(fetch_async("https://api1.example.com"))
# Fai altro mentre task1 e in background
await asyncio.sleep(0.5) # Simula altro lavoro
# task2 parte dopo 0.5s
task2 = asyncio.create_task(fetch_async("https://api2.example.com"))
# Aspetta entrambi
result1 = await task1
result2 = await task2
return result1, result2
# asyncio.TaskGroup (Python 3.11+): concorrenza strutturata
async def structured_concurrency():
results = []
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_async("https://api1.example.com"))
task2 = tg.create_task(fetch_async("https://api2.example.com"))
task3 = tg.create_task(fetch_async("https://api3.example.com"))
# Qui tutti i task sono completati (o c'e stata un'eccezione)
return [task1.result(), task2.result(), task3.result()]
async def in FastAPI: Quando Usarlo
FastAPI supporta sia async def che def normale per le
route. La scelta dipende da cosa fa la funzione:
# FastAPI: async def vs def
from fastapi import FastAPI
import asyncio
import httpx # Client HTTP asincrono
app = FastAPI()
# USA async def quando:
# - Fai operazioni I/O con librerie async (httpx, asyncpg, aioredis, etc.)
# - Chiami altre coroutine con await
@app.get("/async-example")
async def async_endpoint():
# httpx.AsyncClient e la versione async di requests
async with httpx.AsyncClient() as client:
response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
return response.json()
# USA def normale quando:
# - La funzione e puramente CPU (calcoli, elaborazione in memoria)
# - Usi librerie sincrone che non supportano async
# FastAPI esegue le funzioni sync in un thread pool separato
# per non bloccare l'event loop
@app.get("/sync-example")
def sync_endpoint():
import json
# Operazione CPU-bound: OK in def normale
data = {"numbers": list(range(1000))}
return json.dumps(data)
# CASO CRITICO: MAI fare I/O sincrono bloccante in async def
@app.get("/bad-example")
async def bad_endpoint():
import requests # SBAGLIATO: requests e sincrono
# Questo BLOCCA l'event loop per la durata della richiesta HTTP!
response = requests.get("https://api.example.com") # NON FARE QUESTO
return response.json()
# VERSIONE CORRETTA:
@app.get("/good-example")
async def good_endpoint():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()
Il Pericolo delle Librerie Sincrone in Async
Usare una libreria I/O sincrona (come requests, psycopg2,
o qualsiasi client DB tradizionale) in una coroutine async def blocca
l'intero event loop: nessun'altra richiesta può essere servita finché
l'operazione non completa. Per database usa asyncpg o
SQLAlchemy 2.0 async. Per HTTP usa httpx o
aiohttp. Per Redis usa redis.asyncio.
I/O-Bound vs CPU-Bound: La Distinzione Fondamentale
Async aiuta solo per workload I/O-bound: operazioni dove il
programma aspetta risorse esterne (database, API HTTP, filesystem). Per workload
CPU-bound (machine learning, encoding video, calcoli intensivi)
asyncio non aiuta — serve multiprocessing o un executor.
# Workload I/O-bound: asyncio aiuta molto
async def io_bound_handler():
# Fa 3 chiamate API in ~1 secondo invece di ~3 secondi
results = await asyncio.gather(
fetch_user_from_db(user_id=1), # ~50ms
fetch_user_orders(user_id=1), # ~80ms
fetch_user_preferences(user_id=1), # ~40ms
)
return results # Pronto in ~80ms (il piu lento), non 170ms
# Workload CPU-bound: asyncio NON aiuta, usa ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import asyncio
executor = ProcessPoolExecutor(max_workers=4)
def cpu_intensive_task(data: list) -> list:
# Sorting O(n log n), computazione pura
return sorted(data, key=lambda x: x ** 2)
@app.post("/process")
async def process_data(data: list):
loop = asyncio.get_event_loop()
# run_in_executor esegue la funzione in un processo separato
# senza bloccare l'event loop
result = await loop.run_in_executor(
executor,
cpu_intensive_task,
data,
)
return {"processed": result}
Benchmark: Sync vs Async su FastAPI
Ecco un benchmark realistico che mostra la differenza tra endpoint sync e async su workload I/O-bound con 100 richieste concorrenti:
# Benchmark con httpx e asyncio (script di test)
# pip install httpx
import asyncio
import httpx
import time
async def benchmark(endpoint: str, n_requests: int = 100):
async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
start = time.time()
tasks = [client.get(endpoint) for _ in range(n_requests)]
responses = await asyncio.gather(*tasks)
elapsed = time.time() - start
success = sum(1 for r in responses if r.status_code == 200)
rps = n_requests / elapsed
print(f"{endpoint}: {elapsed:.2f}s, {rps:.1f} req/s, {success}/{n_requests} success")
# Endpoint test nel server FastAPI
@app.get("/test/sync")
def sync_test():
import time
time.sleep(0.1) # Simula 100ms DB query
return {"data": "ok"}
@app.get("/test/async")
async def async_test():
await asyncio.sleep(0.1) # Simula 100ms DB query async
return {"data": "ok"}
# Risultati tipici su un server con 4 worker Uvicorn:
# /test/sync: 10.23s, 9.8 req/s (quasi sequenziale!)
# /test/async: 1.05s, 95.2 req/s (quasi perfettamente concorrente)
#
# Con 100ms di latenza simulata:
# Sync: 100 richieste * 100ms = ~10s
# Async: concorrente = ~100ms + overhead
asyncio.run(benchmark("/test/sync"))
asyncio.run(benchmark("/test/async"))
Conclusies
De kracht van Python async in FastAPI is reëel, maar vereist begrip het model: de gebeurtenislus is single-threaded maar niet-blokkerend voor I/O, de coroutines worden geschorst zonder andere handlers en de concurrentie te blokkeren het is coöperatief (niet preventief zoals OS-threads). De volgende stap is begrijp Pydantic v2, dat de gegevensvalidatie biedt waar FastAPI op vertrouwt.
Aankomende artikelen in de FastAPI-serie
- Artikel 3: Pydantic v2 — Geavanceerde validatie, BaseModel en TypeAdapter
- Artikel 4: Afhankelijkheidsinjectie in FastAPI: Depends() en Pattern Clean
- Artikel 5: Asynchrone database met SQLAlchemy 2.0 en Alembic







