Problema de stat în Edge Computing

Articolele anterioare din această serie au arătat cum excelează Cloudflare Workers pentru încărcături de lucru fără stat: fiecare cerere este tratată izolat, fără memorie partajată între invocaţii. Această caracteristică este un punct forte pentru scalabilitate orizontală, dar devine un obstacol de îndată ce aplicarea necesită coordonare.

Luați în considerare aceste scenarii comune: o cameră de chat unde trebuie să fie mesaje comandat și livrat tuturor participanților; un document de colaborare unde mai mult utilizatorii editează simultan; un limitator de rată pentru API-urile care trebuie să conteze cererile într-un mod coerent la nivel global. În toate aceste cazuri modelul apatrid al Muncitorilor nu este suficient: ai nevoie de unul fost foarte consistent si capacitate pentru a coordona cererile concurente într-un singur loc.

Aceasta este exact problema pe care i Obiecte durabile ei rezolvă.

Ce vei învăța

  • Ce este un obiect durabil și cum diferă de Workers KV
  • Modelul de consistență puternică: un singur scriitor, coordonare globală
  • Cum să implementați WebSocket cu gestionarea sesiunii folosind Durable Objects
  • Stocare tranzacțională: citire, scriere și tranzacții atomice
  • Modele pentru camere de chat, limitarea ratei și colaborarea documentelor
  • Alarme: operațiuni programate în cadrul Obiectului Durabil
  • Limite, prețuri și când să alegeți DO vs KV vs D1

Ce este un obiect durabil

Un obiect durabil (DO) este o clasă JavaScript/TypeScript pe care Cloudflare o instanțează într-o singură locație geografică pentru un anumit identificator. Spre deosebire de Muncitorii KV (consecvență posibilă pe peste 300 de PoP), un DO are următoarele garanții:

  • Consecvență cu un singur scriitor: doar o singură instanță a DO există la nivel mondial pentru un anumit ID
  • Serializarea cererilor: Apelurile DO sunt executate secvenţial, niciodată simultan
  • Depozitare durabilă: un magazin tranzacțional privat cheie-valoare pentru fiecare instanță
  • Hibernare WebSocket: Conexiunile WebSocket supraviețuiesc perioadelor de inactivitate fără costuri

Locația unui DO este determinată automat la prima activare și rămâne fix. Cloudflare alege centrul de date cel mai apropiat de prima solicitare. Toate cererile ulterioare, oriunde în lume, sunt direcționate către acea singură instanță prin rutarea Anycast Cloudflare.

Primitiv Consecvență Concurenţă Latența de citire Caz de utilizare
Muncitori KV Eventual (minute) Multi-scriitor ~1 ms (în cache) Configurație, active, sesiuni de citire grele
Obiecte durabile Puternic (liniizabil) Un singur scriitor ~50-150 ms (de la marginea de la distanță) Chat, limitator de rată, stare joc, CRDT
D1 SQLite Puternic (primar) Multi-cititor, un singur scriitor ~5-20 ms (de la PoP din apropiere) Interogări relaționale, rapoarte, OLTP
Depozitarea obiectelor R2 Puternic (după obiect) Multi-writer (detecția conflictelor) ~50-200 ms Fișiere, imagini, copii de rezervă

Structura unui obiect durabil

Un obiect durabil este o clasă cu metode specifice pe care rulează Cloudflare recunoaște. Cel mai important este fetch(), chemat pentru fiecare Solicitarea HTTP a fost redirecționată către DO. Să vedem structura de bază:

// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza

export class CounterDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
    // state.storage e il key-value store privato di questa istanza
    // Persiste attraverso i riavvii del DO
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/increment': {
        // Legge il valore corrente (undefined se non esiste)
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = current + 1;
        // Scrittura atomica: garantita durabile prima del return
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/decrement': {
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = Math.max(0, current - 1);
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/value': {
        const count = (await this.state.storage.get<number>('count')) ?? 0;
        return Response.json({ count });
      }

      case '/reset': {
        await this.state.storage.put('count', 0);
        return Response.json({ count: 0, reset: true });
      }

      default:
        return new Response('Not Found', { status: 404 });
    }
  }
}

interface Env {
  COUNTER: DurableObjectNamespace;
}

Pentru a utiliza DO din punctul de intrare Worker, trebuie să-l instanțiați prin legarea spațiului de nume si un act de identitate:

// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Estrae l'ID dalla query string: /counter?id=room-1
    const counterId = url.searchParams.get('id') ?? 'global';

    // idFromName() crea un ID deterministico da una stringa
    // Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
    const id = env.COUNTER.idFromName(counterId);

    // Ottiene lo stub per comunicare con l'istanza
    const stub = env.COUNTER.get(id);

    // Invia la richiesta al Durable Object
    // La richiesta viene instradata al datacenter corretto automaticamente
    return stub.fetch(request);
  },
};

interface Env {
  COUNTER: DurableObjectNamespace;
}

Configurația wrangler.toml trebuie să declare obligatoriu:

# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"

[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]

WebSocket cu obiecte durabile: cameră de chat

Cel mai puternic caz de utilizare al obiectelor durabile este gestionarea sesiunii WebSocket cu stare partajată. Fiecare cameră de chat este o instanță separată a DO, care menține lista conexiunilor active și istoricul mesajelor.

Cloudflare acceptă API-ul WebSocket Hibernation: DO vine „hibernat” atunci când nu există mesaje de procesat, reducând dramatic costurile (plătiți doar pentru timpul de procesare, nu pentru conexiunile deschise).

// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation

export class ChatRoomDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/ws') {
      // Verifica che sia una richiesta di upgrade WebSocket
      const upgradeHeader = request.headers.get('Upgrade');
      if (!upgradeHeader || upgradeHeader !== 'websocket') {
        return new Response('Expected WebSocket upgrade', { status: 426 });
      }

      // Crea la coppia WebSocket server/client
      const { 0: clientWs, 1: serverWs } = new WebSocketPair();

      // Accetta la connessione tramite la Hibernation API
      // Il DO sara ibernato tra i messaggi (no costo di CPU idle)
      this.state.acceptWebSocket(serverWs);

      // Opzionale: associa metadata alla connessione
      // Utile per identificare l'utente nei messaggi successivi
      const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
      serverWs.serializeAttachment({ userId });

      // Invia la storia recente al nuovo utente
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      if (history.length > 0) {
        serverWs.send(JSON.stringify({ type: 'history', messages: history }));
      }

      return new Response(null, {
        status: 101,
        webSocket: clientWs,
      });
    }

    if (url.pathname === '/messages' && request.method === 'GET') {
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      return Response.json({ messages: history });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };

    let parsed: ClientMessage;
    try {
      parsed = JSON.parse(message as string);
    } catch {
      ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
      return;
    }

    if (parsed.type === 'chat') {
      const msg: Message = {
        id: crypto.randomUUID(),
        userId,
        text: parsed.text,
        timestamp: Date.now(),
      };

      // Salva nella history (mantieni solo gli ultimi 100 messaggi)
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      const newHistory = [...history, msg].slice(-100);
      await this.state.storage.put('history', newHistory);

      // Broadcast a tutte le connessioni WebSocket attive nel DO
      const allWebSockets = this.state.getWebSockets();
      const payload = JSON.stringify({ type: 'message', message: msg });
      for (const socket of allWebSockets) {
        try {
          socket.send(payload);
        } catch {
          // Connessione chiusa, ignorala
        }
      }
    }
  }

  // Chiamato quando una connessione WebSocket viene chiusa
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };
    ws.close(code, reason);

    // Notifica gli altri utenti dell'uscita
    const notification = JSON.stringify({
      type: 'user_left',
      userId,
      timestamp: Date.now(),
    });
    for (const socket of this.state.getWebSockets()) {
      try {
        socket.send(notification);
      } catch { /* ignore */ }
    }
  }

  // Chiamato in caso di errore sulla connessione WebSocket
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    ws.close(1011, 'Internal error');
  }
}

interface Message {
  id: string;
  userId: string;
  text: string;
  timestamp: number;
}

interface ClientMessage {
  type: 'chat' | 'ping';
  text?: string;
}

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Punctul de intrare al lucrătorului direcționează cererile către camera corectă pe baza căii:

// src/worker.ts
export { ChatRoomDO } from './chat-room-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // /room/:roomId/ws -> WebSocket per la stanza
    // /room/:roomId/messages -> GET cronologia
    const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
    if (!match) {
      return new Response('Not Found', { status: 404 });
    }

    const roomId = match[1];
    const subpath = match[2] ?? '/ws';

    // Ogni stanza e una istanza distinta del DO
    const id = env.CHAT_ROOM.idFromName(roomId);
    const stub = env.CHAT_ROOM.get(id);

    // Rewrite del path per il DO
    const doUrl = new URL(request.url);
    doUrl.pathname = subpath;

    return stub.fetch(new Request(doUrl.toString(), request));
  },
};

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Stocare tranzacțională: operațiuni atomice

Stocarea Durable Objects acceptă tranzacții atomice prin state.storage.transaction(). Acest lucru este crucial când o operație trebuie să citească și să scrie mai multe chei în mod constant:

// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method Not Allowed', { status: 405 });
    }

    const { from, to, amount } = await request.json<TransferRequest>();

    try {
      // La transazione e atomica: o tutto va a buon fine, o niente
      await this.state.storage.transaction(async (txn) => {
        const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
        const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;

        if (fromBalance < amount) {
          // Il throw annulla la transazione
          throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
        }

        await txn.put(`balance:${from}`, fromBalance - amount);
        await txn.put(`balance:${to}`, toBalance + amount);

        // Log dell'operazione
        const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
        txLog.push({ from, to, amount, timestamp: Date.now() });
        await txn.put('tx_log', txLog.slice(-1000));
      });

      return Response.json({ success: true, from, to, amount });
    } catch (err) {
      return Response.json(
        { success: false, error: (err as Error).message },
        { status: 400 }
      );
    }
  }
}

interface TransferRequest {
  from: string;
  to: string;
  amount: number;
}

interface TxRecord {
  from: string;
  to: string;
  amount: number;
  timestamp: number;
}

interface Env {
  ACCOUNT: DurableObjectNamespace;
}

Alarme: Operații programate în obiectul durabil

Suport pentru obiecte durabile Alarme: un apel invers alarm() care este invocat la o oră programată, chiar dacă eu nu sunt acolo solicitări active către DO. Acest lucru este util pentru TTL, termene limită și lucrări periodice legate de statutul DO:

// src/session-do.ts
// DO con alarm per scadenza automatica della sessione

export class SessionDO implements DurableObject {
  state: DurableObjectState;
  static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/create' && request.method === 'POST') {
      const data = await request.json<SessionData>();

      // Salva i dati della sessione
      await this.state.storage.put('session', {
        ...data,
        createdAt: Date.now(),
      });

      // Schedula l'alarm per la scadenza della sessione
      // Se l'alarm e gia schedulato, viene sostituito
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
    }

    if (url.pathname === '/get') {
      const session = await this.state.storage.get<SessionData>('session');
      if (!session) {
        return Response.json({ error: 'Session not found' }, { status: 404 });
      }

      // Refresh del TTL ad ogni accesso (sliding expiry)
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ session });
    }

    if (url.pathname === '/invalidate' && request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      await this.state.storage.deleteAlarm();
      return Response.json({ ok: true });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato automaticamente quando scatta l'alarm
  async alarm(): Promise<void> {
    // Pulisce i dati della sessione scaduta
    const session = await this.state.storage.get<SessionData>('session');
    if (session) {
      console.log(`Session expired for user: ${session.userId}`);
      await this.state.storage.deleteAll();
    }
  }
}

interface SessionData {
  userId: string;
  role: string;
  metadata?: Record<string, unknown>;
}

interface Env {
  SESSION: DurableObjectNamespace;
}

Limitator global de viteză cu obiecte durabile

Un limitator de rată distribuită este unul dintre cele mai solicitate modele pentru API-urile publice. Cu Workers KV, implementarea ar fi supusă condițiilor de cursă. Cu un C, fiecare „bucket” de limitare a ratei este o instanță cu o consistență puternică:

// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects

export class RateLimiterDO implements DurableObject {
  state: DurableObjectState;

  // Configurazione: 100 req/minuto per IP
  static MAX_TOKENS = 100;
  static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const now = Date.now();

    // Legge lo stato attuale del bucket
    const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
      tokens: RateLimiterDO.MAX_TOKENS,
      lastRefill: now,
    };

    // Calcola quanti token sono stati aggiunti dall'ultimo accesso
    const elapsed = now - bucket.lastRefill;
    const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
    const currentTokens = Math.min(
      RateLimiterDO.MAX_TOKENS,
      bucket.tokens + tokensToAdd
    );

    if (currentTokens < 1) {
      // Rate limit superato
      const retryAfterMs = Math.ceil(
        (1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
      );

      await this.state.storage.put('bucket', {
        tokens: currentTokens,
        lastRefill: now,
      });

      return Response.json(
        {
          allowed: false,
          retryAfter: Math.ceil(retryAfterMs / 1000),
          remaining: 0,
        },
        {
          status: 429,
          headers: {
            'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
            'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
            'X-RateLimit-Remaining': '0',
          },
        }
      );
    }

    // Consuma un token e aggiorna il bucket
    await this.state.storage.put('bucket', {
      tokens: currentTokens - 1,
      lastRefill: now,
    });

    return Response.json({
      allowed: true,
      remaining: Math.floor(currentTokens - 1),
    });
  }
}

interface TokenBucket {
  tokens: number;
  lastRefill: number;
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Lucrătorul integrează limitatorul de rată în fluxul fiecărei cereri:

// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Identifica il client (IP o API key)
    const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
    const apiKey = request.headers.get('X-API-Key');
    const bucketId = apiKey ?? `ip:${clientIp}`;

    // Controlla il rate limit per questo client
    const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
    const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
    const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));

    if (limitCheck.status === 429) {
      return limitCheck; // Propaga la risposta 429 con gli headers
    }

    // Prosegue con la logica dell'API
    return handleApiRequest(request, env);
  },
};

async function handleApiRequest(request: Request, env: Env): Promise<Response> {
  return Response.json({ data: 'your api response here' });
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Considerații de performanță și cost

Obiectele durabile au un profil de cost și performanță foarte diferit la Muncitori simpli. Câteva puncte cheie de reținut:

Latență: nu întotdeauna locală

Fiecare instanță DO rezidă într-un singur centru de date. Dacă un utilizator în Europa accesează un DO instanțiat în America de Nord, latența fiecărei operațiuni include călătoria transatlantică dus-întors (~100-150ms). Proiectați ID-uri DO Pentru a limita partajarea geografică: utilizați ID-uri bazate pe regiune ori de câte ori este posibil (idFromName(`${region}:${resourceId}`)), sau acceptați latența ridicat numai pentru operațiunile care necesită o consecvență transregională puternică.

Voce Niveluri gratuite Plătit (lucrători plătiți)
Cereri către DO 1M/luna inclus 0,15 USD per milion de mai sus
Durată de viață DO (CPU) 400.000 GB-s/lună 12,50 USD per milion GB-s
Depozitare 1 GB inclus 0,20 USD/GB-lună dincolo
Hibernare WebSocket Disponibil Disponibil (fără cost inactiv)
Alarme Disponibil Disponibil

Cele mai bune practici

  • Granularitatea ID: utilizați ID-uri specifice (de ex. chat:room-42) mai degrabă decât ID-uri globale pentru a evita hotspot-urile pe instanțe individuale.
  • Hibernare WebSocket întotdeauna: STATELE UNITE ALE AMERICII state.acceptWebSocket() în locul ascultătorilor manuali de evenimente pentru a reduce costurile de conectare inactivă.
  • Stocare în lot: STATELE UNITE ALE AMERICII storage.put(map) a scrie taste multiple într-o singură operație în loc de mai multe put() singuri.
  • Limită dimensiunea de stocare: fiecare instanță are o limită de 128 MB de depozitare. Pentru date mari utilizați R2 și salvați doar referințele în DO.
  • Bugetul de serializare: cererile sunt puse în coadă și procesate secvenţial; operațiunile lente blochează cele ulterioare. Păstrați manipulatorii rapid (<1s ideal).

Concluzii și pașii următori

Obiectele durabile creează decalajul dintre modelul apatrid al muncitorilor și cel aplicații moderne care necesită coordonare și stare partajată. ale lor combinate cu WebSocket Hibernation API și Alarms le face un primitiv complet pentru chat, jocuri, colaborare în documente și sisteme de coordonare globală.

Compensația este latența: un DO este fizic într-un singur centru de date, astfel că operațiunile de scriere între regiuni adaugă latență. Pentru lecturi scalabil luați în considerare combinarea DO (pentru scrieri) cu KV (pentru citiri în cache).

Următoarele articole din serie

  • Articolul 5: Workers AI — Inferența LLM și modele de viziune on the Edge: Cum să rulați modele Llama, Whisper și viziune direct în Workers fără GPU dedicat, cu IA pentru lucrători în creștere cu 4000% pe an.
  • Articolul 6: Vercel Edge Runtime — Advanced Middleware, Geolocalizare și testare A/B: abordarea lui Vercel la margine cu Next.js.
  • Articolul 7: Rutarea geografică la margine — Personalizare Conținut și conformitate GDPR: geo-fencing, prețuri localizate și GDPR.