Trwałe obiekty: bardzo spójny stan i WebSockets na krawędzi
Trwałe obiekty są najpotężniejszym prymitywem w ekosystemie Cloudflare: umożliwiają silnie spójny stan, WebSocket z zarządzaniem sesjami i rozproszona koordynacja bez scentralizowanych serwerów, a wszystko to na całym świecie.
Problem stanu w przetwarzaniu brzegowym
Poprzednie artykuły z tej serii pokazały, jak wyróżnia się Cloudflare Workers w przypadku obciążeń bezstanowych: każde żądanie jest obsługiwane oddzielnie, bez pamięci współdzielonej pomiędzy wywołaniami. Ta funkcja jest mocną stroną skalowalności poziomo, ale staje się przeszkodą, gdy tylko zastosowanie wymaga koordynacji.
Rozważ następujące typowe scenariusze: Pokój rozmów, w którym muszą znajdować się wiadomości zamówione i dostarczone do wszystkich uczestników; wspólny dokument, w którym więcej użytkownicy edytują jednocześnie; ogranicznik szybkości dla interfejsów API, które muszą liczyć żądania w sposób globalnie spójny. We wszystkich tych przypadkach bezpaństwowy model Robotników to nie wystarczy: potrzebujesz jednego były bardzo spójne i pojemność koordynować konkurencyjne żądania w jednym miejscu.
To jest właśnie problem, o którym mówię Trwałe obiekty rozwiązują.
Czego się nauczysz
- Co to jest przedmiot trwały i czym różni się od obiektu Workers KV
- Silny model spójności: jeden autor, globalna koordynacja
- Jak zaimplementować WebSocket z zarządzaniem sesjami przy użyciu trwałych obiektów
- Pamięć transakcyjna: transakcje odczytu, zapisu i atomowe
- Wzorce dla pokojów rozmów, ograniczania szybkości i współpracy nad dokumentami
- Alarmy: zaplanowane operacje w ramach Obiektu Trwałego
- Limity, ceny i kiedy wybrać DO vs KV vs D1
Co to jest przedmiot trwały
Trwały obiekt (DO) to klasa JavaScript/TypeScript tworzona przez Cloudflare w jednej lokalizacji geograficznej dla danego identyfikatora. W przeciwieństwie do Workers KV (możliwa spójność na ponad 300 punktach PoP), DO ma następujące gwarancje:
- Spójność pojedynczego autora: tylko jedno wystąpienie DO istnieje na całym świecie dla danego identyfikatora
- Serializacja żądań: Wywołania DO są wykonywane sekwencyjnie, nigdy jednocześnie
- Trwałe przechowywanie: prywatny transakcyjny magazyn klucz-wartość dla każdej instancji
- Hibernacja WebSocket: Połączenia WebSocket wytrzymują okresy bezczynności bez ponoszenia kosztów
Lokalizacja DO jest określana automatycznie po pierwszej aktywacji i pozostaje stała. Cloudflare wybiera centrum danych najbliżej pierwszego żądania. Wszystkie kolejne żądania, w dowolnym miejscu na świecie, są kierowane do tego jednego instancję za pośrednictwem routingu anycast Cloudflare.
| Prymitywny | Konsystencja | Konkurs | Przeczytaj opóźnienie | Użyj przypadku |
|---|---|---|---|---|
| Pracownicy KV | Ewentualne (minuty) | Wielu pisarzy | ~1 ms (w pamięci podręcznej) | Konfiguracja, zasoby, sesje wymagające dużej liczby odczytów |
| Trwałe obiekty | Silny (linearyzowalny) | Pojedynczy pisarz | ~50-150ms (od zdalnej krawędzi) | Czat, ogranicznik prędkości, stan gry, CRDT |
| SQLite D1 | Silny (główny) | Wielu czytelników, jeden autor | ~5-20ms (z pobliskiego PoP) | Zapytania relacyjne, raporty, OLTP |
| Pamięć obiektowa R2 | Silny (według obiektu) | Wielu autorów (wykrywanie konfliktów) | ~50-200ms | Pliki, obrazy, kopie zapasowe |
Struktura trwałego przedmiotu
Trwały obiekt to klasa z określonymi metodami środowiska wykonawczego Cloudflare
rozpoznaje. Najważniejsze jest fetch(), wezwał każdego
Żądanie HTTP przekazane do DO. Zobaczmy podstawową strukturę:
// src/counter-do.ts
// Un Durable Object semplice: un contatore con persistenza
export class CounterDO implements DurableObject {
private state: DurableObjectState;
private env: Env;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
this.env = env;
// state.storage e il key-value store privato di questa istanza
// Persiste attraverso i riavvii del DO
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
switch (url.pathname) {
case '/increment': {
// Legge il valore corrente (undefined se non esiste)
const current = (await this.state.storage.get<number>('count')) ?? 0;
const next = current + 1;
// Scrittura atomica: garantita durabile prima del return
await this.state.storage.put('count', next);
return Response.json({ count: next });
}
case '/decrement': {
const current = (await this.state.storage.get<number>('count')) ?? 0;
const next = Math.max(0, current - 1);
await this.state.storage.put('count', next);
return Response.json({ count: next });
}
case '/value': {
const count = (await this.state.storage.get<number>('count')) ?? 0;
return Response.json({ count });
}
case '/reset': {
await this.state.storage.put('count', 0);
return Response.json({ count: 0, reset: true });
}
default:
return new Response('Not Found', { status: 404 });
}
}
}
interface Env {
COUNTER: DurableObjectNamespace;
}
Aby użyć DO z punktu wejścia Worker, należy utworzyć jego instancję poprzez powiązanie przestrzeni nazw i identyfikator:
// src/worker.ts - entry point del Worker
export { CounterDO } from './counter-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Estrae l'ID dalla query string: /counter?id=room-1
const counterId = url.searchParams.get('id') ?? 'global';
// idFromName() crea un ID deterministico da una stringa
// Lo stesso nome produce sempre lo stesso ID (e la stessa istanza)
const id = env.COUNTER.idFromName(counterId);
// Ottiene lo stub per comunicare con l'istanza
const stub = env.COUNTER.get(id);
// Invia la richiesta al Durable Object
// La richiesta viene instradata al datacenter corretto automaticamente
return stub.fetch(request);
},
};
interface Env {
COUNTER: DurableObjectNamespace;
}
Konfiguracja wrangler.toml musi zadeklarować wiązanie:
# wrangler.toml
name = "counter-worker"
main = "src/worker.ts"
compatibility_date = "2024-09-23"
compatibility_flags = ["nodejs_compat"]
[[durable_objects.bindings]]
name = "COUNTER"
class_name = "CounterDO"
[[migrations]]
tag = "v1"
new_classes = ["CounterDO"]
WebSocket z trwałymi obiektami: pokój rozmów
Najpotężniejszym przypadkiem użycia obiektów trwałych jest zarządzanie sesjami Udostępniony stanowy protokół WebSocket. Każdy pokój rozmów jest oddzielną instancją DO, który przechowuje listę aktywnych połączeń i historię wiadomości.
Cloudflare obsługuje Interfejs API hibernacji protokołu WebSocket: nadchodzi DO „hibernowany”, gdy nie ma żadnych komunikatów do przetworzenia, co radykalnie zmniejsza koszty (płacisz tylko za czas przetwarzania, a nie za otwarte połączenia).
// src/chat-room-do.ts
// Durable Object per una stanza di chat con WebSocket Hibernation
export class ChatRoomDO implements DurableObject {
private state: DurableObjectState;
private env: Env;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
this.env = env;
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/ws') {
// Verifica che sia una richiesta di upgrade WebSocket
const upgradeHeader = request.headers.get('Upgrade');
if (!upgradeHeader || upgradeHeader !== 'websocket') {
return new Response('Expected WebSocket upgrade', { status: 426 });
}
// Crea la coppia WebSocket server/client
const { 0: clientWs, 1: serverWs } = new WebSocketPair();
// Accetta la connessione tramite la Hibernation API
// Il DO sara ibernato tra i messaggi (no costo di CPU idle)
this.state.acceptWebSocket(serverWs);
// Opzionale: associa metadata alla connessione
// Utile per identificare l'utente nei messaggi successivi
const userId = url.searchParams.get('userId') ?? `anon-${Date.now()}`;
serverWs.serializeAttachment({ userId });
// Invia la storia recente al nuovo utente
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
if (history.length > 0) {
serverWs.send(JSON.stringify({ type: 'history', messages: history }));
}
return new Response(null, {
status: 101,
webSocket: clientWs,
});
}
if (url.pathname === '/messages' && request.method === 'GET') {
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
return Response.json({ messages: history });
}
return new Response('Not Found', { status: 404 });
}
// Chiamato dalla Hibernation API quando arriva un messaggio WebSocket
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
const { userId } = ws.deserializeAttachment() as { userId: string };
let parsed: ClientMessage;
try {
parsed = JSON.parse(message as string);
} catch {
ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
return;
}
if (parsed.type === 'chat') {
const msg: Message = {
id: crypto.randomUUID(),
userId,
text: parsed.text,
timestamp: Date.now(),
};
// Salva nella history (mantieni solo gli ultimi 100 messaggi)
const history = (await this.state.storage.get<Message[]>('history')) ?? [];
const newHistory = [...history, msg].slice(-100);
await this.state.storage.put('history', newHistory);
// Broadcast a tutte le connessioni WebSocket attive nel DO
const allWebSockets = this.state.getWebSockets();
const payload = JSON.stringify({ type: 'message', message: msg });
for (const socket of allWebSockets) {
try {
socket.send(payload);
} catch {
// Connessione chiusa, ignorala
}
}
}
}
// Chiamato quando una connessione WebSocket viene chiusa
async webSocketClose(ws: WebSocket, code: number, reason: string): Promise<void> {
const { userId } = ws.deserializeAttachment() as { userId: string };
ws.close(code, reason);
// Notifica gli altri utenti dell'uscita
const notification = JSON.stringify({
type: 'user_left',
userId,
timestamp: Date.now(),
});
for (const socket of this.state.getWebSockets()) {
try {
socket.send(notification);
} catch { /* ignore */ }
}
}
// Chiamato in caso di errore sulla connessione WebSocket
async webSocketError(ws: WebSocket, error: unknown): Promise<void> {
console.error('WebSocket error:', error);
ws.close(1011, 'Internal error');
}
}
interface Message {
id: string;
userId: string;
text: string;
timestamp: number;
}
interface ClientMessage {
type: 'chat' | 'ping';
text?: string;
}
interface Env {
CHAT_ROOM: DurableObjectNamespace;
}
Punkt wejścia pracownika kieruje żądania do odpowiedniego pokoju na podstawie ścieżki:
// src/worker.ts
export { ChatRoomDO } from './chat-room-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// /room/:roomId/ws -> WebSocket per la stanza
// /room/:roomId/messages -> GET cronologia
const match = url.pathname.match(/^\/room\/([^/]+)(\/.*)?$/);
if (!match) {
return new Response('Not Found', { status: 404 });
}
const roomId = match[1];
const subpath = match[2] ?? '/ws';
// Ogni stanza e una istanza distinta del DO
const id = env.CHAT_ROOM.idFromName(roomId);
const stub = env.CHAT_ROOM.get(id);
// Rewrite del path per il DO
const doUrl = new URL(request.url);
doUrl.pathname = subpath;
return stub.fetch(new Request(doUrl.toString(), request));
},
};
interface Env {
CHAT_ROOM: DurableObjectNamespace;
}
Przechowywanie transakcyjne: operacje atomowe
Pamięć Durable Objects obsługuje transakcje atomowe za pośrednictwem
state.storage.transaction(). Ma to kluczowe znaczenie, kiedy
operacja musi konsekwentnie czytać i zapisywać wiele kluczy:
// Esempio: trasferimento di crediti tra utenti (atomico)
export class AccountDO implements DurableObject {
state: DurableObjectState;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
if (request.method !== 'POST') {
return new Response('Method Not Allowed', { status: 405 });
}
const { from, to, amount } = await request.json<TransferRequest>();
try {
// La transazione e atomica: o tutto va a buon fine, o niente
await this.state.storage.transaction(async (txn) => {
const fromBalance = (await txn.get<number>(`balance:${from}`)) ?? 0;
const toBalance = (await txn.get<number>(`balance:${to}`)) ?? 0;
if (fromBalance < amount) {
// Il throw annulla la transazione
throw new Error(`Insufficient balance: ${fromBalance} < ${amount}`);
}
await txn.put(`balance:${from}`, fromBalance - amount);
await txn.put(`balance:${to}`, toBalance + amount);
// Log dell'operazione
const txLog = (await txn.get<TxRecord[]>('tx_log')) ?? [];
txLog.push({ from, to, amount, timestamp: Date.now() });
await txn.put('tx_log', txLog.slice(-1000));
});
return Response.json({ success: true, from, to, amount });
} catch (err) {
return Response.json(
{ success: false, error: (err as Error).message },
{ status: 400 }
);
}
}
}
interface TransferRequest {
from: string;
to: string;
amount: number;
}
interface TxRecord {
from: string;
to: string;
amount: number;
timestamp: number;
}
interface Env {
ACCOUNT: DurableObjectNamespace;
}
Alarmy: Zaplanowane operacje w obiekcie trwałym
Wsparcie dla trwałych obiektów Alarmy: oddzwonienie
alarm() który jest wywoływany w zaplanowanym czasie, nawet jeśli mnie tam nie ma
aktywne żądania do DO. Jest to przydatne w przypadku TTL, terminów i zadań okresowych
powiązane ze statusem DO:
// src/session-do.ts
// DO con alarm per scadenza automatica della sessione
export class SessionDO implements DurableObject {
state: DurableObjectState;
static SESSION_TTL_MS = 30 * 60 * 1000; // 30 minuti
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/create' && request.method === 'POST') {
const data = await request.json<SessionData>();
// Salva i dati della sessione
await this.state.storage.put('session', {
...data,
createdAt: Date.now(),
});
// Schedula l'alarm per la scadenza della sessione
// Se l'alarm e gia schedulato, viene sostituito
await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);
return Response.json({ ok: true, expiresIn: SessionDO.SESSION_TTL_MS });
}
if (url.pathname === '/get') {
const session = await this.state.storage.get<SessionData>('session');
if (!session) {
return Response.json({ error: 'Session not found' }, { status: 404 });
}
// Refresh del TTL ad ogni accesso (sliding expiry)
await this.state.storage.setAlarm(Date.now() + SessionDO.SESSION_TTL_MS);
return Response.json({ session });
}
if (url.pathname === '/invalidate' && request.method === 'DELETE') {
await this.state.storage.deleteAll();
await this.state.storage.deleteAlarm();
return Response.json({ ok: true });
}
return new Response('Not Found', { status: 404 });
}
// Chiamato automaticamente quando scatta l'alarm
async alarm(): Promise<void> {
// Pulisce i dati della sessione scaduta
const session = await this.state.storage.get<SessionData>('session');
if (session) {
console.log(`Session expired for user: ${session.userId}`);
await this.state.storage.deleteAll();
}
}
}
interface SessionData {
userId: string;
role: string;
metadata?: Record<string, unknown>;
}
interface Env {
SESSION: DurableObjectNamespace;
}
Globalny ogranicznik szybkości z trwałymi obiektami
Rozproszony ogranicznik szybkości jest jednym z najbardziej pożądanych wzorców publicznych interfejsów API. W przypadku Workers KV wdrożenie byłoby uzależnione od warunków wyścigowych. Z literą C, każdy „wiadro” ograniczające szybkość jest instancją o silnej spójności:
// src/rate-limiter-do.ts
// Token bucket rate limiter con Durable Objects
export class RateLimiterDO implements DurableObject {
state: DurableObjectState;
// Configurazione: 100 req/minuto per IP
static MAX_TOKENS = 100;
static REFILL_RATE_MS = 60_000; // 1 minuto per refill completo
constructor(state: DurableObjectState, env: Env) {
this.state = state;
}
async fetch(request: Request): Promise<Response> {
const now = Date.now();
// Legge lo stato attuale del bucket
const bucket = (await this.state.storage.get<TokenBucket>('bucket')) ?? {
tokens: RateLimiterDO.MAX_TOKENS,
lastRefill: now,
};
// Calcola quanti token sono stati aggiunti dall'ultimo accesso
const elapsed = now - bucket.lastRefill;
const tokensToAdd = (elapsed / RateLimiterDO.REFILL_RATE_MS) * RateLimiterDO.MAX_TOKENS;
const currentTokens = Math.min(
RateLimiterDO.MAX_TOKENS,
bucket.tokens + tokensToAdd
);
if (currentTokens < 1) {
// Rate limit superato
const retryAfterMs = Math.ceil(
(1 - currentTokens) / (RateLimiterDO.MAX_TOKENS / RateLimiterDO.REFILL_RATE_MS)
);
await this.state.storage.put('bucket', {
tokens: currentTokens,
lastRefill: now,
});
return Response.json(
{
allowed: false,
retryAfter: Math.ceil(retryAfterMs / 1000),
remaining: 0,
},
{
status: 429,
headers: {
'Retry-After': String(Math.ceil(retryAfterMs / 1000)),
'X-RateLimit-Limit': String(RateLimiterDO.MAX_TOKENS),
'X-RateLimit-Remaining': '0',
},
}
);
}
// Consuma un token e aggiorna il bucket
await this.state.storage.put('bucket', {
tokens: currentTokens - 1,
lastRefill: now,
});
return Response.json({
allowed: true,
remaining: Math.floor(currentTokens - 1),
});
}
}
interface TokenBucket {
tokens: number;
lastRefill: number;
}
interface Env {
RATE_LIMITER: DurableObjectNamespace;
}
Worker integruje ogranicznik szybkości z przepływem każdego żądania:
// src/worker.ts con rate limiting
export { RateLimiterDO } from './rate-limiter-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Identifica il client (IP o API key)
const clientIp = request.headers.get('CF-Connecting-IP') ?? 'unknown';
const apiKey = request.headers.get('X-API-Key');
const bucketId = apiKey ?? `ip:${clientIp}`;
// Controlla il rate limit per questo client
const rateLimiterId = env.RATE_LIMITER.idFromName(bucketId);
const rateLimiter = env.RATE_LIMITER.get(rateLimiterId);
const limitCheck = await rateLimiter.fetch(new Request('https://dummy/check'));
if (limitCheck.status === 429) {
return limitCheck; // Propaga la risposta 429 con gli headers
}
// Prosegue con la logica dell'API
return handleApiRequest(request, env);
},
};
async function handleApiRequest(request: Request, env: Env): Promise<Response> {
return Response.json({ data: 'your api response here' });
}
interface Env {
RATE_LIMITER: DurableObjectNamespace;
}
Względy wydajności i kosztów
Obiekty trwałe mają bardzo różny profil kosztów i wydajności do prostych Robotników. Kilka kluczowych punktów, o których warto pamiętać:
Opóźnienie: nie zawsze lokalne
Każda instancja DO znajduje się w jednym centrum danych. Jeśli użytkownik w Europie
uzyskuje dostęp do instancji DO utworzonej w Ameryce Północnej, opóźnienie każdej operacji
obejmuje podróż transatlantycką w obie strony (~100-150 ms). Zaprojektuj identyfikatory DO
Aby ograniczyć udostępnianie geograficzne: Jeśli to możliwe, używaj identyfikatorów opartych na regionie
(idFromName(`${region}:${resourceId}`)) lub zaakceptuj opóźnienie
wysoki jedynie w przypadku operacji wymagających silnej spójności międzyregionalnej.
| Głos | Darmowe poziomy | Płatni (płatni pracownicy) |
|---|---|---|
| Prośby do DO | 1M/miesiąc wliczony w cenę | 0,15 dolara za milion powyżej |
| DO Żywotność (procesor) | 400 000 GB-s/miesiąc | 12,50 dolarów za milion GB-s |
| Składowanie | 1 GB w zestawie | Powyżej 0,20 USD/GB miesięcznie |
| Hibernacja protokołu WebSocket | Dostępny | Dostępne (bez kosztów bezczynności) |
| Alarmy | Dostępny | Dostępny |
Najlepsze praktyki
-
Szczegółowość identyfikatora: użyj określonych identyfikatorów (np.
chat:room-42) zamiast identyfikatorów globalnych, aby uniknąć hotspotów w poszczególnych instancjach. -
Hibernacja protokołu WebSocket zawsze: USA
state.acceptWebSocket()zamiast ręcznych detektorów zdarzeń, aby zmniejszyć koszty bezczynności połączeń. -
Przechowywanie partii: USA
storage.put(map)pisać wiele kluczy w jednej operacji zamiast wieluput()syngiel. - Ogranicz rozmiar pamięci: każda instancja ma limit 128 MB przechowywania. W przypadku dużych danych użyj R2 i zapisuj tylko odniesienia w DO.
- Budżet serializacji: żądania są umieszczane w kolejce i przetwarzane sekwencyjnie; powolne operacje blokują kolejne. Zachowaj opiekunów szybko (idealnie <1 s).
Wnioski i dalsze kroki
Trwałe Obiekty wypełniają lukę pomiędzy bezpaństwowym modelem Robotników i nowoczesne aplikacje wymagające koordynacji i współdzielenia stanu. Ich w połączeniu z interfejsem API hibernacji WebSocket i alarmami czyni je prymitywnymi kompletny do czatów, gier, współpracy nad dokumentami i globalnych systemów koordynacji.
Kompromisem jest opóźnienie: DO fizycznie znajduje się tylko w jednym centrum danych, dlatego operacje zapisu między regionami zwiększają opóźnienie. Do odczytów skalowalne rozważ połączenie DO (dla zapisu) z KV (dla odczytów z pamięci podręcznej).
Następne artykuły z serii
- Artykuł 5: Workers AI — wnioskowanie z modeli LLM i wizji on the Edge: Jak uruchomić modele Llama, Whisper i Vision bezpośrednio w Workersach bez dedykowanego procesora graficznego, a sztuczna inteligencja Workers rośnie o 4000% rok do roku.
- Artykuł 6: Vercel Edge Runtime — zaawansowane oprogramowanie pośredniczące, Geolokalizacja i testy A/B: podejście Vercel do krawędzi z Next.js.
- Artykuł 7: Trasowanie geograficzne na krawędzi — personalizacja Treść i zgodność z RODO: geo-ogrodzenie, zlokalizowane ceny i RODO.







