Sourcing událostí + CQRS společně: projekce, snímek a konzistence
Samotný Event Sourcing zpracovává stranu zápisu neměnným a auditovatelným způsobem. CQRS sám optimalizuje čtení a psaní zvlášť. Použitý Spolu, řeší jeden z nejtěžších problémů softwarové architektury: jak mít plně auditovatelný systém, škálovatelný na čtení a výkonný na dotazy. Ale tato síla něco stojí: projekce, které je třeba udržovat, snímky pro přehrávání efektivní a správu jakékoli konzistence mezi modely zápisu a čtení.
Shrnutí: Sourcing událostí a samostatné CQRS
Než je zkombinujeme, zrekapitulujme si tyto dva vzory izolovaně:
- Sourcing událostí: Místo uložení aktuálního stavu agregátu do databáze, zachráníš sled událostí kdo to vytvořil. Stav je rekonstruován rozmnožování (přehrávání) události od začátku. Výhoda: kompletní audit trail, cestování časem, obchodní poznatky odvozené z událostí.
- CQRS: Příkazová strana (model zápisu) a strana dotazu (model čtení) jsou oddělené. Model zápisu přijímá Příkaz a vytváří události. Model čtení je optimalizován pro dotazy specifické pro aplikaci, sestavené z událostí.
Kombinace je přirozená: Event Sourcing vytváří události, CQRS je spotřebovává k vytváření optimalizované modely čtení (le projekce).
Kdy použít Event Sourcing + CQRS společně
- Systémy s povinnou auditní stopou (fintech, zdravotnictví, e-commerce)
- Při čtení mají modely velmi odlišné struktury od modelu zápisu
- Když je potřeba změnit měřítko 10-100x oproti zápisu
- Systémy s dotazy na cestování v čase („jaký byl stav 15. března?“)
- Nepoužívejte pro jednoduché aplikace CRUD: značné přetížení
Kompletní architektura: Tok end-to-end
Tok požadavku v systému Event Sourcing + CQRS je následující:
- Klient zašle a Příkaz (např.
EffettuaOrdineCommand) - Il Obsluha příkazů načíst agregát z obchodu událostí (přehrání události)
- Agregát ověří příkaz a vytvoří jeden nebo více Doménová událost (např.
OrdineEffettuatoEvent) - Události jsou přetrvávány doProdejna akcí (pouze připojit)
- Události jsou zveřejněny na Sběrnice zpráv (Kafka, EventBridge atd.)
- Jeden nebo více Projekční manipulátor konzumují události a aktualizují je Přečtěte si Model
- Klientské dotazy čtené z modelu čtení (obvykle databáze optimalizovaná pro čtení)
Implementace: Agregát s Event Sourcing
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Event Store: The Persistence of Events
L'Prodejna akcí je to databáze, která uchovává sekvenci událostí pro každý agregát. Základním požadavkem je, že písma jsou pouze připojit a podporu a optimistická kontrola souběhu: zadáte očekávanou verzi agregátu a zápis selže, pokud mezitím někdo jiný napsal událost (konflikt).
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
Projekce: Sestavení modelu čtení z událostí
Una projekce je konzument událostí, který vytváří pohled (model čtení) optimalizovaný pro dotazy. Každá projekce je idempotentní a může být přestavěn od nuly tím, že si znovu přečtete všechny události od začátku obchodu Event Store.
Projekce jsou aktualizovány asynchronně po každé události: toto je zdroj z případná konzistence. Klient, který právě provedl příkaz, může nezobrazují okamžitě výsledky na modelu čtení.
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
Snímek: Optimalizace přehrávání agregátů s mnoha událostmi
Vzhledem k tomu, že agregát shromažďuje události v průběhu času, plné přehrávání zpomalí: objednávka se 100 aktualizacemi stavu vyžaduje 100 dotazů a 100 platí, než bude připravena spravovat nové Velení. The Snímky řeší tento problém: periodicky se aktuální stav Agregátu ukládá (snímek) a při příštím opětovném načtení začínáme od snímku místo od začátku příběhu.
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
Řízení případné konzistence
Hlavní výzvou Event Sourcing + CQRS je okno nekonzistence: po úspěšném provedení příkazu se model čtení ještě neaktualizuje (Projekce událost stále zpracovává). Klient, který čte hned poté možná neuvidíte své změny.
Existuje několik strategií, jak tuto situaci zvládnout:
Strategie 1: Dotazování s kontrolou verze
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
Strategie 2: Vraťte verzi v příkazové odpovědi
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
Strategie 3: Synchronní projekce pro kritická data
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Rekonstrukce projekce
Jednou z velkých výhod Event Sourcingu je možnost přestavby jakoukoli projekci od nuly tím, že si znovu přečtete všechny události od začátku obchodu Event Store. To vám umožňuje zpětně přidávat nové modely čtení nebo opravovat chyby ve stávajících projekcích.
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
Osvědčené postupy a anti-vzorce
Anti-Pattern: Změna minulých událostí
Události v Event Store jsou neměnný. Po napsání je nikdy neupravujte.
Pokud je třeba změnit schéma události, vytvořte novou verzi události
(OrdineEffettuatoV2Event) a zvládnout upcasting v deserializátoru.
Změna minulých událostí narušuje integritu auditní stopy a přerušuje přehrávání.
- Jeden souhrn na transakci: Neupravujte více agregátů ve stejné jednotce práce. Použijte Saga k organizování cross-agregačních operací.
- Události jako minulost: Názvy událostí popisují něco, co se již stalo (
OrdineEffettuatoEvent, NeEffettuaOrdineEvent). - Idempotentní projekce: Každý obslužný program projekce musí být bezpečný pro vícenásobné vyvolání stejné události (alespoň jednou doručení).
- Nepoužívejte události jako Command: Publikování události nesmí přímo spustit příkaz na jiném agregátu. Použijte Saga nebo Process Manager.
- Zásady snímku založené na počtu událostí: Vytvořte snímky každých 50–100 událostí. Příliš časté = režie serializace; příliš vzácné = pomalé přehrávání.
Další kroky v sérii
- Článek 5 – Vzor ságy: když operace zahrnuje více agregátů nebo více služeb, Saga Pattern zpracovává distribuované transakce bez 2PC, s kompenzací transakcí v případě selhání.
- Článek 6 – AWS EventBridge: jak publikovat události z vašeho Event Store na EventBridge k dosažení Lambda, SQS a dalších cílů bez serveru.
Propojení s ostatními sériemi
- Apache Kafka (řada 38): Kafka je ideální Event Store a komunikační sběrnice pro Event Sourcing ve výrobě. Články o Kafka Streams a Kafka Connect se integrují přímo se zde popsanými vzory.
- PostgreSQL AI a pgvector: čtený model projekce může být PostgreSQL databáze se specializovanými indexy, včetně vektorových sloupců pro podobnost sémantika na textových popisech příkazů.







