Sourcing událostí: Stav jako neměnná sekvence událostí
Zvažte, jak funguje banka: Zůstatek na vašem účtu není „číslo v databázi aktualizováno s každou transakcí“. A výsledek všech transakcí, které kdy proběhly: vklady, výběry, převody, úroky. Pokud chcete znát zůstatek včera ve 14 hodin, můžete jej rekonstruovat přehráním všech transakcí až do tohoto bodu. Tohle je přesně ten principSourcing událostí.
Namísto uložení aktuálního stavu entity do databáze a jejího pokaždé přepsání upravit, Sourcing událostí uložit sled událostí, které k tomu vedly do toho stavu. Současného stavu je dosaženo „přehráváním“ událostí od začátku. Výsledkem je systém s kompletním audit trailem, možností časových dotazů, a přirozené oddělení mezi psaním (připojování událostí) a čtením (projekce na události).
Co se naučíte
- Architektura zdroje událostí: Event Store, Aggregate, Event Stream
- Implementujte agregát s událostmi v TypeScriptu
- Event Store: jak strukturovat databázi událostí
- Přehrání události: Obnovte stav od začátku z událostí
- Dotazy na cestování v čase: stav agregátu v jakémkoli historickém okamžiku
- EventStoreDB: databáze určená pro Event Sourcing
- Kompromis se získáváním událostí: kdy jej přijmout a kdy se mu vyhnout
Klasický model vs. Event Sourcing
Porovnejme dva přístupy k systému řízení objednávek:
| čekám | CRUD Classic | Sourcing událostí |
|---|---|---|
| Co je zachráněno | Aktuální stav (UPDATE záznam) | Posloupnost událostí (pouze INSERT) |
| Auditní stopy | Ne (nebo se samostatnými tabulkami) | Ano, nativní a kompletní |
| Minulý stav | Ne (vidíte pouze aktuální stav) | Ano, opakování událostí až do T |
| Složitost | Nízký | Středně vysoká |
| Čtení výkonu | Vysoká (přímý dotaz) | Vyžaduje screening (viz CQRS) |
| Zápis o výkonu | Vysoký | Vysoká (pouze připojení) |
| Ladění | Těžký (koncový stav bez příběhu) | Snadné (přehrání události) |
Základní pojmy
Agregát: Jednotka konzistence
Un Agregát a hranice konzistence v Event Sourcingu. Všechny ke změnám v agregátu dochází prostřednictvím metod, které generují události. Stát je vždy rekonstruován aplikací událostí v pořadí:
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
Event Store: Databáze událostí
Un Prodejna akcí a databázi pouze pro připojení optimalizovanou pro zápis a číst sledy událostí. Základní struktura aStream události: uspořádaná sekvence událostí pro jeden agregát (identifikovaný jeho ID).
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
Úložiště zdrojů událostí
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
Dotazy na cestování v čase
Jednou z nejsilnějších výhod Event Sourcingu je schopnost rekonstruovat stav souhrnu v jakémkoli historickém okamžiku:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: Nativní databáze pro zajišťování zdrojů událostí
EventStoreDB a databáze navržená speciálně pro Event Sourcing. Poskytuje streamy událostí jako nativní primitiva, předplatné pro oznámení v reálném čase, a projekce na straně serveru:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
Snímek: Optimalizace přehrávání velkých agregátů
Pokud má agregát tisíce událostí, přehrávání se zpomalí. The Snímky a kontrolní bod: místo toho, aby začal od začátku, obnoví se od posledního snímku:
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
Kompromis se sourcingem událostí
Když má event sourcing smysl
- Povinná auditní stopa: Finanční systémy, zdravotnické systémy, právní systémy, kde je požadavkem úplný příběh
- Komplexní ladění: Když chcete umět "přetočit pásku" a pochopit, jak jste se dostali do problematického stavu
- Temporal business intelligence: "Kolik objednávek bylo včera v 16:00 ve stavu Koncept?"
- Integrace řízená událostmi: Pokud již máte architekturu EDA, Event Sourcing se přirozeně integruje
Když Event Sourcing NEMÁ SMYSL
- Jednoduché CRUD: Systém správy kmenových záznamů bez požadavků na audit nevyužívá ES
- Složité dotazy: ES optimalizuje zápisy; čtení vyžaduje CQRS s projekcí (další složitost)
- Tým bez zkušeností s ES: Křivka učení je významná; špatně spravovaná složitost může převážit výhody
- Pevné schéma: Pokud se vaše schéma událostí často mění, správa verzí se stává složitou
Závěry a další kroky
Event Sourcing transformuje persistenci z aktualizace stavu na připojení události neměnný. Výsledkem je systém s nativním audit trailem, dotazy na cestování v čase a přirozené oddělení psaní a čtení. Cena a složitost: opakování události, správa verzí schémat a vyžadování CQRS pro dotazy komplexní.
Další článek — CQRS — se zabývá přesně touto výzvou: jak vytvořit vrstvu optimalizovaný model čtení (model čtení), který se synchronizuje s úložištěm událostí prostřednictvím projekcí, umožňující rychlé dotazy, aniž byste se dotkli streamu událostí.
Připravované články ze série Event-Driven Architecture Series
Související série
- Základy EDA — kontext, ve kterém se používá Event Sourcing
- Apache Kafka — Alternativní distribuované úložiště událostí pro vysokoobjemové streamy







