Eventsourcing + CQRS samen: projectie, momentopname en consistentie
Alleen Event Sourcing regelt de schrijfkant op een onveranderlijke en controleerbare manier. Alleen CQRS optimaliseert afzonderlijk lezen en schrijven. Gebruikt Samen, lossen ze een van de moeilijkste problemen op van softwarearchitectuur: hoe u een volledig controleerbaar systeem krijgt, schaalbaar op basis van metingen en presteert op vragen. Maar aan deze macht zijn kosten verbonden: projecties die moeten worden gehandhaafd, momentopnamen die opnieuw kunnen worden afgespeeld efficiënt en het beheer van enige consistentie tussen schrijf- en leesmodellen.
Samenvatting: Event Sourcing en CQRS gescheiden
Laten we, voordat we ze combineren, de twee patronen afzonderlijk samenvatten:
- Inkoop van evenementen: In plaats van de huidige status van het aggregaat in de database op te slaan, jij bewaart de opeenvolging van gebeurtenissen wie het heeft gemaakt. De staat wordt gereconstrueerd reproduceren (opnieuw afspelen) de gebeurtenissen vanaf het begin. Voordeel: compleet audittraject, tijdreizen, zakelijke inzichten afgeleid van evenementen.
- CQRS: Commandozijde (schrijfmodel) en Queryzijde (leesmodel) zijn gescheiden. Het schrijfmodel ontvangt Commando en produceert gebeurtenissen. Het leesmodel is geoptimaliseerd voor toepassingsspecifieke query's, opgebouwd uit gebeurtenissen.
De combinatie is logisch: Event Sourcing produceert evenementen, CQRS gebruikt ze om te bouwen geoptimaliseerde leesmodellen (bijv projecties).
Wanneer gebruik je Event Sourcing + CQRS samen?
- Systemen met verplichte audit trail (fintech, gezondheidszorg, e-commerce)
- Wanneer leesmodellen een heel andere structuur hebben dan het schrijfmodel
- Wanneer leesbewerkingen 10-100x moeten worden geschaald in vergelijking met schrijfbewerkingen
- Systemen met tijdreisvragen ("wat was de status op 15 maart?")
- Niet gebruiken voor eenvoudige CRUD-toepassingen: aanzienlijke overkill
Volledige architectuur: end-to-end-stroom
De stroom van een aanvraag in een Event Sourcing + CQRS-systeem is als volgt:
- De opdrachtgever stuurt een Commando (bijv.
EffettuaOrdineCommand) - Il Commandobehandelaar laad het aggregaat uit de gebeurteniswinkel (herhaling van gebeurtenissen)
- Het aggregaat valideert de opdracht en produceert er een of meer Domein evenement (bijv.
OrdineEffettuatoEvent) - Gebeurtenissen worden voortgezetEvenementenwinkel (alleen toevoegen)
- Evenementen worden gepubliceerd op Berichtenbus (Kafka, EventBridge, enz.)
- Eén of meer Projectiebehandelaar ze consumeren evenementen en updaten de Model lezen
- Clientquery's worden gelezen uit het leesmodel (doorgaans een voor lezen geoptimaliseerde database)
Implementatie: The Aggregate met Event Sourcing
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Event Store: het voortbestaan van evenementen
L'Evenementenwinkel het is de database die de reeks gebeurtenissen voor elk aggregaat bijhoudt. De fundamentele vereiste is dat de Schriften dat zijn alleen toevoegen en ondersteuning de optimistische gelijktijdigheidscontrole: u specificeert de verwachte versie van het aggregaat en het schrijven mislukt als iemand anders in de tussentijd een gebeurtenis heeft opgeschreven (conflict).
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
Projectie: het leesmodel bouwen op basis van gebeurtenissen
Una projectie is een evenementenconsument die een visie opbouwt (het leesmodel) geoptimaliseerd voor zoekopdrachten. Elke projectie is idempotent en kan opnieuw opgebouwd worden door alle gebeurtenissen vanaf het begin van de Event Store opnieuw te lezen.
De projecties worden na elke gebeurtenis asynchroon bijgewerkt: dit is de bron van uiteindelijke consistentie. De cliënt die zojuist een opdracht heeft uitgevoerd, zou dit kunnen doen zie niet onmiddellijk resultaten op het Leesmodel.
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
Momentopname: het optimaliseren van de herhaling van aggregaten met veel evenementen
Omdat een aggregaat gebeurtenissen in de loop van de tijd verzamelt, wordt de volledige herhaling wordt langzaam: een bestelling met 100 statusupdates vereist 100 vragen en 100 aanvragen voordat deze klaar is om een nieuw commando te beheren. De Momentopnamen lost dit probleem op: periodiek wordt de huidige status van het aggregaat opgeslagen (momentopname), en bij de volgende herlaadbeurt we vertrekken vanaf de momentopname in plaats van vanaf het begin van het verhaal.
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
Het beheren van de uiteindelijke consistentie
De belangrijkste uitdaging van Event Sourcing + CQRS is inconsistentie venster: nadat een opdracht met succes is uitgevoerd, wordt het leesmodel nog niet bijgewerkt (De projectie verwerkt de gebeurtenis nog steeds). Een cliënt die direct daarna leest Mogelijk ziet u uw wijzigingen niet.
Er zijn verschillende strategieën om deze situatie te beheersen:
Strategie 1: Polling met versiecontrole
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
Strategie 2: Geef de versie terug in de opdrachtreactie
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
Strategie 3: Synchrone projectie voor kritieke gegevens
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Projectie opnieuw opbouwen
Een van de grote voordelen van Event Sourcing is de mogelijkheid om opnieuw op te bouwen elke projectie helemaal opnieuw door alle gebeurtenissen vanaf het begin van de evenementenwinkel opnieuw te lezen. Hierdoor kunt u met terugwerkende kracht nieuwe leesmodellen toevoegen of bugs oplossen in bestaande projecties.
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
Beste praktijken en antipatronen
Antipatroon: gebeurtenissen uit het verleden veranderen
Evenementen in de Event Store wel onveranderlijk. Bewerk ze nooit na het schrijven.
Als een gebeurtenisschema moet worden gewijzigd, maakt u een nieuwe versie van de gebeurtenis
(OrdineEffettuatoV2Event) en handel upcasting af in de deserializer.
Het wijzigen van gebeurtenissen uit het verleden brengt de integriteit van het audittraject in gevaar en belemmert de herhaling.
- Eén aggregaat per transactie: bewerk niet meerdere aggregaties in dezelfde werkeenheid. Gebruik Saga om geaggregeerde bewerkingen te organiseren.
- Gebeurtenissen als een feit uit het verleden: Gebeurtenisnamen beschrijven iets dat al is gebeurd (
OrdineEffettuatoEvent, NietEffettuaOrdineEvent). - Idempotente projecties: Elke projectie-handler moet veilig meerdere keren kunnen worden aangeroepen met dezelfde gebeurtenis (minstens één keer levering).
- Gebruik geen gebeurtenissen zoals Command: Het publiceren van een gebeurtenis mag niet rechtstreeks een opdracht op een ander aggregaat starten. Gebruik Saga of Process Manager.
- Momentopnamebeleid op basis van het aantal gebeurtenissen: Maak elke 50-100 gebeurtenissen momentopnamen. Te frequent = serialisatie-overhead; te zeldzaam = langzame herhaling.
Volgende stappen in de serie
- Artikel 5 – Saga-patroon: wanneer bij een bewerking meerdere aggregaten betrokken zijn of meerdere services, verwerkt het Saga Pattern gedistribueerde transacties zonder 2PC, met het compenseren van transacties in geval van mislukking.
- Artikel 6 – AWS EventBridge: hoe u evenementen publiceert vanuit uw evenementenwinkel op EventBridge om Lambda, SQS en andere doelen op een serverloze manier te bereiken.
Link met andere series
- Apache Kafka (serie 38): Kafka is de ideale Event Store en berichtenbus voor Event Sourcing in productie. De artikelen over Kafka Streams en Kafka Connect integreren direct met de hier beschreven patronen.
- PostgreSQL AI en pgvector: het leesmodel van een projectie kan zijn een PostgreSQL-database met gespecialiseerde indexen, inclusief vectorkolommen voor gelijkenis semantiek op tekstuele beschrijvingen van bestellingen.







