Problem stanu w przetwarzaniu brzegowym

Poprzednie artykuły z tej serii pokazały, jak wyróżnia się Cloudflare Workers w przypadku obciążeń bezstanowych: każde żądanie jest obsługiwane oddzielnie, bez pamięci współdzielonej pomiędzy wywołaniami. Ta funkcja jest mocną stroną skalowalności poziomo, ale staje się przeszkodą, gdy tylko zastosowanie wymaga koordynacji.

Rozważ następujące typowe scenariusze: Pokój rozmów, w którym muszą znajdować się wiadomości zamówione i dostarczone do wszystkich uczestników; wspólny dokument, w którym więcej użytkownicy edytują jednocześnie; ogranicznik szybkości dla interfejsów API, które muszą liczyć żądania w sposób globalnie spójny. We wszystkich tych przypadkach bezpaństwowy model Robotników to nie wystarczy: potrzebujesz jednego były bardzo spójne i pojemność koordynować konkurencyjne żądania w jednym miejscu.

To jest właśnie problem, o którym mówię Trwałe obiekty rozwiązują.

Czego się nauczysz

  • Co to jest przedmiot trwały i czym różni się od obiektu Workers KV
  • Silny model spójności: jeden autor, globalna koordynacja
  • Jak zaimplementować WebSocket z zarządzaniem sesjami przy użyciu trwałych obiektów
  • Pamięć transakcyjna: transakcje odczytu, zapisu i atomowe
  • Wzorce dla pokojów rozmów, ograniczania szybkości i współpracy nad dokumentami
  • Alarmy: zaplanowane operacje w ramach Obiektu Trwałego
  • Limity, ceny i kiedy wybrać DO vs KV vs D1

Co to jest przedmiot trwały

Trwały obiekt (DO) to klasa JavaScript/TypeScript tworzona przez Cloudflare w jednej lokalizacji geograficznej dla danego identyfikatora. W przeciwieństwie do Workers KV (możliwa spójność na ponad 300 punktach PoP), DO ma następujące gwarancje:

  • Spójność pojedynczego autora: tylko jedno wystąpienie DO istnieje na całym świecie dla danego identyfikatora
  • Serializacja żądań: Wywołania DO są wykonywane sekwencyjnie, nigdy jednocześnie
  • Trwałe przechowywanie: prywatny transakcyjny magazyn klucz-wartość dla każdej instancji
  • Hibernacja WebSocket: Połączenia WebSocket wytrzymują okresy bezczynności bez ponoszenia kosztów

Lokalizacja DO jest określana automatycznie po pierwszej aktywacji i pozostaje stała. Cloudflare wybiera centrum danych najbliżej pierwszego żądania. Wszystkie kolejne żądania, w dowolnym miejscu na świecie, są kierowane do tego jednego instancję za pośrednictwem routingu anycast Cloudflare.

Prymitywny Konsystencja Konkurs Przeczytaj opóźnienie Użyj przypadku
Pracownicy KV Ewentualne (minuty) Wielu pisarzy ~1 ms (w pamięci podręcznej) Konfiguracja, zasoby, sesje wymagające dużej liczby odczytów
Trwałe obiekty Silny (linearyzowalny) Pojedynczy pisarz ~50-150ms (od zdalnej krawędzi) Czat, ogranicznik prędkości, stan gry, CRDT
SQLite D1 Silny (główny) Wielu czytelników, jeden autor ~5-20ms (z pobliskiego PoP) Zapytania relacyjne, raporty, OLTP
Pamięć obiektowa R2 Silny (według obiektu) Wielu autorów (wykrywanie konfliktów) ~50-200ms Pliki, obrazy, kopie zapasowe

Struktura trwałego przedmiotu

Trwały obiekt to klasa z określonymi metodami środowiska wykonawczego Cloudflare rozpoznaje. Najważniejsze jest fetch(), wezwał każdego Żądanie HTTP przekazane do DO. Zobaczmy podstawową strukturę:

// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza

export class CounterDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
    // state.storage e il key-value store privato di questa istanza
    // Persiste attraverso i riavvii del DO
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/increment': {
        // Legge il valore corrente (undefined se non esiste)
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = current + 1;
        // Scrittura atomica: garantita durabile prima del return
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/decrement': {
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = Math.max(0, current - 1);
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/value': {
        const count = (await this.state.storage.get<number>('count')) ?? 0;
        return Response.json({ count });
      }

      case '/reset': {
        await this.state.storage.put('count', 0);
        return Response.json({ count: 0, reset: true });
      }

      default:
        return new Response('Not Found', { status: 404 });
    }
  }
}

interface Env {
  COUNTER: DurableObjectNamespace;
}

Aby użyć DO z punktu wejścia Worker, należy utworzyć jego instancję poprzez powiązanie przestrzeni nazw i identyfikator:

// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Estrae l'ID dalla query string: /counter?id=room-1
    const counterId = url.searchParams.get('id') ?? 'global';

    // idFromName() crea un ID deterministico da una stringa
    // Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
    const id = env.COUNTER.idFromName(counterId);

    // Ottiene lo stub per comunicare con l'istanza
    const stub = env.COUNTER.get(id);

    // Invia la richiesta al Durable Object
    // La richiesta viene instradata al datacenter corretto automaticamente
    return stub.fetch(request);
  },
};

interface Env {
  COUNTER: DurableObjectNamespace;
}

Konfiguracja wrangler.toml musi zadeklarować wiązanie:

# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"

[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]

WebSocket z trwałymi obiektami: pokój rozmów

Najpotężniejszym przypadkiem użycia obiektów trwałych jest zarządzanie sesjami Udostępniony stanowy protokół WebSocket. Każdy pokój rozmów jest oddzielną instancją DO, który przechowuje listę aktywnych połączeń i historię wiadomości.

Cloudflare obsługuje Interfejs API hibernacji protokołu WebSocket: nadchodzi DO „hibernowany”, gdy nie ma żadnych komunikatów do przetworzenia, co radykalnie zmniejsza koszty (płacisz tylko za czas przetwarzania, a nie za otwarte połączenia).

// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation

export class ChatRoomDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/ws') {
      // Verifica che sia una richiesta di upgrade WebSocket
      const upgradeHeader = request.headers.get('Upgrade');
      if (!upgradeHeader || upgradeHeader !== 'websocket') {
        return new Response('Expected WebSocket upgrade', { status: 426 });
      }

      // Crea la coppia WebSocket server/client
      const { 0: clientWs, 1: serverWs } = new WebSocketPair();

      // Accetta la connessione tramite la Hibernation API
      // Il DO sara ibernato tra i messaggi (no costo di CPU idle)
      this.state.acceptWebSocket(serverWs);

      // Opzionale: associa metadata alla connessione
      // Utile per identificare l'utente nei messaggi successivi
      const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
      serverWs.serializeAttachment({ userId });

      // Invia la storia recente al nuovo utente
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      if (history.length > 0) {
        serverWs.send(JSON.stringify({ type: 'history', messages: history }));
      }

      return new Response(null, {
        status: 101,
        webSocket: clientWs,
      });
    }

    if (url.pathname === '/messages' && request.method === 'GET') {
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      return Response.json({ messages: history });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };

    let parsed: ClientMessage;
    try {
      parsed = JSON.parse(message as string);
    } catch {
      ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
      return;
    }

    if (parsed.type === 'chat') {
      const msg: Message = {
        id: crypto.randomUUID(),
        userId,
        text: parsed.text,
        timestamp: Date.now(),
      };

      // Salva nella history (mantieni solo gli ultimi 100 messaggi)
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      const newHistory = [...history, msg].slice(-100);
      await this.state.storage.put('history', newHistory);

      // Broadcast a tutte le connessioni WebSocket attive nel DO
      const allWebSockets = this.state.getWebSockets();
      const payload = JSON.stringify({ type: 'message', message: msg });
      for (const socket of allWebSockets) {
        try {
          socket.send(payload);
        } catch {
          // Connessione chiusa, ignorala
        }
      }
    }
  }

  // Chiamato quando una connessione WebSocket viene chiusa
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };
    ws.close(code, reason);

    // Notifica gli altri utenti dell'uscita
    const notification = JSON.stringify({
      type: 'user_left',
      userId,
      timestamp: Date.now(),
    });
    for (const socket of this.state.getWebSockets()) {
      try {
        socket.send(notification);
      } catch { /* ignore */ }
    }
  }

  // Chiamato in caso di errore sulla connessione WebSocket
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    ws.close(1011, 'Internal error');
  }
}

interface Message {
  id: string;
  userId: string;
  text: string;
  timestamp: number;
}

interface ClientMessage {
  type: 'chat' | 'ping';
  text?: string;
}

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Punkt wejścia pracownika kieruje żądania do odpowiedniego pokoju na podstawie ścieżki:

// src/worker.ts
export { ChatRoomDO } from './chat-room-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // /room/:roomId/ws -> WebSocket per la stanza
    // /room/:roomId/messages -> GET cronologia
    const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
    if (!match) {
      return new Response('Not Found', { status: 404 });
    }

    const roomId = match[1];
    const subpath = match[2] ?? '/ws';

    // Ogni stanza e una istanza distinta del DO
    const id = env.CHAT_ROOM.idFromName(roomId);
    const stub = env.CHAT_ROOM.get(id);

    // Rewrite del path per il DO
    const doUrl = new URL(request.url);
    doUrl.pathname = subpath;

    return stub.fetch(new Request(doUrl.toString(), request));
  },
};

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Przechowywanie transakcyjne: operacje atomowe

Pamięć Durable Objects obsługuje transakcje atomowe za pośrednictwem state.storage.transaction(). Ma to kluczowe znaczenie, kiedy operacja musi konsekwentnie czytać i zapisywać wiele kluczy:

// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method Not Allowed', { status: 405 });
    }

    const { from, to, amount } = await request.json<TransferRequest>();

    try {
      // La transazione e atomica: o tutto va a buon fine, o niente
      await this.state.storage.transaction(async (txn) => {
        const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
        const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;

        if (fromBalance < amount) {
          // Il throw annulla la transazione
          throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
        }

        await txn.put(`balance:${from}`, fromBalance - amount);
        await txn.put(`balance:${to}`, toBalance + amount);

        // Log dell'operazione
        const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
        txLog.push({ from, to, amount, timestamp: Date.now() });
        await txn.put('tx_log', txLog.slice(-1000));
      });

      return Response.json({ success: true, from, to, amount });
    } catch (err) {
      return Response.json(
        { success: false, error: (err as Error).message },
        { status: 400 }
      );
    }
  }
}

interface TransferRequest {
  from: string;
  to: string;
  amount: number;
}

interface TxRecord {
  from: string;
  to: string;
  amount: number;
  timestamp: number;
}

interface Env {
  ACCOUNT: DurableObjectNamespace;
}

Alarmy: Zaplanowane operacje w obiekcie trwałym

Wsparcie dla trwałych obiektów Alarmy: oddzwonienie alarm() który jest wywoływany w zaplanowanym czasie, nawet jeśli mnie tam nie ma aktywne żądania do DO. Jest to przydatne w przypadku TTL, terminów i zadań okresowych powiązane ze statusem DO:

// src/session-do.ts
// DO con alarm per scadenza automatica della sessione

export class SessionDO implements DurableObject {
  state: DurableObjectState;
  static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/create' && request.method === 'POST') {
      const data = await request.json<SessionData>();

      // Salva i dati della sessione
      await this.state.storage.put('session', {
        ...data,
        createdAt: Date.now(),
      });

      // Schedula l'alarm per la scadenza della sessione
      // Se l'alarm e gia schedulato, viene sostituito
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
    }

    if (url.pathname === '/get') {
      const session = await this.state.storage.get<SessionData>('session');
      if (!session) {
        return Response.json({ error: 'Session not found' }, { status: 404 });
      }

      // Refresh del TTL ad ogni accesso (sliding expiry)
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ session });
    }

    if (url.pathname === '/invalidate' && request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      await this.state.storage.deleteAlarm();
      return Response.json({ ok: true });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato automaticamente quando scatta l'alarm
  async alarm(): Promise<void> {
    // Pulisce i dati della sessione scaduta
    const session = await this.state.storage.get<SessionData>('session');
    if (session) {
      console.log(`Session expired for user: ${session.userId}`);
      await this.state.storage.deleteAll();
    }
  }
}

interface SessionData {
  userId: string;
  role: string;
  metadata?: Record<string, unknown>;
}

interface Env {
  SESSION: DurableObjectNamespace;
}

Globalny ogranicznik szybkości z trwałymi obiektami

Rozproszony ogranicznik szybkości jest jednym z najbardziej pożądanych wzorców publicznych interfejsów API. W przypadku Workers KV wdrożenie byłoby uzależnione od warunków wyścigowych. Z literą C, każdy „wiadro” ograniczające szybkość jest instancją o silnej spójności:

// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects

export class RateLimiterDO implements DurableObject {
  state: DurableObjectState;

  // Configurazione: 100 req/minuto per IP
  static MAX_TOKENS = 100;
  static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const now = Date.now();

    // Legge lo stato attuale del bucket
    const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
      tokens: RateLimiterDO.MAX_TOKENS,
      lastRefill: now,
    };

    // Calcola quanti token sono stati aggiunti dall'ultimo accesso
    const elapsed = now - bucket.lastRefill;
    const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
    const currentTokens = Math.min(
      RateLimiterDO.MAX_TOKENS,
      bucket.tokens + tokensToAdd
    );

    if (currentTokens < 1) {
      // Rate limit superato
      const retryAfterMs = Math.ceil(
        (1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
      );

      await this.state.storage.put('bucket', {
        tokens: currentTokens,
        lastRefill: now,
      });

      return Response.json(
        {
          allowed: false,
          retryAfter: Math.ceil(retryAfterMs / 1000),
          remaining: 0,
        },
        {
          status: 429,
          headers: {
            'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
            'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
            'X-RateLimit-Remaining': '0',
          },
        }
      );
    }

    // Consuma un token e aggiorna il bucket
    await this.state.storage.put('bucket', {
      tokens: currentTokens - 1,
      lastRefill: now,
    });

    return Response.json({
      allowed: true,
      remaining: Math.floor(currentTokens - 1),
    });
  }
}

interface TokenBucket {
  tokens: number;
  lastRefill: number;
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Worker integruje ogranicznik szybkości z przepływem każdego żądania:

// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Identifica il client (IP o API key)
    const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
    const apiKey = request.headers.get('X-API-Key');
    const bucketId = apiKey ?? `ip:${clientIp}`;

    // Controlla il rate limit per questo client
    const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
    const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
    const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));

    if (limitCheck.status === 429) {
      return limitCheck; // Propaga la risposta 429 con gli headers
    }

    // Prosegue con la logica dell'API
    return handleApiRequest(request, env);
  },
};

async function handleApiRequest(request: Request, env: Env): Promise<Response> {
  return Response.json({ data: 'your api response here' });
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Względy wydajności i kosztów

Obiekty trwałe mają bardzo różny profil kosztów i wydajności do prostych Robotników. Kilka kluczowych punktów, o których warto pamiętać:

Opóźnienie: nie zawsze lokalne

Każda instancja DO znajduje się w jednym centrum danych. Jeśli użytkownik w Europie uzyskuje dostęp do instancji DO utworzonej w Ameryce Północnej, opóźnienie każdej operacji obejmuje podróż transatlantycką w obie strony (~100-150 ms). Zaprojektuj identyfikatory DO Aby ograniczyć udostępnianie geograficzne: Jeśli to możliwe, używaj identyfikatorów opartych na regionie (idFromName(`${region}:${resourceId}`)) lub zaakceptuj opóźnienie wysoki jedynie w przypadku operacji wymagających silnej spójności międzyregionalnej.

Głos Darmowe poziomy Płatni (płatni pracownicy)
Prośby do DO 1M/miesiąc wliczony w cenę 0,15 dolara za milion powyżej
DO Żywotność (procesor) 400 000 GB-s/miesiąc 12,50 dolarów za milion GB-s
Składowanie 1 GB w zestawie Powyżej 0,20 USD/GB miesięcznie
Hibernacja protokołu WebSocket Dostępny Dostępne (bez kosztów bezczynności)
Alarmy Dostępny Dostępny

Najlepsze praktyki

  • Szczegółowość identyfikatora: użyj określonych identyfikatorów (np. chat:room-42) zamiast identyfikatorów globalnych, aby uniknąć hotspotów w poszczególnych instancjach.
  • Hibernacja protokołu WebSocket zawsze: USA state.acceptWebSocket() zamiast ręcznych detektorów zdarzeń, aby zmniejszyć koszty bezczynności połączeń.
  • Przechowywanie partii: USA storage.put(map) pisać wiele kluczy w jednej operacji zamiast wielu put() syngiel.
  • Ogranicz rozmiar pamięci: każda instancja ma limit 128 MB przechowywania. W przypadku dużych danych użyj R2 i zapisuj tylko odniesienia w DO.
  • Budżet serializacji: żądania są umieszczane w kolejce i przetwarzane sekwencyjnie; powolne operacje blokują kolejne. Zachowaj opiekunów szybko (idealnie <1 s).

Wnioski i dalsze kroki

Trwałe Obiekty wypełniają lukę pomiędzy bezpaństwowym modelem Robotników i nowoczesne aplikacje wymagające koordynacji i współdzielenia stanu. Ich w połączeniu z interfejsem API hibernacji WebSocket i alarmami czyni je prymitywnymi kompletny do czatów, gier, współpracy nad dokumentami i globalnych systemów koordynacji.

Kompromisem jest opóźnienie: DO fizycznie znajduje się tylko w jednym centrum danych, dlatego operacje zapisu między regionami zwiększają opóźnienie. Do odczytów skalowalne rozważ połączenie DO (dla zapisu) z KV (dla odczytów z pamięci podręcznej).

Następne artykuły z serii

  • Artykuł 5: Workers AI — wnioskowanie z modeli LLM i wizji on the Edge: Jak uruchomić modele Llama, Whisper i Vision bezpośrednio w Workersach bez dedykowanego procesora graficznego, a sztuczna inteligencja Workers rośnie o 4000% rok do roku.
  • Artykuł 6: Vercel Edge Runtime — zaawansowane oprogramowanie pośredniczące, Geolokalizacja i testy A/B: podejście Vercel do krawędzi z Next.js.
  • Artykuł 7: Trasowanie geograficzne na krawędzi — personalizacja Treść i zgodność z RODO: geo-ogrodzenie, zlokalizowane ceny i RODO.