Źródło zdarzeń: stan jako niezmienna sekwencja zdarzeń
Zastanów się, jak działa bank: saldo Twojego konta nie jest „liczbą w bazie danych”. aktualizowana przy każdej transakcji”. I wynik wszystkich transakcji, które kiedykolwiek miały miejsce: wpłaty, wypłaty, przelewy, odsetki. Jeśli chcesz poznać saldo wczoraj o 14:00, możesz go zrekonstruować, odtwarzając wszystkie transakcje do tego momentu. To jest dokładnie zasadaPozyskiwanie zdarzeń.
Zamiast zapisywać aktualny stan jednostki w bazie danych i każdorazowo go nadpisywać edytować, Pozyskiwanie zdarzeń zapisać sekwencję wydarzeń, która do tego doprowadziła do tego stanu. Stan obecny osiąga się poprzez „odgrywanie” zdarzeń od początku. Rezultatem jest system z pełną ścieżką audytu, możliwością zapytań o podróże w czasie, oraz naturalne oddzielenie pisania (dołączanie wydarzeń) od czytania (projekcje na zdarzenia).
Czego się nauczysz
- Architektura pozyskiwania zdarzeń: magazyn zdarzeń, agregat, strumień zdarzeń
- Zaimplementuj agregat ze zdarzeniami w TypeScript
- Event Store: jak ustrukturyzować bazę danych zdarzeń
- Powtórka wydarzenia: Odbuduj stan od podstaw na podstawie wydarzeń
- Zapytania dotyczące podróży w czasie: stan agregatu w dowolnym momencie historycznym
- EventStoreDB: baza danych przeznaczona do pozyskiwania zdarzeń
- Kompromis w zakresie pozyskiwania zdarzeń: kiedy go zastosować, a kiedy go unikać
Model klasyczny a pozyskiwanie zdarzeń
Porównajmy dwa podejścia do systemu zarządzania zamówieniami:
| Czekam | CRUD Klasyczny | Pozyskiwanie zdarzeń |
|---|---|---|
| Co jest zapisane | Aktualny stan (rekord AKTUALIZUJ) | Sekwencja zdarzeń (tylko WSTAW) |
| Ścieżki audytu | Nie (lub z oddzielnymi tabelami) | Tak, natywnie i kompletnie |
| Stan przeszły | Nie (widzisz tylko bieżący status) | Tak, powtórka wydarzeń aż do T |
| Złożoność | Niski | Średnio-wysoki |
| Odczyt wydajności | Wysoka (zapytanie bezpośrednie) | Wymaga badań przesiewowych (patrz CQRS) |
| Zapis wydajności | Wysoki | Wysoki (tylko do dołączenia) |
| Debugowanie | Trudny (stan końcowy bez historii) | Łatwy (powtórka wydarzenia) |
Podstawowe pojęcia
Agregat: jednostka spójności
Un Agregat oraz granica spójności w Event Sourcing. Wszystkie zmiany w agregacie zachodzą za pomocą metod generujących zdarzenia. Państwo jest zawsze zrekonstruowany poprzez zastosowanie sekwencji zdarzeń:
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
Sklep wydarzeń: baza danych wydarzeń
Un Sklep z wydarzeniami oraz baza danych przeznaczona wyłącznie do dołączania, zoptymalizowana pod kątem zapisu i czytaj ciągi zdarzeń. Podstawowa struktura iStrumień wydarzenia: uporządkowana sekwencja zdarzeń dla pojedynczego agregatu (identyfikowana poprzez jego identyfikator).
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
Repozytorium pozyskiwania zdarzeń
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
Zapytania dotyczące podróży w czasie
Jedną z najpotężniejszych zalet Event Sourcingu jest możliwość rekonstrukcji stanu agregatu w dowolnym momencie historycznym:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: Natywna baza danych do pozyskiwania zdarzeń
EventStoreDB oraz bazę danych zaprojektowaną specjalnie dla Event Sourcing. Zapewnia strumienie zdarzeń jako natywne prymitywy, subskrypcje powiadomień w czasie rzeczywistym, i projekcje po stronie serwera:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
Migawka: Optymalizacja odtwarzania dużych agregatów
Jeśli agregat zawiera tysiące zdarzeń, odtwarzanie staje się powolne. The Migawki i punkt kontrolny: zamiast zaczynać od początku, rekonstruuje od ostatniej migawki:
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
Kompromis w zakresie pozyskiwania zdarzeń
Kiedy pozyskiwanie zdarzeń ma sens
- Obowiązkowa ścieżka audytu: Systemy finansowe, systemy opieki zdrowotnej, systemy prawne, w których wymagana jest pełna historia
- Złożone debugowanie: Kiedy chcesz móc „przewinąć taśmę” i zrozumieć, jak doszedłeś do problematycznego stanu
- Tymczasowa analiza biznesowa: „Ile zamówień miało wczoraj status wersji roboczej o 16:00?”
- Integracja sterowana zdarzeniami: Jeśli masz już architekturę EDA, Event Sourcing integruje się w sposób naturalny
Kiedy pozyskiwanie zdarzeń nie ma sensu
- Prosty CRUD: Główny system zarządzania dokumentacją bez wymagań audytowych nie korzysta z ES
- Złożone zapytania: ES optymalizuje zapisy; odczyty wymagają CQRS z projekcją (dodatkowa złożoność)
- Zespół bez doświadczenia w ES: Krzywa uczenia się jest znacząca; źle zarządzana złożoność może przeważyć nad korzyściami
- Sztywny schemat: Jeśli schemat zdarzeń często się zmienia, zarządzanie wersjami staje się skomplikowane
Wnioski i dalsze kroki
Event Sourcing przekształca trwałość z aktualizacji stanu w dołączanie zdarzenia niezmienne. Rezultatem jest system z natywną ścieżką audytu, zapytaniami dotyczącymi podróży w czasie i a naturalna separacja pomiędzy pisaniem i czytaniem. Cena i złożoność: powtórka zdarzenia, zarządzanie wersjami schematu i wymaganie CQRS dla zapytań złożone.
Następny artykuł – CQRS – dotyczy dokładnie tego wyzwania: jak zbudować warstwę zoptymalizowany model odczytu (model odczytu), który synchronizuje się z magazynem zdarzeń poprzez projekcje, umożliwiając szybkie zapytania bez dotykania strumienia zdarzeń.
Nadchodzące artykuły z serii Architektura sterowana zdarzeniami
Powiązane serie
- Podstawy EDA — kontekst, w którym wykorzystywane jest Event Sourcing
- Apacz Kafka — Alternatywny rozproszony magazyn wydarzeń dla strumieni o dużej objętości







