Olay Kaynak Kullanımı: Değişmez Bir Olaylar Dizisi Olarak Durum
Bir bankanın nasıl çalıştığını düşünün: Hesap bakiyeniz "veritabanındaki bir sayı" değildir her işlemde güncellenir". Ve şimdiye kadar gerçekleşen tüm işlemlerin sonucu: para yatırma, çekme, transfer, faiz. Dün saat 14:00'teki bakiyeyi öğrenmek istiyorsanız, o ana kadarki tüm işlemleri tekrar oynatarak bunu yeniden oluşturabilirsiniz. bu tam olarak prensibiEtkinlik Kaynak Kullanımı.
Bir varlığın mevcut durumunu veritabanına kaydetmek ve her seferinde üzerine yazmak yerine düzenle, Etkinlik Kaynak Kullanımı ona giden olayların sırasını kaydedin o duruma. Mevcut duruma, olayların baştan itibaren "oynatılması" ile ulaşılır. Sonuç, eksiksiz bir denetim takibine sahip, zaman yolculuğu sorgularının mümkün olduğu, ve yazma (olayların eklenmesi) ile okuma (olaylara ilişkin projeksiyonlar) arasında doğal bir ayrım.
Ne Öğreneceksiniz
- Olay Kaynak Kullanımı Mimarisi: Etkinlik Deposu, Toplama, Etkinlik Akışı
- TypeScript'te olaylarla bir Toplama uygulama
- Etkinlik Deposu: etkinlik veritabanının nasıl yapılandırılacağı
- Olay Tekrarı: Olaylardan durumu sıfırdan yeniden oluşturun
- Zaman yolculuğu sorguları: herhangi bir tarihsel andaki toplamın durumu
- EventStoreDB: Event Sourcing için tasarlanmış veritabanı
- Event Sourcing ödünleşimi: ne zaman benimsenmeli ve ne zaman kaçınılmalı
Klasik Model ve Olay Kaynak Kullanımı
Bir sipariş yönetimi sistemi için iki yaklaşımı karşılaştıralım:
| bekliyorum | CRUD Klasik | Etkinlik Kaynak Kullanımı |
|---|---|---|
| Kaydedilenler | Mevcut durum (GÜNCELLEME kaydı) | Olayların sırası (yalnızca INSERT) |
| Denetim izleri | Hayır (veya ayrı tablolarla) | Evet, yerel ve eksiksiz |
| Geçmiş durum | Hayır (yalnızca mevcut durumu görürsünüz) | Evet, T'ye kadar olayların tekrarı |
| Karmaşıklık | Düşük | Orta-Yüksek |
| Performans okuması | Yüksek (doğrudan sorgu) | Tarama gerektirir (bkz. CQRS) |
| Performans yazma | Yüksek | Yüksek (yalnızca ekleme) |
| Hata ayıklama | Zor (hikayenin olmadığı son durum) | Kolay (olay tekrarı) |
Temel Kavramlar
Toplama: Tutarlılık Birimi
Un Agrega ve Event Sourcing'deki tutarlılık sınırı. hepsi Toplamda yapılan değişiklikler, olay üreten yöntemler aracılığıyla gerçekleşir. Devlet her zaman olaylar sırayla uygulanarak yeniden oluşturuldu:
// TypeScript: implementazione di un Order Aggregate con Event Sourcing
// 1. Definisci gli eventi dell'Aggregate
type OrderEvent =
| { type: 'OrderCreated'; orderId: string; customerId: string; createdAt: string }
| { type: 'ItemAdded'; productId: string; quantity: number; unitPrice: number }
| { type: 'ItemRemoved'; productId: string }
| { type: 'OrderConfirmed'; confirmedAt: string }
| { type: 'OrderCancelled'; reason: string; cancelledAt: string };
// 2. Lo stato dell'Aggregate
interface OrderState {
id: string;
customerId: string;
items: Map<string, { quantity: number; unitPrice: number }>;
status: 'Draft' | 'Confirmed' | 'Cancelled';
totalAmount: number;
createdAt: string;
}
// 3. Il reducer: applica un evento allo stato (pura funzione)
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderCreated':
return {
id: event.orderId,
customerId: event.customerId,
items: new Map(),
status: 'Draft',
totalAmount: 0,
createdAt: event.createdAt,
};
case 'ItemAdded': {
const newItems = new Map(state!.items);
newItems.set(event.productId, {
quantity: event.quantity,
unitPrice: event.unitPrice,
});
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'ItemRemoved': {
const newItems = new Map(state!.items);
newItems.delete(event.productId);
const total = Array.from(newItems.values())
.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
return { ...state!, items: newItems, totalAmount: total };
}
case 'OrderConfirmed':
return { ...state!, status: 'Confirmed' };
case 'OrderCancelled':
return { ...state!, status: 'Cancelled' };
default:
return state!;
}
}
// 4. L'Aggregate: genera eventi in risposta a command
class OrderAggregate {
private state: OrderState | null = null;
private uncommittedEvents: OrderEvent[] = [];
private version: number = 0;
// Ricostruisce l'aggregate da una sequenza di eventi storici
static rehydrate(events: OrderEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
for (const event of events) {
aggregate.apply(event);
aggregate.version++;
}
aggregate.uncommittedEvents = []; // pulizia: gli eventi storici non sono "nuovi"
return aggregate;
}
// Metodo business: crea un nuovo ordine
create(orderId: string, customerId: string): void {
if (this.state !== null) {
throw new Error('Order already created');
}
this.raiseEvent({
type: 'OrderCreated',
orderId,
customerId,
createdAt: new Date().toISOString(),
});
}
// Metodo business: aggiunge un articolo
addItem(productId: string, quantity: number, unitPrice: number): void {
if (this.state?.status !== 'Draft') {
throw new Error('Cannot add items to a non-draft order');
}
this.raiseEvent({ type: 'ItemAdded', productId, quantity, unitPrice });
}
// Metodo business: conferma l'ordine
confirm(): void {
if (this.state?.status !== 'Draft') {
throw new Error('Only draft orders can be confirmed');
}
if (this.state.items.size === 0) {
throw new Error('Cannot confirm empty order');
}
this.raiseEvent({ type: 'OrderConfirmed', confirmedAt: new Date().toISOString() });
}
getUncommittedEvents(): OrderEvent[] {
return [...this.uncommittedEvents];
}
clearUncommittedEvents(): void {
this.uncommittedEvents = [];
}
getState(): OrderState {
if (!this.state) throw new Error('Order not initialized');
return this.state;
}
get currentVersion(): number {
return this.version;
}
private raiseEvent(event: OrderEvent): void {
this.apply(event);
this.uncommittedEvents.push(event);
this.version++;
}
private apply(event: OrderEvent): void {
this.state = applyOrderEvent(this.state, event);
}
}
Etkinlik Mağazası: Etkinlik Veritabanı
Un Etkinlik Mağazası ve yazma ve yazma için optimize edilmiş salt ekleme veritabanı olay dizilerini okuyun. Temel yapı veEtkinlik Akışı: tek bir Toplama (kimliğiyle tanımlanır) için sıralı bir olay dizisi.
// Schema dell'Event Store con PostgreSQL
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- "Order-{orderId}"
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
version BIGINT NOT NULL, -- posizione nell'event stream
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Previene concurrent writes con lo stesso version number
CONSTRAINT uq_stream_version UNIQUE (stream_id, version)
);
-- Index per lettura efficiente di uno stream
CREATE INDEX idx_event_store_stream ON event_store (stream_id, version ASC);
-- Index per proiezioni su tipo di evento
CREATE INDEX idx_event_store_type ON event_store (event_type, created_at);
Olay Kaynağı Havuzu
// TypeScript: EventSourcedRepository con ottimistic concurrency
class OrderRepository {
constructor(private readonly db: Database) {}
// Carica un Order dal suo stream di eventi
async findById(orderId: string): Promise<OrderAggregate | null> {
const streamId = `Order-${orderId}`;
const rows = await this.db.query(
'SELECT event_type, event_data, version FROM event_store WHERE stream_id = $1 ORDER BY version ASC',
[streamId]
);
if (rows.length === 0) return null;
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
return OrderAggregate.rehydrate(events);
}
// Salva i nuovi eventi generati dall'Aggregate
async save(aggregate: OrderAggregate): Promise<void> {
const uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.length === 0) return;
const orderId = aggregate.getState().id;
const streamId = `Order-${orderId}`;
// version corrente = version dopo rehydrate
let expectedVersion = aggregate.currentVersion - uncommittedEvents.length;
const client = await this.db.connect();
try {
await client.query('BEGIN');
for (const event of uncommittedEvents) {
expectedVersion++;
await client.query(
`INSERT INTO event_store (stream_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[streamId, event.type, JSON.stringify(event), expectedVersion]
);
}
await client.query('COMMIT');
aggregate.clearUncommittedEvents();
} catch (err) {
await client.query('ROLLBACK');
// Se viola il UNIQUE constraint su (stream_id, version) = conflitto concorrente
if ((err as any).code === '23505') {
throw new Error(`Optimistic concurrency conflict on stream ${streamId}`);
}
throw err;
} finally {
client.release();
}
}
}
Zaman Yolculuğu Sorguları
Event Sourcing'in en güçlü avantajlarından biri durumu yeniden yapılandırma yeteneğidir. herhangi bir tarihsel anda toplamın:
// Ricostruisci lo stato dell'ordine alle 14:00 di ieri
async function getOrderStateAt(orderId: string, timestamp: Date): Promise<OrderState> {
const streamId = `Order-${orderId}`;
// Carica solo gli eventi antecedenti o uguali al timestamp
const rows = await db.query(
`SELECT event_type, event_data FROM event_store
WHERE stream_id = $1 AND created_at <= $2
ORDER BY version ASC`,
[streamId, timestamp.toISOString()]
);
const events = rows.map((row) => ({
type: row.event_type,
...row.event_data,
})) as OrderEvent[];
const aggregate = OrderAggregate.rehydrate(events);
return aggregate.getState();
}
// Esempio: "Qual era il totale dell'ordine prima dell'ultima modifica?"
const stateYesterday = await getOrderStateAt(
'order-789',
new Date('2026-03-19T14:00:00Z')
);
console.log(`Totale ieri alle 14:00: ${stateYesterday.totalAmount} EUR`);
EventStoreDB: Olay Kaynağına Yönelik Yerel Veritabanı
EventStoreDB ve Event Sourcing için özel olarak tasarlanmış bir veritabanı. Yerel temel öğeler olarak olay akışları, gerçek zamanlı bildirimler için abonelikler sağlar, ve sunucu tarafı projeksiyonları:
// Connessione a EventStoreDB con il client TypeScript
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
const client = EventStoreDBClient.connectionString(
"esdb://localhost:2113?tls=false"
);
// Scrivi eventi su uno stream
async function appendOrderEvents(
orderId: string,
events: OrderEvent[],
expectedVersion: bigint
): Promise<void> {
const streamName = `Order-${orderId}`;
const esdbEvents = events.map((event) =>
jsonEvent({
type: event.type,
data: event,
metadata: {
correlationId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
},
})
);
await client.appendToStream(streamName, esdbEvents, {
expectedRevision: expectedVersion, // optimistic concurrency
});
}
// Leggi tutti gli eventi di uno stream
async function readOrderStream(orderId: string): Promise<OrderEvent[]> {
const streamName = `Order-${orderId}`;
const events: OrderEvent[] = [];
const readResult = client.readStream(streamName, {
direction: "forwards",
fromRevision: "start",
});
for await (const resolvedEvent of readResult) {
if (resolvedEvent.event?.data) {
events.push(resolvedEvent.event.data as OrderEvent);
}
}
return events;
}
// Proiezione: conta gli ordini per status
// Eseguita server-side in EventStoreDB
const projection = `
fromAll()
.when({
$init: function() { return { confirmed: 0, cancelled: 0 }; },
'OrderConfirmed': function(state, event) { state.confirmed++; },
'OrderCancelled': function(state, event) { state.cancelled++; }
})
.outputState();
`;
Anlık Görüntü: Büyük Toplamların Tekrar Oynatılmasını Optimize Etme
Toplamda binlerce etkinlik varsa yeniden oynatma yavaşlar. Anlık görüntüler ve bir kontrol noktası: baştan başlamak yerine son anlık görüntüden yeniden oluşturulur:
// Snapshot ogni 100 eventi
const SNAPSHOT_INTERVAL = 100;
async function findByIdWithSnapshot(orderId: string): Promise<OrderAggregate> {
// 1. Cerca l'ultimo snapshot
const snapshot = await snapshotStore.findLatest(`Order-${orderId}`);
if (snapshot) {
// 2. Carica solo gli eventi dopo lo snapshot
const events = await eventStore.loadFrom(
`Order-${orderId}`,
snapshot.version + 1
);
// 3. Riapplica gli eventi recenti sopra lo snapshot
return OrderAggregate.rehydrateFromSnapshot(snapshot.state, events);
}
// Nessun snapshot: replay dall'inizio
const allEvents = await eventStore.loadAll(`Order-${orderId}`);
return OrderAggregate.rehydrate(allEvents);
}
// Crea uno snapshot dopo ogni save
async function saveWithSnapshot(aggregate: OrderAggregate): Promise<void> {
await orderRepo.save(aggregate);
// Crea snapshot ogni N eventi
if (aggregate.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
streamId: `Order-${aggregate.getState().id}`,
version: aggregate.currentVersion,
state: aggregate.getState(),
createdAt: new Date().toISOString(),
});
}
}
Etkinlik Kaynağı takası
Olay Kaynak Kullanımı Mantıklı Olduğunda
- Zorunlu denetim yolu: Hikayenin tamamının gerekli olduğu finansal sistemler, sağlık sistemleri, hukuk sistemleri
- Karmaşık Hata Ayıklama: "Kaseti geri sarabilmek" ve sorunlu bir duruma nasıl geldiğinizi anlamak istediğinizde
- Geçici iş zekası: "Dün saat 16:00'da Taslak durumunda kaç sipariş vardı?"
- Olay odaklı entegrasyon: Zaten bir EDA mimariniz varsa Event Sourcing doğal olarak entegre olur
Olay Kaynak Kullanımı Mantıklı Olmadığında
- Basit CRUD: Denetim gereklilikleri olmayan bir ana kayıt yönetim sistemi ES'den faydalanamaz
- Karmaşık sorgular: ES yazma işlemlerini optimize eder; okumalar projeksiyonlu CQRS gerektirir (ek karmaşıklık)
- ES deneyimi olmayan takım: Öğrenme eğrisi önemlidir; Kötü yönetilen karmaşıklık, faydalardan daha ağır basabilir
- Katı şema: Etkinlik şemanız sık sık değişiyorsa sürüm yönetimi karmaşık hale gelir
Sonuçlar ve Sonraki Adımlar
Event Sourcing, kalıcılığı durum güncellemesinden olay eklentisine dönüştürür değişmez. Sonuç, yerel denetim takibine, zaman yolculuğu sorgularına ve yazma ve okuma arasındaki doğal ayrım. Fiyat ve karmaşıklık: tekrarı olaylar, şema sürümlerini yönetme ve sorgular için CQRS gerektirme karmaşık.
Bir sonraki makale - CQRS - tam olarak bu zorluğa değiniyor: katman nasıl oluşturulur projeksiyonlar aracılığıyla olay deposuyla senkronize olan optimize edilmiş okuma modeli (okuma modeli), olay akışına dokunmadan hızlı sorgulara olanak tanır.
Olay Odaklı Mimari Serisinde Gelecek Makaleler
İlgili Seriler
- EDA'nın Temelleri — Event Sourcing'in kullanıldığı bağlam
- Apaçi Kafka — Yüksek hacimli akışlar için alternatif dağıtılmış etkinlik deposu







