Uç Bilişimde Durum Sorunu

Bu serideki önceki makaleler Cloudflare Çalışanlarının ne kadar üstün olduğunu gösterdi durum bilgisi olmayan iş yükleri için: her istek, paylaşılan bellek olmadan yalıtılmış olarak işlenir çağrılar arasında. Bu özellik ölçeklenebilirlik açısından güçlü bir noktadır yataydır ancak uygulama koordinasyon gerektirdiği anda engel haline gelir.

Şu yaygın senaryoları göz önünde bulundurun: Mesajların iletilmesi gereken bir sohbet odası tüm katılımcılara sipariş edildi ve teslim edildi; daha fazlasının yer aldığı ortak bir belge kullanıcılar aynı anda düzenleme yapabilir; istekleri sayması gereken API'ler için hız sınırlayıcı küresel olarak tutarlı bir şekilde. Tüm bu durumlarda İşçilerin vatansız modeli bu yeterli değil: bir taneye ihtiyacın var son derece tutarlıydı ve kapasite Rekabet halindeki talepleri tek bir yerde koordine etmek.

Tam olarak yaşadığım sorun bu Dayanıklı Nesneler çözüyorlar.

Ne Öğreneceksiniz

  • Dayanıklı Nesne nedir ve İşçi KV'sinden farkı nedir?
  • Güçlü tutarlılık modeli: tek yazarlı, küresel koordinasyon
  • Dayanıklı Nesneler kullanılarak oturum yönetimi ile WebSocket nasıl uygulanır?
  • İşlemsel depolama: okuma, yazma ve atomik işlemler
  • Sohbet odaları, hız sınırlama ve belge işbirliğine yönelik modeller
  • Alarmlar: Dayanıklı Nesne içindeki planlanmış işlemler
  • Limitler, fiyatlandırma ve ne zaman DO, KV ve D1 arasında seçim yapılacağı

Dayanıklı Nesne Nedir?

Dayanıklı Nesne (DO), Cloudflare'in başlattığı bir JavaScript/TypeScript sınıfıdır Belirli bir tanımlayıcı için tek bir coğrafi konumda. İşçiler KV'den farklı olarak (300'den fazla PoP'ta olası tutarlılık), DO'nun şu garantileri vardır:

  • Tek yazarlı tutarlılık: Belirli bir kimlik için dünya çapında DO'nun yalnızca bir örneği mevcuttur
  • İsteklerin serileştirilmesi: DO çağrıları sırayla yürütülür, asla aynı anda yürütülmez
  • Dayanıklı Depolama: her örnek için özel bir işlemsel anahtar/değer deposu
  • WebSocket hazırda bekletme modu: WebSocket bağlantıları boşta kaldıkları sürelerden ücretsiz olarak kurtulur

Bir DO'nun konumu ilk aktivasyonda otomatik olarak belirlenir ve sabit kalır. Cloudflare, ilk isteğe en yakın veri merkezini seçer. Dünyanın herhangi bir yerindeki sonraki tüm istekler o tek kişiye yönlendirilir Cloudflare anycast yönlendirmesi aracılığıyla örnek.

İlkel Tutarlılık Yarışma Gecikmeyi okuma Kullanım örneği
İşçiler KV'si Nihai (dakika) Çoklu yazar ~1 ms (önbelleğe alınmış) Yapılandırma, varlıklar, okuma ağırlıklı oturumlar
Dayanıklı Nesneler Güçlü (doğrusallaştırılabilir) Tek yazar ~50-150ms (uzak uçtan) Sohbet, hız sınırlayıcı, oyun durumu, CRDT
D1 SQLit Güçlü (birincil) Çoklu okuyucu, tek yazarlı ~5-20 ms (yakındaki PoP'tan) İlişkisel sorgular, raporlar, OLTP
R2 Nesne Depolama Güçlü (nesneye göre) Çoklu yazar (çakışma tespiti) ~50-200ms Dosyalar, resimler, yedeklemeler

Dayanıklı Bir Nesnenin Yapısı

Dayanıklı Nesne, Cloudflare çalışma zamanının belirlediği belirli yöntemlere sahip bir sınıftır. tanır. En önemlisi fetch(), her biri için çağrıldı HTTP isteği DO'ya iletildi. Temel yapıya bakalım:

// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza

export class CounterDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
    // state.storage e il key-value store privato di questa istanza
    // Persiste attraverso i riavvii del DO
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    switch (url.pathname) {
      case '/increment': {
        // Legge il valore corrente (undefined se non esiste)
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = current + 1;
        // Scrittura atomica: garantita durabile prima del return
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/decrement': {
        const current = (await this.state.storage.get<number>('count')) ?? 0;
        const next = Math.max(0, current - 1);
        await this.state.storage.put('count', next);
        return Response.json({ count: next });
      }

      case '/value': {
        const count = (await this.state.storage.get<number>('count')) ?? 0;
        return Response.json({ count });
      }

      case '/reset': {
        await this.state.storage.put('count', 0);
        return Response.json({ count: 0, reset: true });
      }

      default:
        return new Response('Not Found', { status: 404 });
    }
  }
}

interface Env {
  COUNTER: DurableObjectNamespace;
}

DO'yu Çalışan giriş noktasından kullanmak için, bunu ad alanı bağlama yoluyla başlatmanız gerekir. ve bir kimlik:

// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Estrae l'ID dalla query string: /counter?id=room-1
    const counterId = url.searchParams.get('id') ?? 'global';

    // idFromName() crea un ID deterministico da una stringa
    // Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
    const id = env.COUNTER.idFromName(counterId);

    // Ottiene lo stub per comunicare con l'istanza
    const stub = env.COUNTER.get(id);

    // Invia la richiesta al Durable Object
    // La richiesta viene instradata al datacenter corretto automaticamente
    return stub.fetch(request);
  },
};

interface Env {
  COUNTER: DurableObjectNamespace;
}

Yapılandırma wrangler.toml bağlayıcılığı beyan etmelidir:

# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"

[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]

Dayanıklı Nesnelere Sahip WebSocket: Sohbet Odası

Dayanıklı Nesnelerin en güçlü kullanım durumu oturum yönetimidir Paylaşılan durum bilgisi olan WebSocket. Her sohbet odası DO'nun ayrı bir örneğidir, aktif bağlantıların ve mesaj geçmişinin listesini tutar.

Cloudflare şunları destekliyor: WebSocket Hazırda Bekletme API'si: DO geliyor İşlenecek mesaj olmadığında "hazırda bekletme" moduna geçerek maliyetleri önemli ölçüde azaltır (açık bağlantılar için değil, yalnızca işlem süresi için ödeme yaparsınız).

// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation

export class ChatRoomDO implements DurableObject {
  private state: DurableObjectState;
  private env: Env;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/ws') {
      // Verifica che sia una richiesta di upgrade WebSocket
      const upgradeHeader = request.headers.get('Upgrade');
      if (!upgradeHeader || upgradeHeader !== 'websocket') {
        return new Response('Expected WebSocket upgrade', { status: 426 });
      }

      // Crea la coppia WebSocket server/client
      const { 0: clientWs, 1: serverWs } = new WebSocketPair();

      // Accetta la connessione tramite la Hibernation API
      // Il DO sara ibernato tra i messaggi (no costo di CPU idle)
      this.state.acceptWebSocket(serverWs);

      // Opzionale: associa metadata alla connessione
      // Utile per identificare l'utente nei messaggi successivi
      const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
      serverWs.serializeAttachment({ userId });

      // Invia la storia recente al nuovo utente
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      if (history.length > 0) {
        serverWs.send(JSON.stringify({ type: 'history', messages: history }));
      }

      return new Response(null, {
        status: 101,
        webSocket: clientWs,
      });
    }

    if (url.pathname === '/messages' && request.method === 'GET') {
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      return Response.json({ messages: history });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
  async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };

    let parsed: ClientMessage;
    try {
      parsed = JSON.parse(message as string);
    } catch {
      ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
      return;
    }

    if (parsed.type === 'chat') {
      const msg: Message = {
        id: crypto.randomUUID(),
        userId,
        text: parsed.text,
        timestamp: Date.now(),
      };

      // Salva nella history (mantieni solo gli ultimi 100 messaggi)
      const history = (await this.state.storage.get<Message[]>('history')) ?? [];
      const newHistory = [...history, msg].slice(-100);
      await this.state.storage.put('history', newHistory);

      // Broadcast a tutte le connessioni WebSocket attive nel DO
      const allWebSockets = this.state.getWebSockets();
      const payload = JSON.stringify({ type: 'message', message: msg });
      for (const socket of allWebSockets) {
        try {
          socket.send(payload);
        } catch {
          // Connessione chiusa, ignorala
        }
      }
    }
  }

  // Chiamato quando una connessione WebSocket viene chiusa
  async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
    const { userId } = ws.deserializeAttachment() as { userId: string };
    ws.close(code, reason);

    // Notifica gli altri utenti dell'uscita
    const notification = JSON.stringify({
      type: 'user_left',
      userId,
      timestamp: Date.now(),
    });
    for (const socket of this.state.getWebSockets()) {
      try {
        socket.send(notification);
      } catch { /* ignore */ }
    }
  }

  // Chiamato in caso di errore sulla connessione WebSocket
  async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
    console.error('WebSocket error:', error);
    ws.close(1011, 'Internal error');
  }
}

interface Message {
  id: string;
  userId: string;
  text: string;
  timestamp: number;
}

interface ClientMessage {
  type: 'chat' | 'ping';
  text?: string;
}

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

Çalışan giriş noktası, istekleri yola göre doğru odaya yönlendirir:

// src/worker.ts
export { ChatRoomDO } from './chat-room-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // /room/:roomId/ws -> WebSocket per la stanza
    // /room/:roomId/messages -> GET cronologia
    const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
    if (!match) {
      return new Response('Not Found', { status: 404 });
    }

    const roomId = match[1];
    const subpath = match[2] ?? '/ws';

    // Ogni stanza e una istanza distinta del DO
    const id = env.CHAT_ROOM.idFromName(roomId);
    const stub = env.CHAT_ROOM.get(id);

    // Rewrite del path per il DO
    const doUrl = new URL(request.url);
    doUrl.pathname = subpath;

    return stub.fetch(new Request(doUrl.toString(), request));
  },
};

interface Env {
  CHAT_ROOM: DurableObjectNamespace;
}

İşlemsel Depolama: Atomik İşlemler

Dayanıklı Nesneler depolama, atomik işlemleri destekler state.storage.transaction(). Bu çok önemli bir işlemin birden fazla anahtarı tutarlı bir şekilde okuması ve yazması gerekir:

// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
  state: DurableObjectState;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method Not Allowed', { status: 405 });
    }

    const { from, to, amount } = await request.json<TransferRequest>();

    try {
      // La transazione e atomica: o tutto va a buon fine, o niente
      await this.state.storage.transaction(async (txn) => {
        const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
        const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;

        if (fromBalance < amount) {
          // Il throw annulla la transazione
          throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
        }

        await txn.put(`balance:${from}`, fromBalance - amount);
        await txn.put(`balance:${to}`, toBalance + amount);

        // Log dell'operazione
        const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
        txLog.push({ from, to, amount, timestamp: Date.now() });
        await txn.put('tx_log', txLog.slice(-1000));
      });

      return Response.json({ success: true, from, to, amount });
    } catch (err) {
      return Response.json(
        { success: false, error: (err as Error).message },
        { status: 400 }
      );
    }
  }
}

interface TransferRequest {
  from: string;
  to: string;
  amount: number;
}

interface TxRecord {
  from: string;
  to: string;
  amount: number;
  timestamp: number;
}

interface Env {
  ACCOUNT: DurableObjectNamespace;
}

Alarmlar: Dayanıklı Nesnede Planlanmış Operasyonlar

Dayanıklı Nesneler desteği Alarmlar: geri arama alarm() orada olmasam bile, planlanmış bir zamanda çağrılan DO'ya aktif talepler. Bu, TTL, son teslim tarihleri ve periyodik işler için kullanışlıdır. DO'nun durumuyla bağlantılı:

// src/session-do.ts
// DO con alarm per scadenza automatica della sessione

export class SessionDO implements DurableObject {
  state: DurableObjectState;
  static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname === '/create' && request.method === 'POST') {
      const data = await request.json<SessionData>();

      // Salva i dati della sessione
      await this.state.storage.put('session', {
        ...data,
        createdAt: Date.now(),
      });

      // Schedula l'alarm per la scadenza della sessione
      // Se l'alarm e gia schedulato, viene sostituito
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
    }

    if (url.pathname === '/get') {
      const session = await this.state.storage.get<SessionData>('session');
      if (!session) {
        return Response.json({ error: 'Session not found' }, { status: 404 });
      }

      // Refresh del TTL ad ogni accesso (sliding expiry)
      await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);

      return Response.json({ session });
    }

    if (url.pathname === '/invalidate' && request.method === 'DELETE') {
      await this.state.storage.deleteAll();
      await this.state.storage.deleteAlarm();
      return Response.json({ ok: true });
    }

    return new Response('Not Found', { status: 404 });
  }

  // Chiamato automaticamente quando scatta l'alarm
  async alarm(): Promise<void> {
    // Pulisce i dati della sessione scaduta
    const session = await this.state.storage.get<SessionData>('session');
    if (session) {
      console.log(`Session expired for user: ${session.userId}`);
      await this.state.storage.deleteAll();
    }
  }
}

interface SessionData {
  userId: string;
  role: string;
  metadata?: Record<string, unknown>;
}

interface Env {
  SESSION: DurableObjectNamespace;
}

Dayanıklı Nesnelerle Küresel Hız Sınırlayıcı

Dağıtılmış hız sınırlayıcı, genel API'ler için en çok talep edilen kalıplardan biridir. Workers KV'de uygulama yarış koşullarına tabi olacaktır. C ile her hız sınırlayıcı "kova" güçlü tutarlılığa sahip bir örnektir:

// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects

export class RateLimiterDO implements DurableObject {
  state: DurableObjectState;

  // Configurazione: 100 req/minuto per IP
  static MAX_TOKENS = 100;
  static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
  }

  async fetch(request: Request): Promise<Response> {
    const now = Date.now();

    // Legge lo stato attuale del bucket
    const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
      tokens: RateLimiterDO.MAX_TOKENS,
      lastRefill: now,
    };

    // Calcola quanti token sono stati aggiunti dall'ultimo accesso
    const elapsed = now - bucket.lastRefill;
    const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
    const currentTokens = Math.min(
      RateLimiterDO.MAX_TOKENS,
      bucket.tokens + tokensToAdd
    );

    if (currentTokens < 1) {
      // Rate limit superato
      const retryAfterMs = Math.ceil(
        (1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
      );

      await this.state.storage.put('bucket', {
        tokens: currentTokens,
        lastRefill: now,
      });

      return Response.json(
        {
          allowed: false,
          retryAfter: Math.ceil(retryAfterMs / 1000),
          remaining: 0,
        },
        {
          status: 429,
          headers: {
            'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
            'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
            'X-RateLimit-Remaining': '0',
          },
        }
      );
    }

    // Consuma un token e aggiorna il bucket
    await this.state.storage.put('bucket', {
      tokens: currentTokens - 1,
      lastRefill: now,
    });

    return Response.json({
      allowed: true,
      remaining: Math.floor(currentTokens - 1),
    });
  }
}

interface TokenBucket {
  tokens: number;
  lastRefill: number;
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Worker, hız sınırlayıcıyı her isteğin akışına entegre eder:

// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Identifica il client (IP o API key)
    const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
    const apiKey = request.headers.get('X-API-Key');
    const bucketId = apiKey ?? `ip:${clientIp}`;

    // Controlla il rate limit per questo client
    const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
    const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
    const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));

    if (limitCheck.status === 429) {
      return limitCheck; // Propaga la risposta 429 con gli headers
    }

    // Prosegue con la logica dell'API
    return handleApiRequest(request, env);
  },
};

async function handleApiRequest(request: Request, env: Env): Promise<Response> {
  return Response.json({ data: 'your api response here' });
}

interface Env {
  RATE_LIMITER: DurableObjectNamespace;
}

Performans ve Maliyet Hususları

Dayanıklı Nesneler çok farklı bir maliyet ve performans profiline sahiptir basit İşçilere. Akılda tutulması gereken bazı önemli noktalar:

Gecikme: Her Zaman Yerel Değil

Her DO örneği tek bir veri merkezinde bulunur. Avrupa'daki bir kullanıcı ise Kuzey Amerika'da başlatılan bir DO'ya eriştiğinde, her işlemin gecikme süresi transatlantik gidiş dönüş içerir (~100-150ms). DO kimliklerini tasarla Coğrafi paylaşımı sınırlamak için: Mümkün olduğunda bölgeye dayalı kimlikleri kullanın (idFromName(`${region}:${resourceId}`)) veya gecikmeyi kabul edin yalnızca güçlü bölgeler arası tutarlılık gerektiren operasyonlar için yüksektir.

Ses Ücretsiz katmanlar Ücretli (Ücretli Çalışanlar)
DO'ya yapılan talepler 1 milyon/ay dahil Yukarıdaki milyon başına 0,15 ABD doları
Ömür boyu (CPU) 400.000 GB-sn/ay Milyon GB-s başına 12,50 ABD doları
Depolamak 1 GB dahil 0,20 USD/GB-ay ötesi
WebSocket Hazırda Bekletme Mevcut Mevcut (boşta kalma maliyeti yok)
Alarmlar Mevcut Mevcut

En İyi Uygulamalar

  • Kimlik Ayrıntı Düzeyi: belirli kimlikler kullanın (ör. chat:room-42) bireysel örneklerde sıcak noktalardan kaçınmak için genel kimlikler yerine.
  • WebSocket Hazırda Bekletme modu her zaman: Amerika state.acceptWebSocket() boş bağlantı maliyetlerini azaltmak için manuel olay dinleyicileri yerine.
  • Toplu depolama: Amerika storage.put(map) yazmak birden fazla anahtar yerine tek bir işlemde birden fazla anahtar put() bekarlar.
  • Depolama boyutunu sınırlayın: her örneğin 128 MB sınırı vardır depolama. Büyük veriler için R2 kullanın ve referansları yalnızca DO'ya kaydedin.
  • Serileştirme bütçesi: istekler sıraya alınır ve işlenir sırasıyla; Yavaş işlemler sonraki işlemleri engeller. İşleyicileri sakla hızlı (<1s ideal).

Sonuçlar ve Sonraki Adımlar

Dayanıklı Nesneler, İşçilerin vatansız modeli ile koordinasyon ve paylaşımlı durum gerektiren modern uygulamalar. onlarınki WebSocket Hazırda Bekletme API'si ve Alarmlar ile birleştirildiğinde, onları ilkel hale getirir sohbet, oyun, belge işbirliği ve küresel koordinasyon sistemleri için eksiksizdir.

Bunun karşılığı gecikmedir: Bir DO fiziksel olarak yalnızca tek bir veri merkezinde bulunur, bu nedenle bölgeler arası yazma işlemleri gecikmeye neden olur. Okumalar için ölçeklenebilir, DO'yu (yazma için) KV (önbelleğe alınmış okumalar için) ile birleştirmeyi düşünün.

Serideki Sonraki Yazılar

  • Madde 5: İşçilerin Yapay Zekası - Yüksek Lisans ve Vizyon Modellerinin Çıkarımı Edge'de: Lama, Whisper ve görme modelleri doğrudan Workers'ta nasıl çalıştırılır Özel GPU olmadan, Workers AI'nın yıllık bazda %4000 büyümesiyle.
  • Madde 6: Vercel Edge Runtime — Gelişmiş Ara Yazılım, Coğrafi Konum ve A/B Testi: Vercel'in Next.js ile uç noktaya yaklaşımı.
  • Madde 7: Sınırda Coğrafi Yönlendirme - Kişiselleştirme GDPR İçeriği ve Uyumluluğu: coğrafi sınırlama, yerelleştirilmiş fiyatlandırma ve GDPR.