Obiecte durabile: Stare puternic consistentă și WebSockets la margine
Obiectele durabile sunt cele mai puternice primitive din ecosistemul Cloudflare: permite o stare puternic consecventă, WebSocket cu gestionarea sesiunii și coordonare distribuită fără servere centralizate, toate la marginea globală.
Problema de stat în Edge Computing
Articolele anterioare din această serie au arătat cum excelează Cloudflare Workers pentru încărcături de lucru fără stat: fiecare cerere este tratată izolat, fără memorie partajată între invocaţii. Această caracteristică este un punct forte pentru scalabilitate orizontală, dar devine un obstacol de îndată ce aplicarea necesită coordonare.
Luați în considerare aceste scenarii comune: o cameră de chat unde trebuie să fie mesaje comandat și livrat tuturor participanților; un document de colaborare unde mai mult utilizatorii editează simultan; un limitator de rată pentru API-urile care trebuie să conteze cererile într-un mod coerent la nivel global. În toate aceste cazuri modelul apatrid al Muncitorilor nu este suficient: ai nevoie de unul fost foarte consistent si capacitate pentru a coordona cererile concurente într-un singur loc.
Aceasta este exact problema pe care i Obiecte durabile ei rezolvă.
Ce vei învăța
- Ce este un obiect durabil și cum diferă de Workers KV
- Modelul de consistență puternică: un singur scriitor, coordonare globală
- Cum să implementați WebSocket cu gestionarea sesiunii folosind Durable Objects
- Stocare tranzacțională: citire, scriere și tranzacții atomice
- Modele pentru camere de chat, limitarea ratei și colaborarea documentelor
- Alarme: operațiuni programate în cadrul Obiectului Durabil
- Limite, prețuri și când să alegeți DO vs KV vs D1
Ce este un obiect durabil
Un obiect durabil (DO) este o clasă JavaScript/TypeScript pe care Cloudflare o instanțează într-o singură locație geografică pentru un anumit identificator. Spre deosebire de Muncitorii KV (consecvență posibilă pe peste 300 de PoP), un DO are următoarele garanții:
- Consecvență cu un singur scriitor: doar o singură instanță a DO există la nivel mondial pentru un anumit ID
- Serializarea cererilor: Apelurile DO sunt executate secvenţial, niciodată simultan
- Depozitare durabilă: un magazin tranzacțional privat cheie-valoare pentru fiecare instanță
- Hibernare WebSocket: Conexiunile WebSocket supraviețuiesc perioadelor de inactivitate fără costuri
Locația unui DO este determinată automat la prima activare și rămâne fix. Cloudflare alege centrul de date cel mai apropiat de prima solicitare. Toate cererile ulterioare, oriunde în lume, sunt direcționate către acea singură instanță prin rutarea Anycast Cloudflare.
| Primitiv | Consecvență | Concurenţă | Latența de citire | Caz de utilizare |
|---|---|---|---|---|
| Muncitori KV | Eventual (minute) | Multi-scriitor | ~1 ms (în cache) | Configurație, active, sesiuni de citire grele |
| Obiecte durabile | Puternic (liniizabil) | Un singur scriitor | ~50-150 ms (de la marginea de la distanță) | Chat, limitator de rată, stare joc, CRDT |
| D1 SQLite | Puternic (primar) | Multi-cititor, un singur scriitor | ~5-20 ms (de la PoP din apropiere) | Interogări relaționale, rapoarte, OLTP |
| Depozitarea obiectelor R2 | Puternic (după obiect) | Multi-writer (detecția conflictelor) | ~50-200 ms | Fișiere, imagini, copii de rezervă |
Structura unui obiect durabil
Un obiect durabil este o clasă cu metode specifice pe care rulează Cloudflare
recunoaște. Cel mai important este fetch(), chemat pentru fiecare
Solicitarea HTTP a fost redirecționată către DO. Să vedem structura de bază:
// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza
export class CounterDO implements DurableObject {
private state: DurableObjectState;
private env: Env;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
this.env = env;
// state.storage e il key-value store privato di questa istanza
// Persiste attraverso i riavvii del DO
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
switch (url.pathname) {
case '/increment': {
// Legge il valore corrente (undefined se non esiste)
const current = (await this.state.storage.get<number>('count')) ?? 0;
const next = current + 1;
// Scrittura atomica: garantita durabile prima del return
await this.state.storage.put('count', next);
return Response.json({ count: next });
}
case '/decrement': {
const current = (await this.state.storage.get<number>('count')) ?? 0;
const next = Math.max(0, current - 1);
await this.state.storage.put('count', next);
return Response.json({ count: next });
}
case '/value': {
const count = (await this.state.storage.get<number>('count')) ?? 0;
return Response.json({ count });
}
case '/reset': {
await this.state.storage.put('count', 0);
return Response.json({ count: 0, reset: true });
}
default:
return new Response('Not Found', { status: 404 });
}
}
}
interface Env {
COUNTER: DurableObjectNamespace;
}
Pentru a utiliza DO din punctul de intrare Worker, trebuie să-l instanțiați prin legarea spațiului de nume si un act de identitate:
// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Estrae l'ID dalla query string: /counter?id=room-1
const counterId = url.searchParams.get('id') ?? 'global';
// idFromName() crea un ID deterministico da una stringa
// Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
const id = env.COUNTER.idFromName(counterId);
// Ottiene lo stub per comunicare con l'istanza
const stub = env.COUNTER.get(id);
// Invia la richiesta al Durable Object
// La richiesta viene instradata al datacenter corretto automaticamente
return stub.fetch(request);
},
};
interface Env {
COUNTER: DurableObjectNamespace;
}
Configurația wrangler.toml trebuie să declare obligatoriu:
# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]
[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"
[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]
WebSocket cu obiecte durabile: cameră de chat
Cel mai puternic caz de utilizare al obiectelor durabile este gestionarea sesiunii WebSocket cu stare partajată. Fiecare cameră de chat este o instanță separată a DO, care menține lista conexiunilor active și istoricul mesajelor.
Cloudflare acceptă API-ul WebSocket Hibernation: DO vine „hibernat” atunci când nu există mesaje de procesat, reducând dramatic costurile (plătiți doar pentru timpul de procesare, nu pentru conexiunile deschise).
// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation
export class ChatRoomDO implements DurableObject {
private state: DurableObjectState;
private env: Env;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
this.env = env;
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/ws') {
// Verifica che sia una richiesta di upgrade WebSocket
const upgradeHeader = request.headers.get('Upgrade');
if (!upgradeHeader || upgradeHeader !== 'websocket') {
return new Response('Expected WebSocket upgrade', { status: 426 });
}
// Crea la coppia WebSocket server/client
const { 0: clientWs, 1: serverWs } = new WebSocketPair();
// Accetta la connessione tramite la Hibernation API
// Il DO sara ibernato tra i messaggi (no costo di CPU idle)
this.state.acceptWebSocket(serverWs);
// Opzionale: associa metadata alla connessione
// Utile per identificare l'utente nei messaggi successivi
const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
serverWs.serializeAttachment({ userId });
// Invia la storia recente al nuovo utente
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
if (history.length > 0) {
serverWs.send(JSON.stringify({ type: 'history', messages: history }));
}
return new Response(null, {
status: 101,
webSocket: clientWs,
});
}
if (url.pathname === '/messages' && request.method === 'GET') {
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
return Response.json({ messages: history });
}
return new Response('Not Found', { status: 404 });
}
// Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const { userId } = ws.deserializeAttachment() as { userId: string };
let parsed: ClientMessage;
try {
parsed = JSON.parse(message as string);
} catch {
ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
return;
}
if (parsed.type === 'chat') {
const msg: Message = {
id: crypto.randomUUID(),
userId,
text: parsed.text,
timestamp: Date.now(),
};
// Salva nella history (mantieni solo gli ultimi 100 messaggi)
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
const newHistory = [...history, msg].slice(-100);
await this.state.storage.put('history', newHistory);
// Broadcast a tutte le connessioni WebSocket attive nel DO
const allWebSockets = this.state.getWebSockets();
const payload = JSON.stringify({ type: 'message', message: msg });
for (const socket of allWebSockets) {
try {
socket.send(payload);
} catch {
// Connessione chiusa, ignorala
}
}
}
}
// Chiamato quando una connessione WebSocket viene chiusa
async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
const { userId } = ws.deserializeAttachment() as { userId: string };
ws.close(code, reason);
// Notifica gli altri utenti dell'uscita
const notification = JSON.stringify({
type: 'user_left',
userId,
timestamp: Date.now(),
});
for (const socket of this.state.getWebSockets()) {
try {
socket.send(notification);
} catch { /* ignore */ }
}
}
// Chiamato in caso di errore sulla connessione WebSocket
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
console.error('WebSocket error:', error);
ws.close(1011, 'Internal error');
}
}
interface Message {
id: string;
userId: string;
text: string;
timestamp: number;
}
interface ClientMessage {
type: 'chat' | 'ping';
text?: string;
}
interface Env {
CHAT_ROOM: DurableObjectNamespace;
}
Punctul de intrare al lucrătorului direcționează cererile către camera corectă pe baza căii:
// src/worker.ts
export { ChatRoomDO } from './chat-room-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// /room/:roomId/ws -> WebSocket per la stanza
// /room/:roomId/messages -> GET cronologia
const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
if (!match) {
return new Response('Not Found', { status: 404 });
}
const roomId = match[1];
const subpath = match[2] ?? '/ws';
// Ogni stanza e una istanza distinta del DO
const id = env.CHAT_ROOM.idFromName(roomId);
const stub = env.CHAT_ROOM.get(id);
// Rewrite del path per il DO
const doUrl = new URL(request.url);
doUrl.pathname = subpath;
return stub.fetch(new Request(doUrl.toString(), request));
},
};
interface Env {
CHAT_ROOM: DurableObjectNamespace;
}
Stocare tranzacțională: operațiuni atomice
Stocarea Durable Objects acceptă tranzacții atomice prin
state.storage.transaction(). Acest lucru este crucial când
o operație trebuie să citească și să scrie mai multe chei în mod constant:
// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
state: DurableObjectState;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
if (request.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 });
}
const { from, to, amount } = await request.json<TransferRequest>();
try {
// La transazione e atomica: o tutto va a buon fine, o niente
await this.state.storage.transaction(async (txn) => {
const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;
if (fromBalance < amount) {
// Il throw annulla la transazione
throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
}
await txn.put(`balance:${from}`, fromBalance - amount);
await txn.put(`balance:${to}`, toBalance + amount);
// Log dell'operazione
const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
txLog.push({ from, to, amount, timestamp: Date.now() });
await txn.put('tx_log', txLog.slice(-1000));
});
return Response.json({ success: true, from, to, amount });
} catch (err) {
return Response.json(
{ success: false, error: (err as Error).message },
{ status: 400 }
);
}
}
}
interface TransferRequest {
from: string;
to: string;
amount: number;
}
interface TxRecord {
from: string;
to: string;
amount: number;
timestamp: number;
}
interface Env {
ACCOUNT: DurableObjectNamespace;
}
Alarme: Operații programate în obiectul durabil
Suport pentru obiecte durabile Alarme: un apel invers
alarm() care este invocat la o oră programată, chiar dacă eu nu sunt acolo
solicitări active către DO. Acest lucru este util pentru TTL, termene limită și lucrări periodice
legate de statutul DO:
// src/session-do.ts
// DO con alarm per scadenza automatica della sessione
export class SessionDO implements DurableObject {
state: DurableObjectState;
static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/create' && request.method === 'POST') {
const data = await request.json<SessionData>();
// Salva i dati della sessione
await this.state.storage.put('session', {
...data,
createdAt: Date.now(),
});
// Schedula l'alarm per la scadenza della sessione
// Se l'alarm e gia schedulato, viene sostituito
await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);
return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
}
if (url.pathname === '/get') {
const session = await this.state.storage.get<SessionData>('session');
if (!session) {
return Response.json({ error: 'Session not found' }, { status: 404 });
}
// Refresh del TTL ad ogni accesso (sliding expiry)
await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);
return Response.json({ session });
}
if (url.pathname === '/invalidate' && request.method === 'DELETE') {
await this.state.storage.deleteAll();
await this.state.storage.deleteAlarm();
return Response.json({ ok: true });
}
return new Response('Not Found', { status: 404 });
}
// Chiamato automaticamente quando scatta l'alarm
async alarm(): Promise<void> {
// Pulisce i dati della sessione scaduta
const session = await this.state.storage.get<SessionData>('session');
if (session) {
console.log(`Session expired for user: ${session.userId}`);
await this.state.storage.deleteAll();
}
}
}
interface SessionData {
userId: string;
role: string;
metadata?: Record<string, unknown>;
}
interface Env {
SESSION: DurableObjectNamespace;
}
Limitator global de viteză cu obiecte durabile
Un limitator de rată distribuită este unul dintre cele mai solicitate modele pentru API-urile publice. Cu Workers KV, implementarea ar fi supusă condițiilor de cursă. Cu un C, fiecare „bucket” de limitare a ratei este o instanță cu o consistență puternică:
// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects
export class RateLimiterDO implements DurableObject {
state: DurableObjectState;
// Configurazione: 100 req/minuto per IP
static MAX_TOKENS = 100;
static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
const now = Date.now();
// Legge lo stato attuale del bucket
const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
tokens: RateLimiterDO.MAX_TOKENS,
lastRefill: now,
};
// Calcola quanti token sono stati aggiunti dall'ultimo accesso
const elapsed = now - bucket.lastRefill;
const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
const currentTokens = Math.min(
RateLimiterDO.MAX_TOKENS,
bucket.tokens + tokensToAdd
);
if (currentTokens < 1) {
// Rate limit superato
const retryAfterMs = Math.ceil(
(1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
);
await this.state.storage.put('bucket', {
tokens: currentTokens,
lastRefill: now,
});
return Response.json(
{
allowed: false,
retryAfter: Math.ceil(retryAfterMs / 1000),
remaining: 0,
},
{
status: 429,
headers: {
'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
'X-RateLimit-Remaining': '0',
},
}
);
}
// Consuma un token e aggiorna il bucket
await this.state.storage.put('bucket', {
tokens: currentTokens - 1,
lastRefill: now,
});
return Response.json({
allowed: true,
remaining: Math.floor(currentTokens - 1),
});
}
}
interface TokenBucket {
tokens: number;
lastRefill: number;
}
interface Env {
RATE_LIMITER: DurableObjectNamespace;
}
Lucrătorul integrează limitatorul de rată în fluxul fiecărei cereri:
// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Identifica il client (IP o API key)
const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
const apiKey = request.headers.get('X-API-Key');
const bucketId = apiKey ?? `ip:${clientIp}`;
// Controlla il rate limit per questo client
const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));
if (limitCheck.status === 429) {
return limitCheck; // Propaga la risposta 429 con gli headers
}
// Prosegue con la logica dell'API
return handleApiRequest(request, env);
},
};
async function handleApiRequest(request: Request, env: Env): Promise<Response> {
return Response.json({ data: 'your api response here' });
}
interface Env {
RATE_LIMITER: DurableObjectNamespace;
}
Considerații de performanță și cost
Obiectele durabile au un profil de cost și performanță foarte diferit la Muncitori simpli. Câteva puncte cheie de reținut:
Latență: nu întotdeauna locală
Fiecare instanță DO rezidă într-un singur centru de date. Dacă un utilizator în Europa
accesează un DO instanțiat în America de Nord, latența fiecărei operațiuni
include călătoria transatlantică dus-întors (~100-150ms). Proiectați ID-uri DO
Pentru a limita partajarea geografică: utilizați ID-uri bazate pe regiune ori de câte ori este posibil
(idFromName(`${region}:${resourceId}`)), sau acceptați latența
ridicat numai pentru operațiunile care necesită o consecvență transregională puternică.
| Voce | Niveluri gratuite | Plătit (lucrători plătiți) |
|---|---|---|
| Cereri către DO | 1M/luna inclus | 0,15 USD per milion de mai sus |
| Durată de viață DO (CPU) | 400.000 GB-s/lună | 12,50 USD per milion GB-s |
| Depozitare | 1 GB inclus | 0,20 USD/GB-lună dincolo |
| Hibernare WebSocket | Disponibil | Disponibil (fără cost inactiv) |
| Alarme | Disponibil | Disponibil |
Cele mai bune practici
-
Granularitatea ID: utilizați ID-uri specifice (de ex.
chat:room-42) mai degrabă decât ID-uri globale pentru a evita hotspot-urile pe instanțe individuale. -
Hibernare WebSocket întotdeauna: STATELE UNITE ALE AMERICII
state.acceptWebSocket()în locul ascultătorilor manuali de evenimente pentru a reduce costurile de conectare inactivă. -
Stocare în lot: STATELE UNITE ALE AMERICII
storage.put(map)a scrie taste multiple într-o singură operație în loc de mai multeput()singuri. - Limită dimensiunea de stocare: fiecare instanță are o limită de 128 MB de depozitare. Pentru date mari utilizați R2 și salvați doar referințele în DO.
- Bugetul de serializare: cererile sunt puse în coadă și procesate secvenţial; operațiunile lente blochează cele ulterioare. Păstrați manipulatorii rapid (<1s ideal).
Concluzii și pașii următori
Obiectele durabile creează decalajul dintre modelul apatrid al muncitorilor și cel aplicații moderne care necesită coordonare și stare partajată. ale lor combinate cu WebSocket Hibernation API și Alarms le face un primitiv complet pentru chat, jocuri, colaborare în documente și sisteme de coordonare globală.
Compensația este latența: un DO este fizic într-un singur centru de date, astfel că operațiunile de scriere între regiuni adaugă latență. Pentru lecturi scalabil luați în considerare combinarea DO (pentru scrieri) cu KV (pentru citiri în cache).
Următoarele articole din serie
- Articolul 5: Workers AI — Inferența LLM și modele de viziune on the Edge: Cum să rulați modele Llama, Whisper și viziune direct în Workers fără GPU dedicat, cu IA pentru lucrători în creștere cu 4000% pe an.
- Articolul 6: Vercel Edge Runtime — Advanced Middleware, Geolocalizare și testare A/B: abordarea lui Vercel la margine cu Next.js.
- Articolul 7: Rutarea geografică la margine — Personalizare Conținut și conformitate GDPR: geo-fencing, prețuri localizate și GDPR.







