Sorun: Python Tek İş Parçacıklıdır

Python'da Global Yorumlayıcı Kilidi (GIL) vardır: yalnızca bir Python iş parçacığı bayt kodunu çalıştırır bir anda. Bu genellikle yanlış anlaşılır. GIL rekabeti engellemez — önler CPU paralelliği ipliklerden. G/Ç bağlantılı iş yükleri için (API'ler genellikle öyledir), asyncio verimlilik açısından iş parçacıklarından daha iyi performans gösterir çünkü İşletim sistemi bağlamı değiştirme yükünü ortadan kaldırır.

Ne Öğreneceksiniz

  • Olay döngüsü: Eşzamansız yürütmeyi yöneten döngü
  • Eşyordamlar: Duraklatabilen ve devam ettirebilen işlevler
  • async/await: eşyordamları okunabilir kılan sözdizimi
  • Görev ve Gelecek: asyncio ilkelleri
  • G/Ç bağlantılı ve CPU bağlantılı: zaman uyumsuzluk yardımcı olduğunda ve olmadığında
  • Yapılandırılmış eşzamanlılık için asyncio.gather ve asyncio.TaskGroup
  • Gerçek iş yüklerinde FastAPI senkronizasyonu ile eşzamansız karşılaştırmayı karşılaştırın

Olay Döngüsü: Asyncio'nun Kalbi

Olay döngüsü, bir geri arama kuyruğunu yöneten ve eşyordamlar. Bir eşyordam bir G/Ç işlemiyle karşılaştığında (await), askıya alır ve yürütebileceği olay döngüsünün kontrolünü geri döndürür bu arada diğer koroutinler. G/Ç tamamlandığında eşyordam sıfırlanır filme alınmak için sıraya giriyor

# Visualizzazione concettuale dell'event loop (pseudocodice)
#
# Event Loop Iteration:
# 1. Guarda la coda delle callback pronte
# 2. Esegui la prima callback/coroutine
# 3. Se incontra un await su I/O:
#    - Registra l'operazione I/O con il sistema operativo (epoll/kqueue/IOCP)
#    - Metti la coroutine in "sospensione" (waiting)
#    - Torna al passo 1 (esegui la prossima callback disponibile)
# 4. Quando l'I/O completa (notifica OS):
#    - Rimetti la coroutine nella coda "pronta"
# 5. Ripeti

# In Python reale:
import asyncio

async def fetch_data(url: str) -> str:
    # Simulazione di una richiesta HTTP asincrona
    # await sospende questa coroutine finche la risposta non arriva
    # L'event loop nel frattempo puo eseguire altre coroutine
    await asyncio.sleep(1)  # Simula latenza di rete
    return f"Data from {url}"

async def main():
    # Esecuzione sequenziale: 3 secondi totali
    result1 = await fetch_data("https://api1.example.com")
    result2 = await fetch_data("https://api2.example.com")
    result3 = await fetch_data("https://api3.example.com")
    return [result1, result2, result3]

# asyncio.run() crea l'event loop e lo esegue
asyncio.run(main())

Eşyordamlar ve Normal İşlevler

Normal bir fonksiyon (def) baştan sona olmadan çalışır kesintiler. Bir eşyordam (async def) bir fonksiyondur belirli noktalarda askıya alınabilir (await) ve vazgeçmek olay döngüsü kontrolü.

import asyncio
import time

# --- FUNZIONE SINCRONA ---
def fetch_sync(url: str) -> str:
    time.sleep(1)  # BLOCCA l'intero thread per 1 secondo
    return f"Data from {url}"

def main_sync():
    start = time.time()
    results = [
        fetch_sync("https://api1.example.com"),  # aspetta 1s
        fetch_sync("https://api2.example.com"),  # aspetta 1s
        fetch_sync("https://api3.example.com"),  # aspetta 1s
    ]
    print(f"Sync: {time.time() - start:.2f}s")  # ~3.00s
    return results

# --- COROUTINE ASINCRONA ---
async def fetch_async(url: str) -> str:
    await asyncio.sleep(1)  # SOSPENDE la coroutine, NON il thread
    return f"Data from {url}"

async def main_async():
    start = time.time()
    # gather esegue le tre coroutine CONCORRENTEMENTE
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
    )
    print(f"Async: {time.time() - start:.2f}s")  # ~1.00s
    return results

# La differenza: 3s vs 1s per lo stesso workload I/O

Görev ve asyncio.gather

asyncio.gather() yürütmenin en yaygın yoludur eşzamanlı olarak koroutinler. Tüm eşyordamlar tamamlandığında geri döner (veya varsayılan olarak biri başarısız olduğunda).

import asyncio
from typing import Any

# asyncio.gather: esecuzione concorrente di piu coroutine
async def concurrent_fetches():
    # Tutte e tre iniziano quasi simultaneamente
    results = await asyncio.gather(
        fetch_async("https://api1.example.com"),
        fetch_async("https://api2.example.com"),
        fetch_async("https://api3.example.com"),
        return_exceptions=True,  # Errori restituiti come valori invece di eccezioni
    )

    for url, result in zip(["api1", "api2", "api3"], results):
        if isinstance(result, Exception):
            print(f"{url}: Error - {result}")
        else:
            print(f"{url}: {result}")

# asyncio.create_task: esecuzione in background
async def background_tasks():
    # Task 1 inizia subito
    task1 = asyncio.create_task(fetch_async("https://api1.example.com"))

    # Fai altro mentre task1 e in background
    await asyncio.sleep(0.5)  # Simula altro lavoro

    # task2 parte dopo 0.5s
    task2 = asyncio.create_task(fetch_async("https://api2.example.com"))

    # Aspetta entrambi
    result1 = await task1
    result2 = await task2
    return result1, result2

# asyncio.TaskGroup (Python 3.11+): concorrenza strutturata
async def structured_concurrency():
    results = []
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_async("https://api1.example.com"))
        task2 = tg.create_task(fetch_async("https://api2.example.com"))
        task3 = tg.create_task(fetch_async("https://api3.example.com"))
    # Qui tutti i task sono completati (o c'e stata un'eccezione)
    return [task1.result(), task2.result(), task3.result()]

FastAPI'de async def: Ne Zaman Kullanılmalı

FastAPI her ikisini de destekler async def che def onlar için normal rota. Seçim, fonksiyonun ne yaptığına bağlıdır:

# FastAPI: async def vs def
from fastapi import FastAPI
import asyncio
import httpx  # Client HTTP asincrono

app = FastAPI()

# USA async def quando:
# - Fai operazioni I/O con librerie async (httpx, asyncpg, aioredis, etc.)
# - Chiami altre coroutine con await
@app.get("/async-example")
async def async_endpoint():
    # httpx.AsyncClient e la versione async di requests
    async with httpx.AsyncClient() as client:
        response = await client.get("https://jsonplaceholder.typicode.com/posts/1")
        return response.json()

# USA def normale quando:
# - La funzione e puramente CPU (calcoli, elaborazione in memoria)
# - Usi librerie sincrone che non supportano async
# FastAPI esegue le funzioni sync in un thread pool separato
# per non bloccare l'event loop
@app.get("/sync-example")
def sync_endpoint():
    import json
    # Operazione CPU-bound: OK in def normale
    data = {"numbers": list(range(1000))}
    return json.dumps(data)

# CASO CRITICO: MAI fare I/O sincrono bloccante in async def
@app.get("/bad-example")
async def bad_endpoint():
    import requests  # SBAGLIATO: requests e sincrono
    # Questo BLOCCA l'event loop per la durata della richiesta HTTP!
    response = requests.get("https://api.example.com")  # NON FARE QUESTO
    return response.json()

# VERSIONE CORRETTA:
@app.get("/good-example")
async def good_endpoint():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
        return response.json()

Eşzamansız Ortamda Eşzamanlı Kütüphanelerin Tehlikesi

Senkron bir G/Ç kitaplığı kullanın (örneğin requests, psycopg2, veya herhangi bir geleneksel DB istemcisi) bir ortak rutinde async def blok olay döngüsünün tamamı: şu ana kadar başka hiçbir istek yerine getirilemez: işlem tamamlanmıyor. Veritabanı kullanımı için asyncpg o SQLAlchemy 2.0 async. HTTP kullanımı için httpx o aiohttp. Redis'in kullanımı için redis.asyncio.

G/Ç'ye Bağlı ve CPU'ya Bağlı: Temel Ayrım

Async yalnızca iş yüklerine yardımcı olur G/Ç bağlı: operasyonlar program harici kaynakları (veritabanı, HTTP API, dosya sistemi) bekler. İş yüküne göre CPU'ya bağlı (makine öğrenimi, video kodlama, yoğun hesaplamalar) asyncio yardımcı olmuyor - yardımcı oluyor multiprocessing veya bir uygulayıcı.

# Workload I/O-bound: asyncio aiuta molto
async def io_bound_handler():
    # Fa 3 chiamate API in ~1 secondo invece di ~3 secondi
    results = await asyncio.gather(
        fetch_user_from_db(user_id=1),      # ~50ms
        fetch_user_orders(user_id=1),        # ~80ms
        fetch_user_preferences(user_id=1),   # ~40ms
    )
    return results  # Pronto in ~80ms (il piu lento), non 170ms

# Workload CPU-bound: asyncio NON aiuta, usa ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import asyncio

executor = ProcessPoolExecutor(max_workers=4)

def cpu_intensive_task(data: list) -> list:
    # Sorting O(n log n), computazione pura
    return sorted(data, key=lambda x: x ** 2)

@app.post("/process")
async def process_data(data: list):
    loop = asyncio.get_event_loop()
    # run_in_executor esegue la funzione in un processo separato
    # senza bloccare l'event loop
    result = await loop.run_in_executor(
        executor,
        cpu_intensive_task,
        data,
    )
    return {"processed": result}

Karşılaştırma: FastAPI'de Senkronizasyon ve Async Karşılaştırması

Uç nokta senkronizasyonu ile eşzamansız arasındaki farkı gösteren gerçekçi bir karşılaştırmayı burada bulabilirsiniz 100 eşzamanlı istekle G/Ç bağlantılı iş yüklerinde:

# Benchmark con httpx e asyncio (script di test)
# pip install httpx
import asyncio
import httpx
import time

async def benchmark(endpoint: str, n_requests: int = 100):
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        start = time.time()
        tasks = [client.get(endpoint) for _ in range(n_requests)]
        responses = await asyncio.gather(*tasks)
        elapsed = time.time() - start

        success = sum(1 for r in responses if r.status_code == 200)
        rps = n_requests / elapsed

        print(f"{endpoint}: {elapsed:.2f}s, {rps:.1f} req/s, {success}/{n_requests} success")

# Endpoint test nel server FastAPI
@app.get("/test/sync")
def sync_test():
    import time
    time.sleep(0.1)  # Simula 100ms DB query
    return {"data": "ok"}

@app.get("/test/async")
async def async_test():
    await asyncio.sleep(0.1)  # Simula 100ms DB query async
    return {"data": "ok"}

# Risultati tipici su un server con 4 worker Uvicorn:
# /test/sync:  10.23s, 9.8 req/s   (quasi sequenziale!)
# /test/async: 1.05s, 95.2 req/s   (quasi perfettamente concorrente)
#
# Con 100ms di latenza simulata:
# Sync:  100 richieste * 100ms = ~10s
# Async: concorrente = ~100ms + overhead

asyncio.run(benchmark("/test/sync"))
asyncio.run(benchmark("/test/async"))

Sonuçlar

FastAPI'de Python async'in gücü gerçektir ancak anlaşılması gerekir model: olay döngüsü tek iş parçacıklıdır ancak G/Ç için engelleme yapmaz, diğer idarecileri engellemeden ortak rutinler askıya alınır ve yarışma işbirlikçidir (işletim sistemi iş parçacıkları gibi önleyici değildir). Bir sonraki adım FastAPI'nin dayandığı veri doğrulamayı sağlayan Pydantic v2'yi anlayın.

FastAPI Serisinde Gelecek Makaleler

  • Madde 3: Pydantic v2 — Gelişmiş Doğrulama, BaseModel ve TypeAdapter
  • Madde 4: FastAPI'de Bağımlılık Enjeksiyonu: Depends() ve Desen Temizliği
  • Madde 5: SQLAlchemy 2.0 ve Alembic ile Eşzamansız Veritabanı