Aprovizionarea evenimentelor: starea ca o secvență imuabilă de evenimente
Luați în considerare cum funcționează o bancă: soldul contului dvs. nu este „un număr dintr-o bază de date actualizat cu fiecare tranzacție”. Și rezultatul tuturor tranzacțiilor care au avut loc vreodată: depuneri, retrageri, transferuri, dobânzi. Dacă vrei să afli soldul ieri la 14:00, îl puteți reconstrui reluând toate tranzacțiile până la acel moment. Aceasta este exact principiul deAprovizionare pentru evenimente.
În loc să salvați starea curentă a unei entități în baza de date și să o suprascrieți de fiecare dată edita, Aprovizionare pentru evenimente salvați succesiunea evenimentelor care au condus la aceasta la acea stare. Starea actuală este atinsă „jucând” evenimente de la început. Rezultatul este un sistem cu pistă de audit completă, posibilitate de interogări de călătorie în timp, și o separare naturală între scriere (adăugare evenimente) și lectură (proiecții asupra evenimentelor).
Ce vei învăța
- Arhitectura de aprovizionare cu evenimente: Magazin de evenimente, Agregat, Flux de evenimente
- Implementați un agregat cu evenimente în TypeScript
- Magazin de evenimente: cum să structurați baza de date de evenimente
- Reluare eveniment: reconstruiți starea de la zero din evenimente
- Interogări de călătorie în timp: starea agregatului în orice moment istoric
- EventStoreDB: baza de date concepută pentru Event Sourcing
- Compartimentul de aprovizionare cu evenimente: când să îl adoptăm și când să îl evitați
Modelul clasic vs aprovizionarea cu evenimente
Să comparăm cele două abordări pentru un sistem de management al comenzilor:
| astept | CRUD Clasic | Aprovizionare pentru evenimente |
|---|---|---|
| Ce este salvat | Starea curentă (înregistrare UPDATE) | Secvență de evenimente (numai INSERT) |
| Piste de audit | Nu (sau cu mese separate) | Da, nativ și complet |
| Stare trecută | Nu (vezi doar starea curentă) | Da, reluarea evenimentelor până la T |
| Complexitate | Scăzut | Mediu-Ridicat |
| Performanță citită | Ridicat (interogare directă) | Necesită examinări (vezi CQRS) |
| Scriere de performanță | Ridicat | Ridicat (numai pentru adăugare) |
| Depanare | Greu (stare finală fără poveste) | Ușor (reluare eveniment) |
Concepte fundamentale
Agregat: Unitatea de consistență
Un Agregat și limita de consistență în Event Sourcing. Toate modificările asupra agregatului apar prin metode care generează evenimente. Statul este mereu reconstruit prin aplicarea evenimentelor în succesiune:
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
Magazin de evenimente: Baza de date de evenimente
Un Magazin de evenimente și o bază de date numai pentru adăugare optimizată pentru scriere și citește secvențe de evenimente. Structura fundamentală șiFlux de evenimente: o secvență ordonată de evenimente pentru un singur agregat (identificat prin ID-ul său).
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
Depozitul de aprovizionare cu evenimente
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
Interogări de călătorie în timp
Unul dintre cele mai puternice avantaje ale Event Sourcing este capacitatea de a reconstrui starea a agregatului în orice moment istoric:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: baza de date nativă pentru aprovizionarea cu evenimente
EventStoreDB și o bază de date concepută special pentru Event Sourcing. Oferă fluxuri de evenimente ca primitive native, abonamente pentru notificări în timp real, și proiecții pe partea serverului:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
Instantaneu: optimizarea redării agregatelor mari
Dacă un agregat are mii de evenimente, reluarea devine lentă. The Instantanee și un punct de control: în loc să înceapă de la început, se reconstruiește din ultimul instantaneu:
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
Schimb de aprovizionare cu evenimente
Când aprovizionarea cu evenimente are sens
- Pista de audit obligatorie: Sisteme financiare, sisteme de sănătate, sisteme juridice în care povestea completă este o cerință
- Depanare complexă: Când doriți să puteți „bobina banda” și să înțelegeți cum ați ajuns într-o stare problematică
- Informații de afaceri temporale: „Câte comenzi erau în starea Draft ieri la 16:00?”
- Integrare bazată pe evenimente: Dacă aveți deja o arhitectură EDA, Event Sourcing se integrează în mod natural
Când aprovizionarea cu evenimente NU ARE SENS
- CRUD simplu: Un sistem master de management al înregistrărilor fără cerințe de audit nu beneficiază de ES
- Interogări complexe: ES optimizează scrierile; citirile necesită CQRS cu proiecție (complexitate suplimentară)
- Echipa fără experiență ES: Curba de învățare este semnificativă; complexitatea prost gestionată poate depăși beneficiile
- Schema rigida: Dacă schema dvs. de evenimente se modifică frecvent, gestionarea versiunilor devine complexă
Concluzii și pașii următori
Event Sourcing transformă persistența dintr-o actualizare de stare într-o adăugare a evenimentului imuabil. Rezultatul este un sistem cu pistă de audit nativă, interogări de călătorie în timp și a separarea firească între scris și citit. Preț și complexitate: reluarea evenimente, gestionarea versiunilor de schemă și solicitarea CQRS pentru interogări complex.
Următorul articol — CQRS — abordează exact această provocare: cum să construiți un strat model de citire optimizat (model de citire) care se sincronizează cu magazinul de evenimente prin proiecții, permițând interogări rapide fără a atinge fluxul de evenimente.
Articole viitoare din seria Event-Driven Architecture
Serii înrudite
- Fundamentele EDA — contextul în care este utilizată Event Sourcing
- Apache Kafka — Magazin de evenimente distribuit alternativ pentru fluxuri de mare volum







