Event Sourcing: Staat als een onveranderlijke opeenvolging van gebeurtenissen
Kijk eens hoe een bank werkt: uw rekeningsaldo is geen getal in een database bijgewerkt bij elke transactie". En het resultaat van alle transacties die ooit hebben plaatsgevonden: stortingen, opnames, overdrachten, rente. Als je het saldo van gisteren om 14.00 uur wilt weten, je kunt het reconstrueren door alle transacties tot dat moment opnieuw af te spelen. Dit is precies het principe vanInkoop van evenementen.
In plaats van de huidige status van een entiteit in de database op te slaan en deze elke keer te overschrijven bewerken, Inkoop van evenementen bewaar de reeks gebeurtenissen die eraan voorafgingen naar die staat. De huidige status wordt bereikt door gebeurtenissen vanaf het begin af te spelen. Het resultaat is een systeem met een compleet audittraject, mogelijkheid tot tijdreisquery's, en een natuurlijke scheiding tussen schrijven (gebeurtenissen toevoegen) en lezen (projecties op gebeurtenissen).
Wat je gaat leren
- Architectuur voor evenementensourcing: evenementenwinkel, aggregatie, evenementenstroom
- Implementeer een aggregatie met gebeurtenissen in TypeScript
- Event Store: hoe de evenementendatabase te structureren
- Event Replay: Herbouw de staat helemaal opnieuw op basis van gebeurtenissen
- Tijdreisvragen: status van het aggregaat op elk historisch moment
- EventStoreDB: de database ontworpen voor Event Sourcing
- Evenement Sourcing-afweging: wanneer moet je het adopteren en wanneer moet je het vermijden?
Het klassieke model versus evenementensourcing
Laten we de twee benaderingen voor een orderbeheersysteem vergelijken:
| Ik wacht | CRUD Klassieker | Inkoop van evenementen |
|---|---|---|
| Wat wordt er opgeslagen | Huidige status (UPDATE-record) | Volgorde van gebeurtenissen (alleen INSERT) |
| Audittrails | Nee (of met aparte tabellen) | Ja, native en compleet |
| Verleden staat | Nee (je ziet alleen de huidige status) | Ja, herhaling van gebeurtenissen tot en met T |
| Complexiteit | Laag | Middelhoog |
| Prestaties gelezen | Hoog (directe zoekopdracht) | Vereist screenings (zie CQRS) |
| Prestaties schrijven | Hoog | Hoog (alleen toevoegen) |
| Foutopsporing | Moeilijk (eindtoestand zonder verhaal) | Gemakkelijk (herhaling van gebeurtenis) |
Fundamentele concepten
Aggregaat: de eenheid van consistentie
Un Totaal en de consistentiegrens in Event Sourcing. Alle veranderingen in het aggregaat vinden plaats via methoden die gebeurtenissen genereren. De staat is altijd gereconstrueerd door de gebeurtenissen in volgorde toe te passen:
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
Gebeurtenisopslag: de gebeurtenissendatabase
Un Evenementenwinkel en een alleen-toevoegen-database die is geoptimaliseerd voor schrijven en lees reeksen gebeurtenissen. De fundamentele structuur en deGebeurtenisstroom: een geordende reeks gebeurtenissen voor een enkele aggregaat (geïdentificeerd door zijn ID).
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
Opslagplaats voor gebeurtenisbronnen
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
Vragen over tijdreizen
Een van de krachtigste voordelen van Event Sourcing is het vermogen om de staat te reconstrueren van het aggregaat op elk historisch moment:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: de native database voor gebeurtenissourcing
EventStoreDB en een database die speciaal is ontworpen voor Event Sourcing. Biedt gebeurtenisstreams als native primitieven, abonnementen voor realtime meldingen, en projecties aan de serverzijde:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
Momentopname: het optimaliseren van de herhaling van grote aggregaten
Als een aggregaat duizenden gebeurtenissen bevat, wordt het opnieuw afspelen traag. De Momentopnamen en een controlepunt: in plaats van vanaf het begin te beginnen, wordt het opnieuw opgebouwd vanaf de laatste momentopname:
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
Evenementsourcing-afweging
Wanneer evenementensourcing zinvol is
- Verplichte audittrail: Financiële systemen, gezondheidszorgsystemen, rechtssystemen waarbij het complete verhaal een vereiste is
- Complexe foutopsporing: Als je "de band wilt terugspoelen" en wilt begrijpen hoe je in een problematische toestand terecht bent gekomen
- Tijdelijke bedrijfsinformatie: “Hoeveel bestellingen hadden gisteren om 16.00 uur de status Concept?”
- Gebeurtenisgestuurde integratie: Als u al over een EDA-architectuur beschikt, integreert Event Sourcing op natuurlijke wijze
Wanneer het sourcen van evenementen geen zin heeft
- Simpele CRUD: Een master record management systeem zonder auditvereisten heeft geen baat bij ES
- Complexe zoekopdrachten: ES optimaliseert schrijfbewerkingen; lezingen vereisen CQRS met projectie (extra complexiteit)
- Team zonder ES-ervaring: De leercurve is aanzienlijk; slecht beheerde complexiteit kan opwegen tegen de voordelen
- Stijf schema: Als uw gebeurtenisschema regelmatig verandert, wordt versiebeheer complex
Conclusies en volgende stappen
Event Sourcing transformeert persistentie van een statusupdate naar een gebeurtenistoevoeging onveranderlijk. Het resultaat is een systeem met native audit trail, tijdreisquery's en een natuurlijke scheiding tussen schrijven en lezen. Prijs en complexiteit: de herhaling van gebeurtenissen, het beheren van schemaversies en het vereisen van CQRS voor query's complex.
Het volgende artikel – CQRS – gaat precies in op deze uitdaging: hoe je een laag opbouwt geoptimaliseerd leesmodel (leesmodel) dat via projecties synchroniseert met de gebeurtenisopslag, waardoor snelle zoekopdrachten mogelijk zijn zonder de evenementenstroom aan te raken.
Aankomende artikelen in de Event-Driven Architecture-serie
Gerelateerde serie
- EDA-grondbeginselen — de context waarin Event Sourcing wordt gebruikt
- Apache Kafka — Alternatieve gedistribueerde gebeurtenisopslag voor streams met een hoog volume







