Podsumowanie: Pozyskiwanie zdarzeń i oddzielne CQRS

Zanim je połączymy, podsumujmy oba wzorce oddzielnie:

  • Pozyskiwanie zdarzeń: Zamiast zapisywać bieżący stan agregatu w bazie danych, oszczędzasz sekwencja wydarzeń kto to stworzył. Państwo zostaje odbudowane reprodukcja (odtwarzanie) wydarzenia od początku. Zaleta: pełna ścieżka audytu, podróże w czasie, spostrzeżenia biznesowe pochodzące z wydarzeń.
  • CQRS: Strona poleceń (model zapisu) i strona zapytań (model odczytu) są oddzielne. Model zapisu odbiera polecenie i generuje zdarzenia. Model odczytu jest zoptymalizowany pod kątem zapytania specyficzne dla aplikacji, zbudowane na podstawie zdarzeń.

Połączenie jest naturalne: Event Sourcing tworzy zdarzenia, CQRS wykorzystuje je do budowy zoptymalizowane modele odczytu (np projekcje).

Kiedy stosować łącznie pozyskiwanie źródeł zdarzeń i CQRS

  • Systemy z obowiązkową ścieżką audytu (fintech, opieka zdrowotna, e-commerce)
  • Modele odczytu mają bardzo różną strukturę od modelu zapisu
  • Kiedy odczyty wymagają skalowania 10-100x w porównaniu do zapisów
  • Systemy z zapytaniami dotyczącymi podróży w czasie („Jaki był stan 15 marca?”)
  • Nie używać dla prostych aplikacji CRUD: znaczna przesada

Kompletna architektura: kompleksowy przepływ

Przebieg żądania w systemie Event Sourcing + CQRS wygląda następująco:

  1. Klient wysyła Rozkaz (np. EffettuaOrdineCommand)
  2. Il Osoba obsługująca polecenia załaduj agregat ze sklepu zdarzeń (powtórka wydarzenia)
  3. Agregat sprawdza polecenie i generuje jedno lub więcej Zdarzenie domeny (np. OrdineEffettuatoEvent)
  4. Wydarzenia są utrwalaneSklep z wydarzeniami (tylko do dołączenia)
  5. Wydarzenia publikowane są na Autobus wiadomości (Kafka, EventBridge itp.)
  6. Jeden lub więcej Obsługa projekcji konsumują wydarzenia i aktualizują Przeczytaj model
  7. Zapytania klientów odczytywane z modelu odczytu (zazwyczaj baza danych zoptymalizowana pod kątem odczytu)

Implementacja: Agregat z pozyskiwaniem zdarzeń

// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"

import java.util.*;
import java.time.Instant;

public class OrdineAggregate {

    // ID univoco dell'aggregate
    private String ordineId;
    // Versione corrente (numero di eventi applicati)
    private long version = -1;
    // Stato ricostruito dagli eventi
    private StatoOrdine stato;
    private String clienteId;
    private List<LineaOrdine> linee;
    private BigDecimal totale;

    // Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Costruttore privato: si crea sempre tramite replay o factory method
    private OrdineAggregate() {}

    // ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
    public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
        OrdineAggregate aggregate = new OrdineAggregate();
        events.forEach(aggregate::apply);
        return aggregate;
    }

    // ===== COMMAND HANDLING: valida il comando e produce eventi =====

    public void effettuaOrdine(EffettuaOrdineCommand cmd) {
        // Validazione business rules
        if (this.stato != null) {
            throw new IllegalStateException("Ordine gia esistente: " + ordineId);
        }
        if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
            throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
        }

        // Produce l'evento (nessuna modifica di stato diretta!)
        OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
            cmd.getOrdineId(),
            cmd.getClienteId(),
            cmd.getLinee(),
            Instant.now()
        );

        // Applica l'evento localmente e aggiungilo agli uncommitted
        applyAndRecord(event);
    }

    public void confermaPagamento(String metodoPagamento, String transactionId) {
        if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
            throw new IllegalStateException("Ordine non in attesa di pagamento");
        }

        PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
            ordineId, metodoPagamento, transactionId, Instant.now()
        );
        applyAndRecord(event);
    }

    // ===== APPLY: modifica lo stato interno applicando un evento =====
    // Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS

    private void apply(OrdineEffettuatoEvent event) {
        this.ordineId = event.getOrdineId();
        this.clienteId = event.getClienteId();
        this.linee = new ArrayList<>(event.getLinee());
        this.totale = event.getTotale();
        this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
        this.version++;
    }

    private void apply(PagamentoConfermatoEvent event) {
        this.stato = StatoOrdine.PAGATO;
        this.version++;
    }

    private void apply(DomainEvent event) {
        // Dispatch dinamico per tipo di evento
        if (event instanceof OrdineEffettuatoEvent e) apply(e);
        else if (event instanceof PagamentoConfermatoEvent e) apply(e);
        else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
    }

    private void applyAndRecord(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }

    public long getVersion() { return version; }
    public String getOrdineId() { return ordineId; }
}

Sklep z wydarzeniami: trwałość wydarzeń

L'Sklep z wydarzeniami jest to baza danych, która przechowuje sekwencję zdarzeń dla każdego agregatu. Podstawowym wymogiem jest to, że pisma święte są tylko do dołączania i wsparcie the optymistyczna kontrola współbieżności: określasz oczekiwaną wersję agregatu a pisanie kończy się niepowodzeniem, jeśli w międzyczasie ktoś inny zapisał wydarzenie (konflikt).

// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;

public interface EventStore {
    // Carica tutti gli eventi per un aggregate (per il replay)
    List<DomainEvent> loadEvents(String aggregateId);

    // Persiste nuovi eventi con optimistic concurrency check
    // expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
    void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}

// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {

    private final DataSource dataSource;
    private final EventSerializer serializer;

    // DDL della tabella event_store
    // CREATE TABLE event_store (
    //   id           BIGSERIAL PRIMARY KEY,
    //   aggregate_id VARCHAR(36) NOT NULL,
    //   version      BIGINT NOT NULL,
    //   event_type   VARCHAR(255) NOT NULL,
    //   event_data   JSONB NOT NULL,
    //   occurred_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    //   UNIQUE(aggregate_id, version)  -- optimistic locking
    // );

    @Override
    public List<DomainEvent> loadEvents(String aggregateId) {
        String sql = "SELECT event_type, event_data, version " +
                     "FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, aggregateId);
            ResultSet rs = ps.executeQuery();

            List<DomainEvent> events = new ArrayList<>();
            while (rs.next()) {
                events.add(serializer.deserialize(
                    rs.getString("event_type"),
                    rs.getString("event_data")
                ));
            }
            return events;

        } catch (SQLException e) {
            throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
        }
    }

    @Override
    public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
        String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
                     "VALUES (?, ?, ?, ?::jsonb)";

        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);

            // Optimistic concurrency check
            long currentVersion = getCurrentVersion(conn, aggregateId);
            if (currentVersion != expectedVersion) {
                throw new OptimisticConcurrencyException(
                    "Conflitto per aggregate " + aggregateId +
                    ": atteso version " + expectedVersion + ", trovato " + currentVersion
                );
            }

            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                long version = expectedVersion + 1;
                for (DomainEvent event : events) {
                    ps.setString(1, aggregateId);
                    ps.setLong(2, version++);
                    ps.setString(3, event.getClass().getSimpleName());
                    ps.setString(4, serializer.serialize(event));
                    ps.addBatch();
                }
                ps.executeBatch();
            }

            conn.commit();

        } catch (SQLException e) {
            throw new EventStoreException("Errore scrittura eventi", e);
        }
    }
}

Projekcja: budowanie modelu odczytu na podstawie wydarzeń

Una występ to konsument zdarzeń, który buduje widok (model odczytu) zoptymalizowany pod kątem zapytań. Każda projekcja jest idempotentna i może zostać zbudowane od nowa poprzez ponowne wczytanie wszystkich wydarzeń od początku Sklepu Wydarzeń.

Projekcje są aktualizowane asynchronicznie po każdym zdarzeniu: to jest źródło z ostateczna spójność. Klient, który właśnie wykonał polecenie, może nie widać od razu wyników w modelu odczytu.

// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata

@Component
public class OrdiniProjectionHandler {

    private final OrdiniReadRepository readRepo;

    // Gestisce OrdineEffettuatoEvent
    @EventHandler
    public void on(OrdineEffettuatoEvent event) {
        OrdineReadModel readModel = OrdineReadModel.builder()
            .ordineId(event.getOrdineId())
            .clienteId(event.getClienteId())
            .stato("IN_ATTESA_PAGAMENTO")
            .totale(event.getTotale())
            .linee(event.getLinee())
            .dataCreazione(event.getOccurredAt())
            .build();

        readRepo.save(readModel);
    }

    // Gestisce PagamentoConfermatoEvent
    @EventHandler
    public void on(PagamentoConfermatoEvent event) {
        readRepo.updateStato(event.getOrdineId(), "PAGATO");
        readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
    }

    // Query ottimizzate sul read model
    public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
        return readRepo.findByClienteId(clienteId);
    }

    public List<OrdineReadModel> getOrdiniInAttesa() {
        return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
    }
}

// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
//   ordine_id      VARCHAR(36) PRIMARY KEY,
//   cliente_id     VARCHAR(36) NOT NULL,
//   stato          VARCHAR(50) NOT NULL,
//   totale         DECIMAL(10,2),
//   data_creazione TIMESTAMPTZ,
//   data_pagamento TIMESTAMPTZ,
//   linee          JSONB,
//   -- Indici ottimizzati per le query frequenti
//   CONSTRAINT idx_cliente_id USING btree(cliente_id),
//   CONSTRAINT idx_stato USING btree(stato)
// );

Migawka: Optymalizacja odtwarzania agregatów z wieloma zdarzeniami

Ponieważ Agregat gromadzi zdarzenia w czasie, pełna powtórka staje się powolny: zamówienie zawierające 100 aktualizacji statusu wymaga 100 zapytań i 100 wniosków, zanim będzie gotowe zarządzać nowym Dowództwem. The Migawki rozwiązuje ten problem: okresowo zapisywany jest bieżący stan Agregatu (migawka) oraz przy następnym przeładowaniu zaczynamy od migawki, a nie od początku historii.

// SnapshotStore.java - Gestione degli snapshot degli aggregati

public interface SnapshotStore {
    Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
    void saveSnapshot(String aggregateId, Object aggregateState, long version);
}

// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {

    private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public OrdineAggregate load(String ordineId) {
        // 1. Prova a caricare lo snapshot piu recente
        Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);

        OrdineAggregate aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            // Parte dallo snapshot invece che dall'evento 0
            aggregate = (OrdineAggregate) snapshot.get().getState();
            fromVersion = snapshot.get().getVersion() + 1;
        } else {
            aggregate = new OrdineAggregate();
            fromVersion = 0;
        }

        // 2. Carica solo gli eventi DOPO lo snapshot
        List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
        recentEvents.forEach(aggregate::applyEvent);

        return aggregate;
    }

    public void save(OrdineAggregate aggregate) {
        List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
        eventStore.appendEvents(
            aggregate.getOrdineId(),
            uncommitted,
            aggregate.getVersion() - uncommitted.size()
        );
        aggregate.clearUncommittedEvents();

        // Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
        if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            snapshotStore.saveSnapshot(
                aggregate.getOrdineId(),
                aggregate,          // serializza lo stato corrente
                aggregate.getVersion()
            );
        }
    }
}

// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
//   aggregate_id VARCHAR(36) NOT NULL,
//   version      BIGINT NOT NULL,
//   state        JSONB NOT NULL,
//   created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   PRIMARY KEY (aggregate_id, version)
// );

Zarządzanie ostateczną spójnością

Głównym wyzwaniem Event Sourcing + CQRS jest okno niespójności: po pomyślnym wykonaniu polecenia model odczytu nie jest jeszcze aktualizowany (Projekcja nadal przetwarza zdarzenie). Klient, który czyta zaraz potem możesz nie widzieć swoich zmian.

Istnieje kilka strategii radzenia sobie z tą sytuacją:

Strategia 1: Odpytywanie ze sprawdzaniem wersji

// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"

async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
        const ordine = await readModel.getOrdine(ordineId);

        if (ordine && ordine.version >= expectedVersion) {
            return ordine;  // Projection aggiornata
        }

        await sleep(100);  // Aspetta 100ms e riprova
    }

    throw new ProjectionTimeoutError(
        `Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
    );
}

Strategia 2: Zwróć wersję w odpowiedzi na polecenie

// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling

@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
    OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
    aggregate.effettuaOrdine(cmd);
    ordineRepository.save(aggregate);

    return ResponseEntity.ok(CommandResponse.builder()
        .aggregateId(cmd.getOrdineId())
        .version(aggregate.getVersion())  // la versione dopo il salvataggio
        .message("Ordine effettuato con successo")
        .build());
}

Strategia 3: Synchroniczna projekcja danych krytycznych

// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente

@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
    // 1. Salva eventi nell'Event Store
    List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
    eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);

    // 2. Aggiorna il read model nella STESSA transazione (sincrono)
    uncommitted.forEach(event -> {
        if (event instanceof OrdineEffettuatoEvent e) {
            ordiniReadRepo.save(OrdineReadModel.from(e));
        }
    });

    // 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
    eventBus.publish(uncommitted);

    aggregate.clearUncommittedEvents();
}

Rekonstrukcja projekcji

Jedną z największych zalet Event Sourcingu jest możliwość odbudowy dowolną projekcję od podstaw poprzez ponowne odczytanie wszystkich wydarzeń od początku Sklepu Wydarzeń. Umożliwia to dodawanie nowych modeli odczytu z mocą wsteczną lub naprawianie błędów w istniejących prognozach.

// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {

    private final EventStore eventStore;

    public void rebuild(ProjectionHandler handler, String fromAggregateType) {
        System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());

        // 1. Svuota il read model corrente
        handler.reset();

        // 2. Legge tutti gli eventi in ordine cronologico
        // (implementazione dipende dall'event store specifico)
        AtomicLong processed = new AtomicLong(0);

        eventStore.streamAllEvents(fromAggregateType, event -> {
            handler.handle(event);
            long count = processed.incrementAndGet();
            if (count % 1000 == 0) {
                System.out.println("Processati " + count + " eventi...");
            }
        });

        System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
    }
}

Najlepsze praktyki i anty-wzorce

Anty-wzorzec: zmiana przeszłych wydarzeń

Wydarzenia w Sklepie Eventowym to niezmienny. Nigdy nie edytuj ich po napisaniu. Jeśli schemat zdarzenia wymaga zmiany, utwórz nową wersję zdarzenia (OrdineEffettuatoV2Event) i obsługuj przesyłanie w górę w deserializatorze. Zmiana przeszłych wydarzeń narusza integralność ścieżki audytu i przerywa odtwarzanie.

  • Jeden agregat na transakcję: Nie edytuj wielu agregatów w tej samej jednostce pracy. Użyj Sagi do koordynowania operacji obejmujących wiele agregatów.
  • Wydarzenia jako fakt z przeszłości: Nazwy zdarzeń opisują coś, co już się wydarzyło (OrdineEffettuatoEvent, Nie EffettuaOrdineEvent).
  • Projekcje idempotentne: Każdy moduł obsługi projekcji musi mieć możliwość wielokrotnego wywoływania tego samego zdarzenia (dostawa co najmniej raz).
  • Nie używaj zdarzeń takich jak Command: Publikowanie zdarzenia nie może bezpośrednio uruchamiać polecenia na innym agregacie. Użyj Sagi lub Process Managera.
  • Polityka migawek oparta na liczbie zdarzeń: Twórz migawki co 50–100 zdarzeń. Zbyt częste = obciążenie serializacją; zbyt rzadkie = wolne odtwarzanie.

Kolejne kroki w serii

  • Artykuł 5 – Wzór Sagi: gdy operacja obejmuje wiele agregatów lub wiele usług, Saga Pattern obsługuje transakcje rozproszone bez 2 komputerów, z transakcjami kompensującymi w przypadku niepowodzenia.
  • Artykuł 6 – AWS EventBridge: jak publikować wydarzenia ze sklepu z wydarzeniami na EventBridge, aby dotrzeć do Lambda, SQS i innych celów w sposób bezserwerowy.

Połącz z innymi seriami

  • Apache Kafka (seria 38): Kafka to idealny magazyn wydarzeń i autobus komunikatów dla Event Sourcingu w produkcji. Artykuły na temat Kafka Streams i Kafka Connect integrują się bezpośrednio z opisanymi tutaj wzorami.
  • PostgreSQL AI i pgvector: model odczytu projekcji może być baza danych PostgreSQL ze specjalistycznymi indeksami, w tym kolumnami wektorowymi dla podobieństwa semantyka opisów tekstowych zleceń.