Problema: Python este cu un singur thread

Python are Global Interpreter Lock (GIL): doar un fir Python execută bytecode la un moment dat. Acest lucru este adesea greșit înțeles. GIL nu împiedică concurența — previne Paralelism CPU de fire. Pentru sarcinile de lucru legate de I/O (API-urile sunt de obicei), asyncio depășește firele de execuție în eficiență deoarece elimină suprasarcina de comutare a contextului sistemului de operare.

Ce vei învăța

  • Bucla de evenimente: bucla care guvernează execuția asincronă
  • Corutine: Funcții care se pot întrerupe și relua
  • async/wait: sintaxa care face coroutinele lizibile
  • Sarcină vs viitor: primitive asincrone
  • I/O-bound vs CPU-bound: când asincron ajută și când nu
  • asyncio.gather și asyncio.TaskGroup pentru concurență structurată
  • Evaluați sincronizarea FastAPI vs asincron pe sarcini de lucru reale

Bucla de evenimente: inima asincronului

Bucla de evenimente este o buclă infinită care gestionează o coadă de apeluri inverse și corutine. Când o rutină întâmpină o operație I/O (await), suspendă și returnează controlul buclei de evenimente, pe care o poate executa alte corutine intre timp. Când I/O se termină, corutine este resetată la coadă pentru a fi filmat.

# Visualizzazione concettuale dell'event loop (pseudocodice)
#
# Event Loop Iteration:
# 1. Guarda la coda delle callback pronte
# 2. Esegui la prima callback/coroutine
# 3. Se incontra un await su I/O:
#    - Registra l'operazione I/O con il sistema operativo (epoll/kqueue/IOCP)
#    - Metti la coroutine in "sospensione" (waiting)
#    - Torna al passo 1 (esegui la prossima callback disponibile)
# 4. Quando l'I/O completa (notifica OS):
#    - Rimetti la coroutine nella coda "pronta"
# 5. Ripeti

# In Python reale:
import asyncio

async def fetch_data(url: str) -> str:
    # Simulazione di una richiesta HTTP asincrona
    # await sospende questa coroutine finche la risposta non arriva
    # L'event loop nel frattempo puo eseguire altre coroutine
    await asyncio.sleep(1)  # Simula latenza di rete
    return f"Data from {url}"

async def main():
    # Esecuzione sequenziale: 3 secondi totali
    result1 = await fetch_data("https://api1.example.com")
    result2 = await fetch_data("https://api2.example.com")
    result3 = await fetch_data("https://api3.example.com")
    return [result1, result2, result3]

# asyncio.run() crea l'event loop e lo esegue
asyncio.run(main())

Coroutine vs Funcții normale

O funcție normală (def) rulează de la început până la sfârșit fără întreruperi. O corutine (async def) este o funcţie care poate suspenda în anumite puncte (await) si renunta la controlul buclei de evenimente.

import asyncio
import time

# --- FUNZIONE SINCRONA ---
def fetch_sync(url: str) -> str:
    time.sleep(1)  # BLOCCA l'intero thread per 1 secondo
    return f"Data from {url}"

def main_sync():
    start = time.time()
    results = [
        fetch_sync("https://api1.example.com"),  # aspetta 1s
        fetch_sync("https://api2.example.com"),  # aspetta 1s
        fetch_sync("https://api3.example.com"),  # aspetta 1s
    ]
    print(f"Sync: {time.time() - start:.2f}s")  # ~3.00s
    return results

# --- COROUTINE ASINCRONA ---
async def fetch_async(url: str) -> str:
    await asyncio.sleep(1)  # SOSPENDE la coroutine, NON il thread
    return f"Data from {url}"

async def main_async():
    start = time.time()
    # gather esegue le tre coroutine CONCORRENTEMENTE
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
    )
    print(f"Async: {time.time() - start:.2f}s")  # ~1.00s
    return results

# La differenza: 3s vs 1s per lo stesso workload I/O

Sarcină și asincronizare.adunare

asyncio.gather() este cel mai comun mod de a executa corutine concomitent. Revine când toate corutinele sunt finalizate (sau când unul eșuează, implicit).

import asyncio
from typing import Any

# asyncio.gather: esecuzione concorrente di piu coroutine
async def concurrent_fetches():
    # Tutte e tre iniziano quasi simultaneamente
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
        return_exceptions=True,  # Errori restituiti come valori invece di eccezioni
    )

    for url, result in zip(["api1", "api2", "api3"], results):
        if isinstance(result, Exception):
            print(f"{url}: Error - {result}")
        else:
            print(f"{url}: {result}")

# asyncio.create_task: esecuzione in background
async def background_tasks():
    # Task 1 inizia subito
    task1 = asyncio.create_task(fetch_async("https://api1.example.com"))

    # Fai altro mentre task1 e in background
    await asyncio.sleep(0.5)  # Simula altro lavoro

    # task2 parte dopo 0.5s
    task2 = asyncio.create_task(fetch_async("https://api2.example.com"))

    # Aspetta entrambi
    result1 = await task1
    result2 = await task2
    return result1, result2

# asyncio.TaskGroup (Python 3.11+): concorrenza strutturata
async def structured_concurrency():
    results = []
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_async("https://api1.example.com"))
        task2 = tg.create_task(fetch_async("https://api2.example.com"))
        task3 = tg.create_task(fetch_async("https://api3.example.com"))
    # Qui tutti i task sono completati (o c'e stata un'eccezione)
    return [task1.result(), task2.result(), task3.result()]

async def în FastAPI: Când să îl utilizați

FastAPI acceptă ambele async defdef normal pentru ei traseu. Alegerea depinde de ceea ce face funcția:

# FastAPI: async def vs def
from fastapi import FastAPI
import asyncio
import httpx  # Client HTTP asincrono

app = FastAPI()

# USA async def quando:
# - Fai operazioni I/O con librerie async (httpx, asyncpg, aioredis, etc.)
# - Chiami altre coroutine con await
@app.get("/async-example")
async def async_endpoint():
    # httpx.AsyncClient e la versione async di requests
    async with httpx.AsyncClient() as client:
        response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
        return response.json()

# USA def normale quando:
# - La funzione e puramente CPU (calcoli, elaborazione in memoria)
# - Usi librerie sincrone che non supportano async
# FastAPI esegue le funzioni sync in un thread pool separato
# per non bloccare l'event loop
@app.get("/sync-example")
def sync_endpoint():
    import json
    # Operazione CPU-bound: OK in def normale
    data = {"numbers": list(range(1000))}
    return json.dumps(data)

# CASO CRITICO: MAI fare I/O sincrono bloccante in async def
@app.get("/bad-example")
async def bad_endpoint():
    import requests  # SBAGLIATO: requests e sincrono
    # Questo BLOCCA l'event loop per la durata della richiesta HTTP!
    response = requests.get("https://api.example.com")  # NON FARE QUESTO
    return response.json()

# VERSIONE CORRETTA:
@app.get("/good-example")
async def good_endpoint():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
        return response.json()

Pericolul bibliotecilor sincrone în asincron

Utilizați o bibliotecă I/O sincronă (cum ar fi requests, psycopg2, sau orice client DB tradițional) într-o rutină async def bloc întreaga buclă a evenimentului: nicio altă solicitare nu poate fi servită până când operațiunea nu se finalizează. Pentru utilizarea bazei de date asyncpg o SQLAlchemy 2.0 async. Pentru utilizare HTTP httpx o aiohttp. Pentru utilizarea Redis redis.asyncio.

I/O-Bound vs CPU-Bound: Distincția cheie

Async ajută doar pentru sarcinile de lucru I/O legat: operațiuni în care programul așteaptă resurse externe (bază de date, HTTP API, sistem de fișiere). După volumul de muncă Legat de procesor (învățare automată, codare video, calcule intensive) asyncio nu ajută — ajută multiprocessing sau un executor.

# Workload I/O-bound: asyncio aiuta molto
async def io_bound_handler():
    # Fa 3 chiamate API in ~1 secondo invece di ~3 secondi
    results = await asyncio.gather(
        fetch_user_from_db(user_id=1),      # ~50ms
        fetch_user_orders(user_id=1),        # ~80ms
        fetch_user_preferences(user_id=1),   # ~40ms
    )
    return results  # Pronto in ~80ms (il piu lento), non 170ms

# Workload CPU-bound: asyncio NON aiuta, usa ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import asyncio

executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: list) -> list:
    # Sorting O(n log n), computazione pura
    return sorted(data, key=lambda x: x ** 2)

@app.post("/process")
async def process_data(data: list):
    loop = asyncio.get_event_loop()
    # run_in_executor esegue la funzione in un processo separato
    # senza bloccare l'event loop
    result = await loop.run_in_executor(
        executor,
        cpu_intensive_task,
        data,
    )
    return {"processed": result}

Benchmark: Sync vs Async pe FastAPI

Iată un benchmark realist care arată diferența dintre sincronizarea punctului final și asincron pe sarcini de lucru legate de I/O cu 100 de solicitări simultane:

# Benchmark con httpx e asyncio (script di test)
# pip install httpx
import asyncio
import httpx
import time

async def benchmark(endpoint: str, n_requests: int = 100):
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        start = time.time()
        tasks = [client.get(endpoint) for _ in range(n_requests)]
        responses = await asyncio.gather(*tasks)
        elapsed = time.time() - start

        success = sum(1 for r in responses if r.status_code == 200)
        rps = n_requests / elapsed

        print(f"{endpoint}: {elapsed:.2f}s, {rps:.1f} req/s, {success}/{n_requests} success")

# Endpoint test nel server FastAPI
@app.get("/test/sync")
def sync_test():
    import time
    time.sleep(0.1)  # Simula 100ms DB query
    return {"data": "ok"}

@app.get("/test/async")
async def async_test():
    await asyncio.sleep(0.1)  # Simula 100ms DB query async
    return {"data": "ok"}

# Risultati tipici su un server con 4 worker Uvicorn:
# /test/sync:  10.23s, 9.8 req/s   (quasi sequenziale!)
# /test/async: 1.05s, 95.2 req/s   (quasi perfettamente concorrente)
#
# Con 100ms di latenza simulata:
# Sync:  100 richieste * 100ms = ~10s
# Async: concorrente = ~100ms + overhead

asyncio.run(benchmark("/test/sync"))
asyncio.run(benchmark("/test/async"))

Concluzii

Puterea Python Async în FastAPI este reală, dar necesită înțelegere modelul: bucla de evenimente este cu un singur fir, dar nu blochează I/O, corutinele sunt suspendate fără a bloca alți manipulatori, iar competiția este cooperant (nu preventiv ca firele de operare). Următorul pas este înțelegeți Pydantic v2, care oferă validarea datelor pe care se bazează FastAPI.

Articole viitoare din seria FastAPI

  • Articolul 3: Pydantic v2 — Validare avansată, BaseModel și TypeAdapter
  • Articolul 4: Injecție de dependență în FastAPI: Depends() și Pattern Clean
  • Articolul 5: Baza de date asincronă cu SQLAlchemy 2.0 și Alambic