Pozyskiwanie zdarzeń + CQRS razem: projekcja, migawka i spójność
Samo pozyskiwanie zdarzeń obsługuje stronę zapisu w niezmienny i możliwy do kontrolowania sposób. Sam CQRS optymalizuje osobno czytać i pisać. Używany Razemrozwiązują jeden z najtrudniejszych problemów architektury oprogramowania: jak mieć w pełni kontrolowalny system, skalowalny pod względem odczytów i wydajny w przypadku zapytań. Ale ta moc ma swoją cenę: projekcje do utrzymania, migawki do powtórki wydajne i zarządzanie dowolną spójnością pomiędzy modelami zapisu i odczytu.
Podsumowanie: Pozyskiwanie zdarzeń i oddzielne CQRS
Zanim je połączymy, podsumujmy oba wzorce oddzielnie:
- Pozyskiwanie zdarzeń: Zamiast zapisywać bieżący stan agregatu w bazie danych, oszczędzasz sekwencja wydarzeń kto to stworzył. Państwo zostaje odbudowane reprodukcja (odtwarzanie) wydarzenia od początku. Zaleta: pełna ścieżka audytu, podróże w czasie, spostrzeżenia biznesowe pochodzące z wydarzeń.
- CQRS: Strona poleceń (model zapisu) i strona zapytań (model odczytu) są oddzielne. Model zapisu odbiera polecenie i generuje zdarzenia. Model odczytu jest zoptymalizowany pod kątem zapytania specyficzne dla aplikacji, zbudowane na podstawie zdarzeń.
Połączenie jest naturalne: Event Sourcing tworzy zdarzenia, CQRS wykorzystuje je do budowy zoptymalizowane modele odczytu (np projekcje).
Kiedy stosować łącznie pozyskiwanie źródeł zdarzeń i CQRS
- Systemy z obowiązkową ścieżką audytu (fintech, opieka zdrowotna, e-commerce)
- Modele odczytu mają bardzo różną strukturę od modelu zapisu
- Kiedy odczyty wymagają skalowania 10-100x w porównaniu do zapisów
- Systemy z zapytaniami dotyczącymi podróży w czasie („Jaki był stan 15 marca?”)
- Nie używać dla prostych aplikacji CRUD: znaczna przesada
Kompletna architektura: kompleksowy przepływ
Przebieg żądania w systemie Event Sourcing + CQRS wygląda następująco:
- Klient wysyła Rozkaz (np.
EffettuaOrdineCommand) - Il Osoba obsługująca polecenia załaduj agregat ze sklepu zdarzeń (powtórka wydarzenia)
- Agregat sprawdza polecenie i generuje jedno lub więcej Zdarzenie domeny (np.
OrdineEffettuatoEvent) - Wydarzenia są utrwalaneSklep z wydarzeniami (tylko do dołączenia)
- Wydarzenia publikowane są na Autobus wiadomości (Kafka, EventBridge itp.)
- Jeden lub więcej Obsługa projekcji konsumują wydarzenia i aktualizują Przeczytaj model
- Zapytania klientów odczytywane z modelu odczytu (zazwyczaj baza danych zoptymalizowana pod kątem odczytu)
Implementacja: Agregat z pozyskiwaniem zdarzeń
// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"
import java.util.*;
import java.time.Instant;
public class OrdineAggregate {
// ID univoco dell'aggregate
private String ordineId;
// Versione corrente (numero di eventi applicati)
private long version = -1;
// Stato ricostruito dagli eventi
private StatoOrdine stato;
private String clienteId;
private List<LineaOrdine> linee;
private BigDecimal totale;
// Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Costruttore privato: si crea sempre tramite replay o factory method
private OrdineAggregate() {}
// ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
OrdineAggregate aggregate = new OrdineAggregate();
events.forEach(aggregate::apply);
return aggregate;
}
// ===== COMMAND HANDLING: valida il comando e produce eventi =====
public void effettuaOrdine(EffettuaOrdineCommand cmd) {
// Validazione business rules
if (this.stato != null) {
throw new IllegalStateException("Ordine gia esistente: " + ordineId);
}
if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
}
// Produce l'evento (nessuna modifica di stato diretta!)
OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
cmd.getOrdineId(),
cmd.getClienteId(),
cmd.getLinee(),
Instant.now()
);
// Applica l'evento localmente e aggiungilo agli uncommitted
applyAndRecord(event);
}
public void confermaPagamento(String metodoPagamento, String transactionId) {
if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
throw new IllegalStateException("Ordine non in attesa di pagamento");
}
PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
ordineId, metodoPagamento, transactionId, Instant.now()
);
applyAndRecord(event);
}
// ===== APPLY: modifica lo stato interno applicando un evento =====
// Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS
private void apply(OrdineEffettuatoEvent event) {
this.ordineId = event.getOrdineId();
this.clienteId = event.getClienteId();
this.linee = new ArrayList<>(event.getLinee());
this.totale = event.getTotale();
this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
this.version++;
}
private void apply(PagamentoConfermatoEvent event) {
this.stato = StatoOrdine.PAGATO;
this.version++;
}
private void apply(DomainEvent event) {
// Dispatch dinamico per tipo di evento
if (event instanceof OrdineEffettuatoEvent e) apply(e);
else if (event instanceof PagamentoConfermatoEvent e) apply(e);
else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
}
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
public long getVersion() { return version; }
public String getOrdineId() { return ordineId; }
}
Sklep z wydarzeniami: trwałość wydarzeń
L'Sklep z wydarzeniami jest to baza danych, która przechowuje sekwencję zdarzeń dla każdego agregatu. Podstawowym wymogiem jest to, że pisma święte są tylko do dołączania i wsparcie the optymistyczna kontrola współbieżności: określasz oczekiwaną wersję agregatu a pisanie kończy się niepowodzeniem, jeśli w międzyczasie ktoś inny zapisał wydarzenie (konflikt).
// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;
public interface EventStore {
// Carica tutti gli eventi per un aggregate (per il replay)
List<DomainEvent> loadEvents(String aggregateId);
// Persiste nuovi eventi con optimistic concurrency check
// expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}
// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {
private final DataSource dataSource;
private final EventSerializer serializer;
// DDL della tabella event_store
// CREATE TABLE event_store (
// id BIGSERIAL PRIMARY KEY,
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// event_type VARCHAR(255) NOT NULL,
// event_data JSONB NOT NULL,
// occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// UNIQUE(aggregate_id, version) -- optimistic locking
// );
@Override
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, aggregateId);
ResultSet rs = ps.executeQuery();
List<DomainEvent> events = new ArrayList<>();
while (rs.next()) {
events.add(serializer.deserialize(
rs.getString("event_type"),
rs.getString("event_data")
));
}
return events;
} catch (SQLException e) {
throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
}
}
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
"VALUES (?, ?, ?, ?::jsonb)";
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Optimistic concurrency check
long currentVersion = getCurrentVersion(conn, aggregateId);
if (currentVersion != expectedVersion) {
throw new OptimisticConcurrencyException(
"Conflitto per aggregate " + aggregateId +
": atteso version " + expectedVersion + ", trovato " + currentVersion
);
}
try (PreparedStatement ps = conn.prepareStatement(sql)) {
long version = expectedVersion + 1;
for (DomainEvent event : events) {
ps.setString(1, aggregateId);
ps.setLong(2, version++);
ps.setString(3, event.getClass().getSimpleName());
ps.setString(4, serializer.serialize(event));
ps.addBatch();
}
ps.executeBatch();
}
conn.commit();
} catch (SQLException e) {
throw new EventStoreException("Errore scrittura eventi", e);
}
}
}
Projekcja: budowanie modelu odczytu na podstawie wydarzeń
Una występ to konsument zdarzeń, który buduje widok (model odczytu) zoptymalizowany pod kątem zapytań. Każda projekcja jest idempotentna i może zostać zbudowane od nowa poprzez ponowne wczytanie wszystkich wydarzeń od początku Sklepu Wydarzeń.
Projekcje są aktualizowane asynchronicznie po każdym zdarzeniu: to jest źródło z ostateczna spójność. Klient, który właśnie wykonał polecenie, może nie widać od razu wyników w modelu odczytu.
// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata
@Component
public class OrdiniProjectionHandler {
private final OrdiniReadRepository readRepo;
// Gestisce OrdineEffettuatoEvent
@EventHandler
public void on(OrdineEffettuatoEvent event) {
OrdineReadModel readModel = OrdineReadModel.builder()
.ordineId(event.getOrdineId())
.clienteId(event.getClienteId())
.stato("IN_ATTESA_PAGAMENTO")
.totale(event.getTotale())
.linee(event.getLinee())
.dataCreazione(event.getOccurredAt())
.build();
readRepo.save(readModel);
}
// Gestisce PagamentoConfermatoEvent
@EventHandler
public void on(PagamentoConfermatoEvent event) {
readRepo.updateStato(event.getOrdineId(), "PAGATO");
readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
}
// Query ottimizzate sul read model
public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
return readRepo.findByClienteId(clienteId);
}
public List<OrdineReadModel> getOrdiniInAttesa() {
return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
}
}
// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
// ordine_id VARCHAR(36) PRIMARY KEY,
// cliente_id VARCHAR(36) NOT NULL,
// stato VARCHAR(50) NOT NULL,
// totale DECIMAL(10,2),
// data_creazione TIMESTAMPTZ,
// data_pagamento TIMESTAMPTZ,
// linee JSONB,
// -- Indici ottimizzati per le query frequenti
// CONSTRAINT idx_cliente_id USING btree(cliente_id),
// CONSTRAINT idx_stato USING btree(stato)
// );
Migawka: Optymalizacja odtwarzania agregatów z wieloma zdarzeniami
Ponieważ Agregat gromadzi zdarzenia w czasie, pełna powtórka staje się powolny: zamówienie zawierające 100 aktualizacji statusu wymaga 100 zapytań i 100 wniosków, zanim będzie gotowe zarządzać nowym Dowództwem. The Migawki rozwiązuje ten problem: okresowo zapisywany jest bieżący stan Agregatu (migawka) oraz przy następnym przeładowaniu zaczynamy od migawki, a nie od początku historii.
// SnapshotStore.java - Gestione degli snapshot degli aggregati
public interface SnapshotStore {
Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
void saveSnapshot(String aggregateId, Object aggregateState, long version);
}
// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi
private final EventStore eventStore;
private final SnapshotStore snapshotStore;
public OrdineAggregate load(String ordineId) {
// 1. Prova a caricare lo snapshot piu recente
Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);
OrdineAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// Parte dallo snapshot invece che dall'evento 0
aggregate = (OrdineAggregate) snapshot.get().getState();
fromVersion = snapshot.get().getVersion() + 1;
} else {
aggregate = new OrdineAggregate();
fromVersion = 0;
}
// 2. Carica solo gli eventi DOPO lo snapshot
List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
recentEvents.forEach(aggregate::applyEvent);
return aggregate;
}
public void save(OrdineAggregate aggregate) {
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(
aggregate.getOrdineId(),
uncommitted,
aggregate.getVersion() - uncommitted.size()
);
aggregate.clearUncommittedEvents();
// Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.saveSnapshot(
aggregate.getOrdineId(),
aggregate, // serializza lo stato corrente
aggregate.getVersion()
);
}
}
}
// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
// aggregate_id VARCHAR(36) NOT NULL,
// version BIGINT NOT NULL,
// state JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// PRIMARY KEY (aggregate_id, version)
// );
Zarządzanie ostateczną spójnością
Głównym wyzwaniem Event Sourcing + CQRS jest okno niespójności: po pomyślnym wykonaniu polecenia model odczytu nie jest jeszcze aktualizowany (Projekcja nadal przetwarza zdarzenie). Klient, który czyta zaraz potem możesz nie widzieć swoich zmian.
Istnieje kilka strategii radzenia sobie z tą sytuacją:
Strategia 1: Odpytywanie ze sprawdzaniem wersji
// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"
async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
const startTime = Date.now();
while (Date.now() - startTime < maxWaitMs) {
const ordine = await readModel.getOrdine(ordineId);
if (ordine && ordine.version >= expectedVersion) {
return ordine; // Projection aggiornata
}
await sleep(100); // Aspetta 100ms e riprova
}
throw new ProjectionTimeoutError(
`Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
);
}
Strategia 2: Zwróć wersję w odpowiedzi na polecenie
// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling
@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
aggregate.effettuaOrdine(cmd);
ordineRepository.save(aggregate);
return ResponseEntity.ok(CommandResponse.builder()
.aggregateId(cmd.getOrdineId())
.version(aggregate.getVersion()) // la versione dopo il salvataggio
.message("Ordine effettuato con successo")
.build());
}
Strategia 3: Synchroniczna projekcja danych krytycznych
// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente
@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
// 1. Salva eventi nell'Event Store
List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);
// 2. Aggiorna il read model nella STESSA transazione (sincrono)
uncommitted.forEach(event -> {
if (event instanceof OrdineEffettuatoEvent e) {
ordiniReadRepo.save(OrdineReadModel.from(e));
}
});
// 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
eventBus.publish(uncommitted);
aggregate.clearUncommittedEvents();
}
Rekonstrukcja projekcji
Jedną z największych zalet Event Sourcingu jest możliwość odbudowy dowolną projekcję od podstaw poprzez ponowne odczytanie wszystkich wydarzeń od początku Sklepu Wydarzeń. Umożliwia to dodawanie nowych modeli odczytu z mocą wsteczną lub naprawianie błędów w istniejących prognozach.
// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {
private final EventStore eventStore;
public void rebuild(ProjectionHandler handler, String fromAggregateType) {
System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());
// 1. Svuota il read model corrente
handler.reset();
// 2. Legge tutti gli eventi in ordine cronologico
// (implementazione dipende dall'event store specifico)
AtomicLong processed = new AtomicLong(0);
eventStore.streamAllEvents(fromAggregateType, event -> {
handler.handle(event);
long count = processed.incrementAndGet();
if (count % 1000 == 0) {
System.out.println("Processati " + count + " eventi...");
}
});
System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
}
}
Najlepsze praktyki i anty-wzorce
Anty-wzorzec: zmiana przeszłych wydarzeń
Wydarzenia w Sklepie Eventowym to niezmienny. Nigdy nie edytuj ich po napisaniu.
Jeśli schemat zdarzenia wymaga zmiany, utwórz nową wersję zdarzenia
(OrdineEffettuatoV2Event) i obsługuj przesyłanie w górę w deserializatorze.
Zmiana przeszłych wydarzeń narusza integralność ścieżki audytu i przerywa odtwarzanie.
- Jeden agregat na transakcję: Nie edytuj wielu agregatów w tej samej jednostce pracy. Użyj Sagi do koordynowania operacji obejmujących wiele agregatów.
- Wydarzenia jako fakt z przeszłości: Nazwy zdarzeń opisują coś, co już się wydarzyło (
OrdineEffettuatoEvent, NieEffettuaOrdineEvent). - Projekcje idempotentne: Każdy moduł obsługi projekcji musi mieć możliwość wielokrotnego wywoływania tego samego zdarzenia (dostawa co najmniej raz).
- Nie używaj zdarzeń takich jak Command: Publikowanie zdarzenia nie może bezpośrednio uruchamiać polecenia na innym agregacie. Użyj Sagi lub Process Managera.
- Polityka migawek oparta na liczbie zdarzeń: Twórz migawki co 50–100 zdarzeń. Zbyt częste = obciążenie serializacją; zbyt rzadkie = wolne odtwarzanie.
Kolejne kroki w serii
- Artykuł 5 – Wzór Sagi: gdy operacja obejmuje wiele agregatów lub wiele usług, Saga Pattern obsługuje transakcje rozproszone bez 2 komputerów, z transakcjami kompensującymi w przypadku niepowodzenia.
- Artykuł 6 – AWS EventBridge: jak publikować wydarzenia ze sklepu z wydarzeniami na EventBridge, aby dotrzeć do Lambda, SQS i innych celów w sposób bezserwerowy.
Połącz z innymi seriami
- Apache Kafka (seria 38): Kafka to idealny magazyn wydarzeń i autobus komunikatów dla Event Sourcingu w produkcji. Artykuły na temat Kafka Streams i Kafka Connect integrują się bezpośrednio z opisanymi tutaj wzorami.
- PostgreSQL AI i pgvector: model odczytu projekcji może być baza danych PostgreSQL ze specjalistycznymi indeksami, w tym kolumnami wektorowymi dla podobieństwa semantyka opisów tekstowych zleceń.







