Problém státu v Edge Computing

Předchozí články této série ukázaly, jak Cloudflare Workers vyniká pro bezstavové úlohy: každý požadavek je zpracován izolovaně, bez sdílené paměti mezi vyvoláním. Tato funkce je silnou stránkou škálovatelnosti horizontální, ale stává se překážkou, jakmile aplikace vyžaduje koordinaci.

Zvažte tyto běžné scénáře: Chatovací místnost, kde musí být zprávy objednáno a doručeno všem účastníkům; společný dokument, kde je více uživatelé upravují současně; omezovač rychlosti pro rozhraní API, která potřebují počítat požadavky globálně koherentním způsobem. Ve všech těchto případech jde o bezstátní model Dělníků nestačí: potřebuješ jeden byl vysoce konzistentní a kapacitu ke koordinaci konkurenčních požadavků na jednom místě.

To je přesně ten problém, který i Odolné předměty řeší.

Co se naučíte

  • Co je trvanlivý objekt a jak se liší od Workers KV
  • Model silné konzistence: single-writer, globální koordinace
  • Jak implementovat WebSocket se správou relací pomocí Durable Objects
  • Transakční úložiště: čtení, zápis a atomické transakce
  • Vzory pro chatovací místnosti, omezení rychlosti a spolupráci na dokumentech
  • Alarmy: plánované operace v rámci Trvanlivého objektu
  • Limity, ceny a kdy zvolit DO vs KV vs D1

Co je trvanlivý předmět

Trvanlivý objekt (DO) je třída JavaScript/TypeScript, kterou Cloudflare vytváří instance v jedné geografické lokalitě pro daný identifikátor. Na rozdíl od Workers KV (možná konzistence na 300+ PoP), DO má tyto záruky:

  • Konzistence jednoho zápisu: pro dané ID existuje celosvětově pouze jedna instance DO
  • Serializace požadavků: Volání DO se provádějí postupně, nikdy souběžně
  • Odolné úložiště: soukromý transakční úložiště párů klíč–hodnota pro každou instanci
  • Hibernace WebSocket: Připojení WebSocket přežijí období nečinnosti bez nákladů

Umístění DO je automaticky určeno při první aktivaci a zůstává fixní. Cloudflare vybere datové centrum nejblíže prvnímu požadavku. Všechny následující požadavky, kdekoli na světě, jsou směrovány na tento jediný instance prostřednictvím směrování Cloudflare anycast.

Primitivní Konzistence Soutěž Latence čtení Případ použití
Dělníci KV Případné (minuty) Multi-spisovatel ~1 ms (v mezipaměti) Konfigurace, aktiva, relace náročné na čtení
Odolné předměty Silný (linearizovatelný) Jediný spisovatel ~50-150 ms (od vzdáleného okraje) Chat, omezovač rychlosti, stav hry, CRDT
D1 SQLite Silný (primární) Multi-reader, single-writer ~5-20 ms (z nedalekého PoP) Relační dotazy, reporty, OLTP
R2 Object Storage Silný (podle předmětu) Multi-writer (detekce konfliktů) ~50-200 ms Soubory, obrázky, zálohy

Struktura trvanlivého předmětu

Trvanlivý objekt je třída se specifickými metodami běhového prostředí Cloudflare uznává. Nejdůležitější je fetch(), volal pro každého HTTP požadavek předán DO. Podívejme se na základní strukturu:

// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza

export class CounterDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
    // state.storage e il key-value store privato di questa istanza
    // Persiste attraverso i riavvii del DO
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/increment': {
        // Legge il valore corrente (undefined se non esiste)
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = current + 1;
        // Scrittura atomica: garantita durabile prima del return
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/decrement': {
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = Math.max(0, current - 1);
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/value': {
        const count = (await this.state.storage.get<number>('count')) ?? 0;
        return Response.json({ count });
      }

      case '/reset': {
        await this.state.storage.put('count', 0);
        return Response.json({ count: 0, reset: true });
      }

      default:
        return new Response('Not Found', { status: 404 });
    }
  }
}

interface Env {
  COUNTER: DurableObjectNamespace;
}

Chcete-li použít DO ze vstupního bodu Worker, musíte jej vytvořit pomocí vazby jmenného prostoru a ID:

// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Estrae l'ID dalla query string: /counter?id=room-1
    const counterId = url.searchParams.get('id') ?? 'global';

    // idFromName() crea un ID deterministico da una stringa
    // Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
    const id = env.COUNTER.idFromName(counterId);

    // Ottiene lo stub per comunicare con l'istanza
    const stub = env.COUNTER.get(id);

    // Invia la richiesta al Durable Object
    // La richiesta viene instradata al datacenter corretto automaticamente
    return stub.fetch(request);
  },
};

interface Env {
  COUNTER: DurableObjectNamespace;
}

Konfigurace wrangler.toml musí deklarovat závaznost:

# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"

[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]

WebSocket s odolnými objekty: Chatovací místnost

Nejsilnějším případem použití odolných objektů je správa relací Sdílený stavový WebSocket. Každá chatovací místnost je samostatnou instancí DO, který udržuje seznam aktivních připojení a historii zpráv.

Cloudflare podporuje WebSocket Hibernation API: DO přichází „hibernováno“, když nejsou k dispozici žádné zprávy ke zpracování, což výrazně snižuje náklady (platíte pouze za dobu zpracování, nikoli za otevřená připojení).

// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation

export class ChatRoomDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/ws') {
      // Verifica che sia una richiesta di upgrade WebSocket
      const upgradeHeader = request.headers.get('Upgrade');
      if (!upgradeHeader || upgradeHeader !== 'websocket') {
        return new Response('Expected WebSocket upgrade', { status: 426 });
      }

      // Crea la coppia WebSocket server/client
      const { 0: clientWs, 1: serverWs } = new WebSocketPair();

      // Accetta la connessione tramite la Hibernation API
      // Il DO sara ibernato tra i messaggi (no costo di CPU idle)
      this.state.acceptWebSocket(serverWs);

      // Opzionale: associa metadata alla connessione
      // Utile per identificare l'utente nei messaggi successivi
      const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
      serverWs.serializeAttachment({ userId });

      // Invia la storia recente al nuovo utente
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      if (history.length > 0) {
        serverWs.send(JSON.stringify({ type: 'history', messages: history }));
      }

      return new Response(null, {
        status: 101,
        webSocket: clientWs,
      });
    }

    if (url.pathname === '/messages' && request.method === 'GET') {
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      return Response.json({ messages: history });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };

    let parsed: ClientMessage;
    try {
      parsed = JSON.parse(message as string);
    } catch {
      ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
      return;
    }

    if (parsed.type === 'chat') {
      const msg: Message = {
        id: crypto.randomUUID(),
        userId,
        text: parsed.text,
        timestamp: Date.now(),
      };

      // Salva nella history (mantieni solo gli ultimi 100 messaggi)
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      const newHistory = [...history, msg].slice(-100);
      await this.state.storage.put('history', newHistory);

      // Broadcast a tutte le connessioni WebSocket attive nel DO
      const allWebSockets = this.state.getWebSockets();
      const payload = JSON.stringify({ type: 'message', message: msg });
      for (const socket of allWebSockets) {
        try {
          socket.send(payload);
        } catch {
          // Connessione chiusa, ignorala
        }
      }
    }
  }

  // Chiamato quando una connessione WebSocket viene chiusa
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };
    ws.close(code, reason);

    // Notifica gli altri utenti dell'uscita
    const notification = JSON.stringify({
      type: 'user_left',
      userId,
      timestamp: Date.now(),
    });
    for (const socket of this.state.getWebSockets()) {
      try {
        socket.send(notification);
      } catch { /* ignore */ }
    }
  }

  // Chiamato in caso di errore sulla connessione WebSocket
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    ws.close(1011, 'Internal error');
  }
}

interface Message {
  id: string;
  userId: string;
  text: string;
  timestamp: number;
}

interface ClientMessage {
  type: 'chat' | 'ping';
  text?: string;
}

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Vstupní bod Pracovník směruje požadavky do správné místnosti na základě cesty:

// src/worker.ts
export { ChatRoomDO } from './chat-room-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // /room/:roomId/ws -> WebSocket per la stanza
    // /room/:roomId/messages -> GET cronologia
    const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
    if (!match) {
      return new Response('Not Found', { status: 404 });
    }

    const roomId = match[1];
    const subpath = match[2] ?? '/ws';

    // Ogni stanza e una istanza distinta del DO
    const id = env.CHAT_ROOM.idFromName(roomId);
    const stub = env.CHAT_ROOM.get(id);

    // Rewrite del path per il DO
    const doUrl = new URL(request.url);
    doUrl.pathname = subpath;

    return stub.fetch(new Request(doUrl.toString(), request));
  },
};

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Transakční úložiště: atomové operace

Úložiště Durable Objects podporuje atomické transakce prostřednictvím state.storage.transaction(). To je rozhodující, kdy operace musí číst a zapisovat více klíčů konzistentně:

// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method Not Allowed', { status: 405 });
    }

    const { from, to, amount } = await request.json<TransferRequest>();

    try {
      // La transazione e atomica: o tutto va a buon fine, o niente
      await this.state.storage.transaction(async (txn) => {
        const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
        const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;

        if (fromBalance < amount) {
          // Il throw annulla la transazione
          throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
        }

        await txn.put(`balance:${from}`, fromBalance - amount);
        await txn.put(`balance:${to}`, toBalance + amount);

        // Log dell'operazione
        const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
        txLog.push({ from, to, amount, timestamp: Date.now() });
        await txn.put('tx_log', txLog.slice(-1000));
      });

      return Response.json({ success: true, from, to, amount });
    } catch (err) {
      return Response.json(
        { success: false, error: (err as Error).message },
        { status: 400 }
      );
    }
  }
}

interface TransferRequest {
  from: string;
  to: string;
  amount: number;
}

interface TxRecord {
  from: string;
  to: string;
  amount: number;
  timestamp: number;
}

interface Env {
  ACCOUNT: DurableObjectNamespace;
}

Alarmy: Plánované operace v trvanlivém objektu

Podpora odolných objektů Alarmy: zpětné volání alarm() který je vyvolán v naplánovanou dobu, i když tam nejsem aktivní požadavky na DO. To je užitečné pro TTL, termíny a pravidelné úlohy spojené se stavem DO:

// src/session-do.ts
// DO con alarm per scadenza automatica della sessione

export class SessionDO implements DurableObject {
  state: DurableObjectState;
  static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/create' && request.method === 'POST') {
      const data = await request.json<SessionData>();

      // Salva i dati della sessione
      await this.state.storage.put('session', {
        ...data,
        createdAt: Date.now(),
      });

      // Schedula l'alarm per la scadenza della sessione
      // Se l'alarm e gia schedulato, viene sostituito
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
    }

    if (url.pathname === '/get') {
      const session = await this.state.storage.get<SessionData>('session');
      if (!session) {
        return Response.json({ error: 'Session not found' }, { status: 404 });
      }

      // Refresh del TTL ad ogni accesso (sliding expiry)
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ session });
    }

    if (url.pathname === '/invalidate' && request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      await this.state.storage.deleteAlarm();
      return Response.json({ ok: true });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato automaticamente quando scatta l'alarm
  async alarm(): Promise<void> {
    // Pulisce i dati della sessione scaduta
    const session = await this.state.storage.get<SessionData>('session');
    if (session) {
      console.log(`Session expired for user: ${session.userId}`);
      await this.state.storage.deleteAll();
    }
  }
}

interface SessionData {
  userId: string;
  role: string;
  metadata?: Record<string, unknown>;
}

interface Env {
  SESSION: DurableObjectNamespace;
}

Globální omezovač rychlosti s odolnými předměty

Omezovač distribuované rychlosti je jedním z nejžádanějších vzorů pro veřejná rozhraní API. S Workers KV by implementace podléhala závodním podmínkám. S C, každý "bucket" omezující rychlost je instance se silnou konzistencí:

// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects

export class RateLimiterDO implements DurableObject {
  state: DurableObjectState;

  // Configurazione: 100 req/minuto per IP
  static MAX_TOKENS = 100;
  static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const now = Date.now();

    // Legge lo stato attuale del bucket
    const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
      tokens: RateLimiterDO.MAX_TOKENS,
      lastRefill: now,
    };

    // Calcola quanti token sono stati aggiunti dall'ultimo accesso
    const elapsed = now - bucket.lastRefill;
    const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
    const currentTokens = Math.min(
      RateLimiterDO.MAX_TOKENS,
      bucket.tokens + tokensToAdd
    );

    if (currentTokens < 1) {
      // Rate limit superato
      const retryAfterMs = Math.ceil(
        (1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
      );

      await this.state.storage.put('bucket', {
        tokens: currentTokens,
        lastRefill: now,
      });

      return Response.json(
        {
          allowed: false,
          retryAfter: Math.ceil(retryAfterMs / 1000),
          remaining: 0,
        },
        {
          status: 429,
          headers: {
            'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
            'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
            'X-RateLimit-Remaining': '0',
          },
        }
      );
    }

    // Consuma un token e aggiorna il bucket
    await this.state.storage.put('bucket', {
      tokens: currentTokens - 1,
      lastRefill: now,
    });

    return Response.json({
      allowed: true,
      remaining: Math.floor(currentTokens - 1),
    });
  }
}

interface TokenBucket {
  tokens: number;
  lastRefill: number;
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Worker integruje omezovač rychlosti do toku každého požadavku:

// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Identifica il client (IP o API key)
    const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
    const apiKey = request.headers.get('X-API-Key');
    const bucketId = apiKey ?? `ip:${clientIp}`;

    // Controlla il rate limit per questo client
    const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
    const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
    const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));

    if (limitCheck.status === 429) {
      return limitCheck; // Propaga la risposta 429 con gli headers
    }

    // Prosegue con la logica dell'API
    return handleApiRequest(request, env);
  },
};

async function handleApiRequest(request: Request, env: Env): Promise<Response> {
  return Response.json({ data: 'your api response here' });
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Úvahy o výkonu a nákladech

Odolné objekty mají velmi odlišný profil nákladů a výkonu k jednoduchým dělníkům. Některé klíčové body, které je třeba mít na paměti:

Latence: Ne vždy místní

Každá instance DO se nachází v jediném datovém centru. Pokud uživatel v Evropě přistupuje k DO vytvořenému v Severní Americe, latenci každé operace zahrnuje transatlantickou zpáteční cestu (~100-150 ms). Navrhněte DO ID Omezení geografického sdílení: Kdykoli je to možné, používejte ID založená na regionu (idFromName(`${region}:${resourceId}`)), nebo přijměte latenci vysoká pouze pro operace, které vyžadují silnou meziregionální konzistenci.

Hlas Volné úrovně Placení (placení pracovníci)
Žádosti na DO 1M/měsíc v ceně 0,15 dolaru za milion výše
DO Lifetime (CPU) 400 000 GB za měsíc 12,50 $ za milion GB-s
Skladování 1 GB v ceně 0,20 $/GB měsíčně dále
Hibernace WebSocket K dispozici K dispozici (bez nákladů na nečinnost)
Alarmy K dispozici K dispozici

Nejlepší postupy

  • Granularita ID: použijte konkrétní ID (např. chat:room-42) spíše než globální ID, abyste se vyhnuli aktivním bodům na jednotlivých instancích.
  • Hibernace WebSocket vždy: USA state.acceptWebSocket() místo ručních posluchačů událostí, aby se snížily náklady na nečinné připojení.
  • Dávkové skladování: USA storage.put(map) psát více klíčů v jedné operaci místo více put() nezadaní.
  • Omezená velikost úložiště: každá instance má limit 128 MB skladování. Pro velká data použijte R2 a ukládejte pouze reference v DO.
  • Rozpočet na serializaci: požadavky jsou zařazeny do fronty a zpracovány postupně; pomalé operace blokují následné. Udržujte manipulátory rychlé (ideální <1s).

Závěry a další kroky

Trvanlivé předměty překlenují propast mezi bezstátním modelem Dělníků a dělníků moderní aplikace, které vyžadují koordinaci a sdílený stav. jejich v kombinaci s rozhraním WebSocket Hibernation API a Alarms z nich činí primitivní kompletní pro chat, hraní her, spolupráci na dokumentech a globální koordinační systémy.

Kompromisem je latence: DO je fyzicky pouze v jednom datovém centru, operace zápisu mezi oblastmi tedy zvyšují latenci. Pro čtení škálovatelné zvažte kombinaci DO (pro zápisy) s KV (pro čtení v mezipaměti).

Další články v seriálu

  • Článek 5: Workers AI — Odvození LLM a Vision Models on the Edge: Jak spustit modely Llama, Whisper a vision přímo v Workers bez vyhrazeného GPU, s AI pracujících meziročně o 4000 % rostoucí.
  • Článek 6: Vercel Edge Runtime — pokročilý middleware, Geolokace a A/B testování: Vercelův přístup k okraji pomocí Next.js.
  • Článek 7: Geografické směrování na okraji — Personalizace Obsah a soulad s GDPR: geo-fencing, lokalizované ceny a GDPR.