Olay Kaynak Kullanımı + CQRS Birlikte: Projeksiyon, Anlık Görüntü ve Tutarlılık
Event Sourcing tek başına yazma tarafını değişmez ve denetlenebilir bir şekilde ele alır. CQRS tek başına optimize eder ayrı ayrı okuma ve yazma. Kullanılmış Birlikteen zor sorunlardan birini çözüyorlar yazılım mimarisi: tamamen denetlenebilir, okumalara göre ölçeklenebilir bir sisteme nasıl sahip olunur ve sorgularda daha iyi performans gösteriyor. Ancak bu gücün bir bedeli var: sürdürülecek projeksiyonlar, tekrar oynatılacak anlık görüntüler verimli ve yazma ve okuma modelleri arasındaki tutarlılığın yönetimi.
Özet: Olay Kaynak Kullanımı ve CQRS Ayrı
Bunları birleştirmeden önce iki modeli ayrı ayrı özetleyelim:
- Etkinlik Kaynak Kullanımı: Toplamanın mevcut durumunu veritabanına kaydetmek yerine, sen kaydet olaylar dizisi onu kim yarattı? Devlet yeniden inşa ediliyor üreme (tekrar oynatılıyor) olaylar başından itibaren. Avantajı: tam denetim takibi, zaman yolculuğu, olaylardan elde edilen iş öngörüleri.
- CQRS: Komut tarafı (model yazma) ve Sorgu tarafı (modeli okuma) ayrıdır. Yazma modeli Komutu alır ve olaylar üretir. Okuma modeli aşağıdakiler için optimize edilmiştir: olaylardan oluşturulan uygulamaya özel sorgular.
Kombinasyon doğaldır: Event Sourcing olaylar üretir, CQRS bunları oluşturmak için tüketir optimize edilmiş okuma modelleri (le projeksiyonlar).
Event Sourcing + CQRS Ne Zaman Birlikte Kullanılmalı?
- Zorunlu denetim takibine sahip sistemler (fintech, sağlık hizmetleri, e-ticaret)
- Okuma modelleri yazma modelinden çok farklı yapılara sahip olduğunda
- Okumaların yazmalara kıyasla 10-100x ölçeklendirilmesi gerektiğinde
- Zaman yolculuğu sorgusu içeren sistemler ("15 Mart'ta durum neydi?")
- Kullanma basit CRUD uygulamaları için: önemli ölçüde fazlalık
Eksiksiz Mimari: Uçtan uca akış
Event Sourcing + CQRS sisteminde bir isteğin akışı aşağıdaki gibidir:
- Müşteri bir mesaj gönderir Emretmek (ör.
EffettuaOrdineCommand) - Il Komut İşleyicisi Toplama'yı Etkinlik Deposu'ndan yükleyin (olay tekrarı)
- Toplama, Komutu doğrular ve bir veya daha fazla üretir Etki Alanı Etkinliği (ör.
OrdineEffettuatoEvent) - Olaylar devam ediyorEtkinlik Mağazası (yalnızca ekleme)
- Etkinlikler şu adreste yayınlanmaktadır: Mesaj Veriyolu (Kafka, EventBridge vb.)
- Bir veya daha fazla Projeksiyon İşleyicisi olayları tüketirler ve güncellerler Modeli Oku
- Okuma Modelinden okunan istemci sorguları (genellikle okuma için optimize edilmiş bir veritabanı)
Uygulama: Olay Kaynak Kullanımıyla Toplama
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Etkinlik Mağazası: Etkinliklerin Devamlılığı
L'Etkinlik Mağazası her bir küme için olay dizisini sürdüren veritabanıdır. Temel gereklilik kutsal kitapların olmasıdır. salt ekleme ve destek the iyimser eşzamanlılık kontrolü: toplamanın beklenen sürümünü belirtirsiniz ve bu arada bir başkası bir olay (çatışma) yazmışsa yazma başarısız olur.
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
Projeksiyon: Olaylardan Okuma Modelini Oluşturma
Una projeksiyon bir görünüm oluşturan bir etkinlik tüketicisidir (okuma modeli) sorgular için optimize edilmiştir. Her projeksiyon idempotenttir ve Etkinlik Mağazası'nın başlangıcındaki tüm etkinliklerin yeniden okunmasıyla sıfırdan yeniden oluşturulabilir.
Projeksiyonlar her olaydan sonra eşzamansız olarak güncellenir: kaynak budur arasında nihai tutarlılık. Az önce bir Komutu yürüten istemci Okuma Modelinde sonuçları hemen görmezsiniz.
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
Anlık Görüntü: Birçok Olayla Toplamaların Yeniden Oynatılmasını Optimize Etme
Bir Toplama zaman içinde olayları biriktirdikçe, tam tekrar yavaşlar: 100 durum güncellemesi içeren bir sipariş, hazır olmadan önce 100 sorgu gerektirir ve 100 başvuru yapılır yeni bir Komuta yönetmek için. Anlık görüntüler bu sorunu çözer: Belirli aralıklarla Toplamın mevcut durumu kaydedilir (anlık görüntü) ve bir sonraki yeniden yüklemede Hikayenin başlangıcından ziyade anlık görüntüden başlıyoruz.
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
Nihai Tutarlılığı Yönetmek
Event Sourcing + CQRS'nin ana zorluğu tutarsızlık penceresi: Bir Komut başarıyla yürütüldükten sonra Okuma Modeli henüz güncellenmez (Projeksiyon hâlâ olayı işliyor). Hemen ardından okuyan bir müşteri değişikliklerinizi göremeyebilirsiniz.
Bu durumu yönetmek için çeşitli stratejiler vardır:
Strateji 1: Sürüm Kontrolü ile Yoklama
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
Strateji 2: Komut Yanıtındaki Sürümü Geri Döndürme
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
Strateji 3: Kritik Veriler için Eşzamanlı Projeksiyon
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Projeksiyon yeniden inşa edildi
Event Sourcing'in en büyük avantajlarından biri, yeniden oluşturma yeteneğidir Etkinlik Mağazasının başlangıcından itibaren tüm etkinlikleri yeniden okuyarak herhangi bir projeksiyonu sıfırdan yapın. Bu, geriye dönük olarak yeni okuma modelleri eklemenize veya hataları düzeltmenize olanak tanır mevcut projeksiyonlarda.
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
En İyi Uygulamalar ve Anti-Kalıplar
Anti-Desen: Geçmiş Olayları Değiştirmek
Etkinlik Mağazasındaki etkinlikler: değişmez. Yazdıktan sonra asla düzenleme yapmayın.
Bir etkinlik şemasının değişmesi gerekiyorsa etkinliğin yeni bir sürümünü oluşturun
(OrdineEffettuatoV2Event) ve seri durumdan çıkarıcıda yukarıya yayın işlemini gerçekleştirin.
Geçmiş olayların değiştirilmesi, denetim takibinin bütünlüğünü tehlikeye atar ve tekrar oynatmayı keser.
- İşlem başına bir toplam: Aynı İş Biriminde birden fazla toplamı düzenlemeyin. Çapraz toplama operasyonlarını düzenlemek için Saga'yı kullanın.
- Geçmiş bir gerçek olarak olaylar: Etkinlik adları daha önce olmuş bir şeyi tanımlar (
OrdineEffettuatoEvent, OlumsuzEffettuaOrdineEvent). - İdempotent projeksiyonlar: Her projeksiyon işleyicisinin aynı olayla (en az bir kez teslimat) birden çok kez çağrılması güvenli olmalıdır.
- Komut gibi olayları kullanmayın: Bir etkinliğin yayınlanması, başka bir grupta doğrudan bir Komut başlatmamalıdır. Saga veya Process Manager'ı kullanın.
- Etkinlik sayısına dayalı anlık görüntü politikası: Her 50-100 olayda bir anlık görüntü oluşturun. Çok sık = serileştirme yükü; çok nadir = yavaş tekrar oynatma.
Serideki Sonraki Adımlar
- Madde 5 – Destan Deseni: Bir işlem birden fazla toplama içerdiğinde veya birden fazla hizmet, Saga Pattern dağıtılmış işlemleri 2PC olmadan yönetir, başarısızlık durumunda telafi edici işlemlerle.
- Madde 6 – AWS EventBridge: Etkinlik Mağazanızdan etkinlikler nasıl yayınlanır Lambda, SQS ve diğer hedeflere sunucusuz bir şekilde ulaşmak için EventBridge'de.
Diğer Serilerle Bağlantı
- Apaçi Kafka (Seri 38): Kafka ideal Etkinlik Mağazası ve mesaj veriyoludur Üretimde Olay Kaynak Kullanımı için. Kafka Streams ve Kafka Connect hakkındaki makaleler entegre oluyor burada açıklanan desenlerle doğrudan.
- PostgreSQL AI ve pgvektör: Bir projeksiyonun okuma modeli şu şekilde olabilir: benzerlik için vektör sütunları da dahil olmak üzere özel indekslere sahip bir PostgreSQL veritabanı emirlerin metinsel açıklamalarına ilişkin anlambilim.







