Async/Await în Python: înțelegerea buclelor de evenimente, a corutinelor și a concurenței legate de I/O
Explicație practică a modului în care funcționează asyncio în Python: buclă de evenimente, corutine, sarcină și diferența crucială dintre sarcinile de lucru legate de I/O și CPU - cu benchmark-uri real pe FastAPI.
Problema: Python este cu un singur thread
Python are Global Interpreter Lock (GIL): doar un fir Python execută bytecode la un moment dat. Acest lucru este adesea greșit înțeles. GIL nu împiedică concurența — previne Paralelism CPU de fire. Pentru sarcinile de lucru legate de I/O (API-urile sunt de obicei), asyncio depășește firele de execuție în eficiență deoarece elimină suprasarcina de comutare a contextului sistemului de operare.
Ce vei învăța
- Bucla de evenimente: bucla care guvernează execuția asincronă
- Corutine: Funcții care se pot întrerupe și relua
- async/wait: sintaxa care face coroutinele lizibile
- Sarcină vs viitor: primitive asincrone
- I/O-bound vs CPU-bound: când asincron ajută și când nu
- asyncio.gather și asyncio.TaskGroup pentru concurență structurată
- Evaluați sincronizarea FastAPI vs asincron pe sarcini de lucru reale
Bucla de evenimente: inima asincronului
Bucla de evenimente este o buclă infinită care gestionează o coadă de apeluri inverse și
corutine. Când o rutină întâmpină o operație I/O (await),
suspendă și returnează controlul buclei de evenimente, pe care o poate executa
alte corutine intre timp. Când I/O se termină, corutine este resetată
la coadă pentru a fi filmat.
# Visualizzazione concettuale dell'event loop (pseudocodice)
#
# Event Loop Iteration:
# 1. Guarda la coda delle callback pronte
# 2. Esegui la prima callback/coroutine
# 3. Se incontra un await su I/O:
# - Registra l'operazione I/O con il sistema operativo (epoll/kqueue/IOCP)
# - Metti la coroutine in "sospensione" (waiting)
# - Torna al passo 1 (esegui la prossima callback disponibile)
# 4. Quando l'I/O completa (notifica OS):
# - Rimetti la coroutine nella coda "pronta"
# 5. Ripeti
# In Python reale:
import asyncio
async def fetch_data(url: str) -> str:
# Simulazione di una richiesta HTTP asincrona
# await sospende questa coroutine finche la risposta non arriva
# L'event loop nel frattempo puo eseguire altre coroutine
await asyncio.sleep(1) # Simula latenza di rete
return f"Data from {url}"
async def main():
# Esecuzione sequenziale: 3 secondi totali
result1 = await fetch_data("https://api1.example.com")
result2 = await fetch_data("https://api2.example.com")
result3 = await fetch_data("https://api3.example.com")
return [result1, result2, result3]
# asyncio.run() crea l'event loop e lo esegue
asyncio.run(main())
Coroutine vs Funcții normale
O funcție normală (def) rulează de la început până la sfârșit fără
întreruperi. O corutine (async def) este o funcţie care
poate suspenda în anumite puncte (await) si renunta la
controlul buclei de evenimente.
import asyncio
import time
# --- FUNZIONE SINCRONA ---
def fetch_sync(url: str) -> str:
time.sleep(1) # BLOCCA l'intero thread per 1 secondo
return f"Data from {url}"
def main_sync():
start = time.time()
results = [
fetch_sync("https://api1.example.com"), # aspetta 1s
fetch_sync("https://api2.example.com"), # aspetta 1s
fetch_sync("https://api3.example.com"), # aspetta 1s
]
print(f"Sync: {time.time() - start:.2f}s") # ~3.00s
return results
# --- COROUTINE ASINCRONA ---
async def fetch_async(url: str) -> str:
await asyncio.sleep(1) # SOSPENDE la coroutine, NON il thread
return f"Data from {url}"
async def main_async():
start = time.time()
# gather esegue le tre coroutine CONCORRENTEMENTE
results = await asyncio.gather(
fetch_async("https://api1.example.com"),
fetch_async("https://api2.example.com"),
fetch_async("https://api3.example.com"),
)
print(f"Async: {time.time() - start:.2f}s") # ~1.00s
return results
# La differenza: 3s vs 1s per lo stesso workload I/O
Sarcină și asincronizare.adunare
asyncio.gather() este cel mai comun mod de a executa
corutine concomitent. Revine când toate corutinele sunt finalizate
(sau când unul eșuează, implicit).
import asyncio
from typing import Any
# asyncio.gather: esecuzione concorrente di piu coroutine
async def concurrent_fetches():
# Tutte e tre iniziano quasi simultaneamente
results = await asyncio.gather(
fetch_async("https://api1.example.com"),
fetch_async("https://api2.example.com"),
fetch_async("https://api3.example.com"),
return_exceptions=True, # Errori restituiti come valori invece di eccezioni
)
for url, result in zip(["api1", "api2", "api3"], results):
if isinstance(result, Exception):
print(f"{url}: Error - {result}")
else:
print(f"{url}: {result}")
# asyncio.create_task: esecuzione in background
async def background_tasks():
# Task 1 inizia subito
task1 = asyncio.create_task(fetch_async("https://api1.example.com"))
# Fai altro mentre task1 e in background
await asyncio.sleep(0.5) # Simula altro lavoro
# task2 parte dopo 0.5s
task2 = asyncio.create_task(fetch_async("https://api2.example.com"))
# Aspetta entrambi
result1 = await task1
result2 = await task2
return result1, result2
# asyncio.TaskGroup (Python 3.11+): concorrenza strutturata
async def structured_concurrency():
results = []
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_async("https://api1.example.com"))
task2 = tg.create_task(fetch_async("https://api2.example.com"))
task3 = tg.create_task(fetch_async("https://api3.example.com"))
# Qui tutti i task sono completati (o c'e stata un'eccezione)
return [task1.result(), task2.result(), task3.result()]
async def în FastAPI: Când să îl utilizați
FastAPI acceptă ambele async def Că def normal pentru ei
traseu. Alegerea depinde de ceea ce face funcția:
# FastAPI: async def vs def
from fastapi import FastAPI
import asyncio
import httpx # Client HTTP asincrono
app = FastAPI()
# USA async def quando:
# - Fai operazioni I/O con librerie async (httpx, asyncpg, aioredis, etc.)
# - Chiami altre coroutine con await
@app.get("/async-example")
async def async_endpoint():
# httpx.AsyncClient e la versione async di requests
async with httpx.AsyncClient() as client:
response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
return response.json()
# USA def normale quando:
# - La funzione e puramente CPU (calcoli, elaborazione in memoria)
# - Usi librerie sincrone che non supportano async
# FastAPI esegue le funzioni sync in un thread pool separato
# per non bloccare l'event loop
@app.get("/sync-example")
def sync_endpoint():
import json
# Operazione CPU-bound: OK in def normale
data = {"numbers": list(range(1000))}
return json.dumps(data)
# CASO CRITICO: MAI fare I/O sincrono bloccante in async def
@app.get("/bad-example")
async def bad_endpoint():
import requests # SBAGLIATO: requests e sincrono
# Questo BLOCCA l'event loop per la durata della richiesta HTTP!
response = requests.get("https://api.example.com") # NON FARE QUESTO
return response.json()
# VERSIONE CORRETTA:
@app.get("/good-example")
async def good_endpoint():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()
Pericolul bibliotecilor sincrone în asincron
Utilizați o bibliotecă I/O sincronă (cum ar fi requests, psycopg2,
sau orice client DB tradițional) într-o rutină async def bloc
întreaga buclă a evenimentului: nicio altă solicitare nu poate fi servită până când
operațiunea nu se finalizează. Pentru utilizarea bazei de date asyncpg o
SQLAlchemy 2.0 async. Pentru utilizare HTTP httpx o
aiohttp. Pentru utilizarea Redis redis.asyncio.
I/O-Bound vs CPU-Bound: Distincția cheie
Async ajută doar pentru sarcinile de lucru I/O legat: operațiuni în care
programul așteaptă resurse externe (bază de date, HTTP API, sistem de fișiere). După volumul de muncă
Legat de procesor (învățare automată, codare video, calcule intensive)
asyncio nu ajută — ajută multiprocessing sau un executor.
# Workload I/O-bound: asyncio aiuta molto
async def io_bound_handler():
# Fa 3 chiamate API in ~1 secondo invece di ~3 secondi
results = await asyncio.gather(
fetch_user_from_db(user_id=1), # ~50ms
fetch_user_orders(user_id=1), # ~80ms
fetch_user_preferences(user_id=1), # ~40ms
)
return results # Pronto in ~80ms (il piu lento), non 170ms
# Workload CPU-bound: asyncio NON aiuta, usa ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import asyncio
executor = ProcessPoolExecutor(max_workers=4)
def cpu_intensive_task(data: list) -> list:
# Sorting O(n log n), computazione pura
return sorted(data, key=lambda x: x ** 2)
@app.post("/process")
async def process_data(data: list):
loop = asyncio.get_event_loop()
# run_in_executor esegue la funzione in un processo separato
# senza bloccare l'event loop
result = await loop.run_in_executor(
executor,
cpu_intensive_task,
data,
)
return {"processed": result}
Benchmark: Sync vs Async pe FastAPI
Iată un benchmark realist care arată diferența dintre sincronizarea punctului final și asincron pe sarcini de lucru legate de I/O cu 100 de solicitări simultane:
# Benchmark con httpx e asyncio (script di test)
# pip install httpx
import asyncio
import httpx
import time
async def benchmark(endpoint: str, n_requests: int = 100):
async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
start = time.time()
tasks = [client.get(endpoint) for _ in range(n_requests)]
responses = await asyncio.gather(*tasks)
elapsed = time.time() - start
success = sum(1 for r in responses if r.status_code == 200)
rps = n_requests / elapsed
print(f"{endpoint}: {elapsed:.2f}s, {rps:.1f} req/s, {success}/{n_requests} success")
# Endpoint test nel server FastAPI
@app.get("/test/sync")
def sync_test():
import time
time.sleep(0.1) # Simula 100ms DB query
return {"data": "ok"}
@app.get("/test/async")
async def async_test():
await asyncio.sleep(0.1) # Simula 100ms DB query async
return {"data": "ok"}
# Risultati tipici su un server con 4 worker Uvicorn:
# /test/sync: 10.23s, 9.8 req/s (quasi sequenziale!)
# /test/async: 1.05s, 95.2 req/s (quasi perfettamente concorrente)
#
# Con 100ms di latenza simulata:
# Sync: 100 richieste * 100ms = ~10s
# Async: concorrente = ~100ms + overhead
asyncio.run(benchmark("/test/sync"))
asyncio.run(benchmark("/test/async"))
Concluzii
Puterea Python Async în FastAPI este reală, dar necesită înțelegere modelul: bucla de evenimente este cu un singur fir, dar nu blochează I/O, corutinele sunt suspendate fără a bloca alți manipulatori, iar competiția este cooperant (nu preventiv ca firele de operare). Următorul pas este înțelegeți Pydantic v2, care oferă validarea datelor pe care se bazează FastAPI.
Articole viitoare din seria FastAPI
- Articolul 3: Pydantic v2 — Validare avansată, BaseModel și TypeAdapter
- Articolul 4: Injecție de dependență în FastAPI: Depends() și Pattern Clean
- Articolul 5: Baza de date asincronă cu SQLAlchemy 2.0 și Alambic







