Samenvatting: Event Sourcing en CQRS gescheiden

Laten we, voordat we ze combineren, de twee patronen afzonderlijk samenvatten:

  • Inkoop van evenementen: In plaats van de huidige status van het aggregaat in de database op te slaan, jij bewaart de opeenvolging van gebeurtenissen wie het heeft gemaakt. De staat wordt gereconstrueerd reproduceren (opnieuw afspelen) de gebeurtenissen vanaf het begin. Voordeel: compleet audittraject, tijdreizen, zakelijke inzichten afgeleid van evenementen.
  • CQRS: Commandozijde (schrijfmodel) en Queryzijde (leesmodel) zijn gescheiden. Het schrijfmodel ontvangt Commando en produceert gebeurtenissen. Het leesmodel is geoptimaliseerd voor toepassingsspecifieke query's, opgebouwd uit gebeurtenissen.

De combinatie is logisch: Event Sourcing produceert evenementen, CQRS gebruikt ze om te bouwen geoptimaliseerde leesmodellen (bijv projecties).

Wanneer gebruik je Event Sourcing + CQRS samen?

  • Systemen met verplichte audit trail (fintech, gezondheidszorg, e-commerce)
  • Wanneer leesmodellen een heel andere structuur hebben dan het schrijfmodel
  • Wanneer leesbewerkingen 10-100x moeten worden geschaald in vergelijking met schrijfbewerkingen
  • Systemen met tijdreisvragen ("wat was de status op 15 maart?")
  • Niet gebruiken voor eenvoudige CRUD-toepassingen: aanzienlijke overkill

Volledige architectuur: end-to-end-stroom

De stroom van een aanvraag in een Event Sourcing + CQRS-systeem is als volgt:

  1. De opdrachtgever stuurt een Commando (bijv. EffettuaOrdineCommand)
  2. Il Commandobehandelaar laad het aggregaat uit de gebeurteniswinkel (herhaling van gebeurtenissen)
  3. Het aggregaat valideert de opdracht en produceert er een of meer Domein evenement (bijv. OrdineEffettuatoEvent)
  4. Gebeurtenissen worden voortgezetEvenementenwinkel (alleen toevoegen)
  5. Evenementen worden gepubliceerd op Berichtenbus (Kafka, EventBridge, enz.)
  6. Eén of meer Projectiebehandelaar ze consumeren evenementen en updaten de Model lezen
  7. Clientquery's worden gelezen uit het leesmodel (doorgaans een voor lezen geoptimaliseerde database)

Implementatie: The Aggregate met Event Sourcing

// OrdineAggregate.java - Aggregate con Event Sourcing
// Segue il pattern "eventi come source of truth"

import java.util.*;
import java.time.Instant;

public class OrdineAggregate {

    // ID univoco dell'aggregate
    private String ordineId;
    // Versione corrente (numero di eventi applicati)
    private long version = -1;
    // Stato ricostruito dagli eventi
    private StatoOrdine stato;
    private String clienteId;
    private List<LineaOrdine> linee;
    private BigDecimal totale;

    // Lista degli eventi prodotti dalla sessione corrente (non ancora persistiti)
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Costruttore privato: si crea sempre tramite replay o factory method
    private OrdineAggregate() {}

    // ===== FACTORY METHOD: ricostruisce lo stato dal replay degli eventi =====
    public static OrdineAggregate reconstituteFrom(List<DomainEvent> events) {
        OrdineAggregate aggregate = new OrdineAggregate();
        events.forEach(aggregate::apply);
        return aggregate;
    }

    // ===== COMMAND HANDLING: valida il comando e produce eventi =====

    public void effettuaOrdine(EffettuaOrdineCommand cmd) {
        // Validazione business rules
        if (this.stato != null) {
            throw new IllegalStateException("Ordine gia esistente: " + ordineId);
        }
        if (cmd.getLinee() == null || cmd.getLinee().isEmpty()) {
            throw new IllegalArgumentException("L'ordine deve avere almeno una linea");
        }

        // Produce l'evento (nessuna modifica di stato diretta!)
        OrdineEffettuatoEvent event = new OrdineEffettuatoEvent(
            cmd.getOrdineId(),
            cmd.getClienteId(),
            cmd.getLinee(),
            Instant.now()
        );

        // Applica l'evento localmente e aggiungilo agli uncommitted
        applyAndRecord(event);
    }

    public void confermaPagamento(String metodoPagamento, String transactionId) {
        if (this.stato != StatoOrdine.IN_ATTESA_PAGAMENTO) {
            throw new IllegalStateException("Ordine non in attesa di pagamento");
        }

        PagamentoConfermatoEvent event = new PagamentoConfermatoEvent(
            ordineId, metodoPagamento, transactionId, Instant.now()
        );
        applyAndRecord(event);
    }

    // ===== APPLY: modifica lo stato interno applicando un evento =====
    // Questi metodi devono essere DETERMINISTICI e SENZA SIDE EFFECTS

    private void apply(OrdineEffettuatoEvent event) {
        this.ordineId = event.getOrdineId();
        this.clienteId = event.getClienteId();
        this.linee = new ArrayList<>(event.getLinee());
        this.totale = event.getTotale();
        this.stato = StatoOrdine.IN_ATTESA_PAGAMENTO;
        this.version++;
    }

    private void apply(PagamentoConfermatoEvent event) {
        this.stato = StatoOrdine.PAGATO;
        this.version++;
    }

    private void apply(DomainEvent event) {
        // Dispatch dinamico per tipo di evento
        if (event instanceof OrdineEffettuatoEvent e) apply(e);
        else if (event instanceof PagamentoConfermatoEvent e) apply(e);
        else throw new IllegalArgumentException("Evento sconosciuto: " + event.getClass());
    }

    private void applyAndRecord(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    public List<DomainEvent> getUncommittedEvents() {
        return Collections.unmodifiableList(uncommittedEvents);
    }

    public void clearUncommittedEvents() {
        uncommittedEvents.clear();
    }

    public long getVersion() { return version; }
    public String getOrdineId() { return ordineId; }
}

Event Store: het voortbestaan ​​van evenementen

L'Evenementenwinkel het is de database die de reeks gebeurtenissen voor elk aggregaat bijhoudt. De fundamentele vereiste is dat de Schriften dat zijn alleen toevoegen en ondersteuning de optimistische gelijktijdigheidscontrole: u specificeert de verwachte versie van het aggregaat en het schrijven mislukt als iemand anders in de tussentijd een gebeurtenis heeft opgeschreven (conflict).

// EventStore.java - Interfaccia e implementazione PostgreSQL
import java.util.*;

public interface EventStore {
    // Carica tutti gli eventi per un aggregate (per il replay)
    List<DomainEvent> loadEvents(String aggregateId);

    // Persiste nuovi eventi con optimistic concurrency check
    // expectedVersion: la versione che ci aspettavamo (fallisce se diversa)
    void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion);
}

// Implementazione con PostgreSQL
public class PostgresEventStore implements EventStore {

    private final DataSource dataSource;
    private final EventSerializer serializer;

    // DDL della tabella event_store
    // CREATE TABLE event_store (
    //   id           BIGSERIAL PRIMARY KEY,
    //   aggregate_id VARCHAR(36) NOT NULL,
    //   version      BIGINT NOT NULL,
    //   event_type   VARCHAR(255) NOT NULL,
    //   event_data   JSONB NOT NULL,
    //   occurred_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    //   UNIQUE(aggregate_id, version)  -- optimistic locking
    // );

    @Override
    public List<DomainEvent> loadEvents(String aggregateId) {
        String sql = "SELECT event_type, event_data, version " +
                     "FROM event_store WHERE aggregate_id = ? ORDER BY version ASC";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, aggregateId);
            ResultSet rs = ps.executeQuery();

            List<DomainEvent> events = new ArrayList<>();
            while (rs.next()) {
                events.add(serializer.deserialize(
                    rs.getString("event_type"),
                    rs.getString("event_data")
                ));
            }
            return events;

        } catch (SQLException e) {
            throw new EventStoreException("Errore caricamento eventi per " + aggregateId, e);
        }
    }

    @Override
    public void appendEvents(String aggregateId, List<DomainEvent> events, long expectedVersion) {
        String sql = "INSERT INTO event_store (aggregate_id, version, event_type, event_data) " +
                     "VALUES (?, ?, ?, ?::jsonb)";

        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);

            // Optimistic concurrency check
            long currentVersion = getCurrentVersion(conn, aggregateId);
            if (currentVersion != expectedVersion) {
                throw new OptimisticConcurrencyException(
                    "Conflitto per aggregate " + aggregateId +
                    ": atteso version " + expectedVersion + ", trovato " + currentVersion
                );
            }

            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                long version = expectedVersion + 1;
                for (DomainEvent event : events) {
                    ps.setString(1, aggregateId);
                    ps.setLong(2, version++);
                    ps.setString(3, event.getClass().getSimpleName());
                    ps.setString(4, serializer.serialize(event));
                    ps.addBatch();
                }
                ps.executeBatch();
            }

            conn.commit();

        } catch (SQLException e) {
            throw new EventStoreException("Errore scrittura eventi", e);
        }
    }
}

Projectie: het leesmodel bouwen op basis van gebeurtenissen

Una projectie is een evenementenconsument die een visie opbouwt (het leesmodel) geoptimaliseerd voor zoekopdrachten. Elke projectie is idempotent en kan opnieuw opgebouwd worden door alle gebeurtenissen vanaf het begin van de Event Store opnieuw te lezen.

De projecties worden na elke gebeurtenis asynchroon bijgewerkt: dit is de bron van uiteindelijke consistentie. De cliënt die zojuist een opdracht heeft uitgevoerd, zou dit kunnen doen zie niet onmiddellijk resultaten op het Leesmodel.

// OrdiniProjectionHandler.java - Costruisce il read model degli ordini
// Ascolta gli eventi e aggiorna la tabella di lettura ottimizzata

@Component
public class OrdiniProjectionHandler {

    private final OrdiniReadRepository readRepo;

    // Gestisce OrdineEffettuatoEvent
    @EventHandler
    public void on(OrdineEffettuatoEvent event) {
        OrdineReadModel readModel = OrdineReadModel.builder()
            .ordineId(event.getOrdineId())
            .clienteId(event.getClienteId())
            .stato("IN_ATTESA_PAGAMENTO")
            .totale(event.getTotale())
            .linee(event.getLinee())
            .dataCreazione(event.getOccurredAt())
            .build();

        readRepo.save(readModel);
    }

    // Gestisce PagamentoConfermatoEvent
    @EventHandler
    public void on(PagamentoConfermatoEvent event) {
        readRepo.updateStato(event.getOrdineId(), "PAGATO");
        readRepo.updateDataPagamento(event.getOrdineId(), event.getOccurredAt());
    }

    // Query ottimizzate sul read model
    public List<OrdineReadModel> getOrdiniCliente(String clienteId) {
        return readRepo.findByClienteId(clienteId);
    }

    public List<OrdineReadModel> getOrdiniInAttesa() {
        return readRepo.findByStato("IN_ATTESA_PAGAMENTO");
    }
}

// Schema del read model (tabella ottimizzata per le query piu comuni)
// CREATE TABLE ordini_read (
//   ordine_id      VARCHAR(36) PRIMARY KEY,
//   cliente_id     VARCHAR(36) NOT NULL,
//   stato          VARCHAR(50) NOT NULL,
//   totale         DECIMAL(10,2),
//   data_creazione TIMESTAMPTZ,
//   data_pagamento TIMESTAMPTZ,
//   linee          JSONB,
//   -- Indici ottimizzati per le query frequenti
//   CONSTRAINT idx_cliente_id USING btree(cliente_id),
//   CONSTRAINT idx_stato USING btree(stato)
// );

Momentopname: het optimaliseren van de herhaling van aggregaten met veel evenementen

Omdat een aggregaat gebeurtenissen in de loop van de tijd verzamelt, wordt de volledige herhaling wordt langzaam: een bestelling met 100 statusupdates vereist 100 vragen en 100 aanvragen voordat deze klaar is om een nieuw commando te beheren. De Momentopnamen lost dit probleem op: periodiek wordt de huidige status van het aggregaat opgeslagen (momentopname), en bij de volgende herlaadbeurt we vertrekken vanaf de momentopname in plaats van vanaf het begin van het verhaal.

// SnapshotStore.java - Gestione degli snapshot degli aggregati

public interface SnapshotStore {
    Optional<AggregateSnapshot> loadLatestSnapshot(String aggregateId);
    void saveSnapshot(String aggregateId, Object aggregateState, long version);
}

// AggregateRepository con supporto snapshot
public class SnapshottingAggregateRepository {

    private static final int SNAPSHOT_THRESHOLD = 50; // snapshot ogni 50 eventi

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public OrdineAggregate load(String ordineId) {
        // 1. Prova a caricare lo snapshot piu recente
        Optional<AggregateSnapshot> snapshot = snapshotStore.loadLatestSnapshot(ordineId);

        OrdineAggregate aggregate;
        long fromVersion;

        if (snapshot.isPresent()) {
            // Parte dallo snapshot invece che dall'evento 0
            aggregate = (OrdineAggregate) snapshot.get().getState();
            fromVersion = snapshot.get().getVersion() + 1;
        } else {
            aggregate = new OrdineAggregate();
            fromVersion = 0;
        }

        // 2. Carica solo gli eventi DOPO lo snapshot
        List<DomainEvent> recentEvents = eventStore.loadEventsFrom(ordineId, fromVersion);
        recentEvents.forEach(aggregate::applyEvent);

        return aggregate;
    }

    public void save(OrdineAggregate aggregate) {
        List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
        eventStore.appendEvents(
            aggregate.getOrdineId(),
            uncommitted,
            aggregate.getVersion() - uncommitted.size()
        );
        aggregate.clearUncommittedEvents();

        // Crea uno snapshot se l'aggregate ha accumulato abbastanza eventi
        if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            snapshotStore.saveSnapshot(
                aggregate.getOrdineId(),
                aggregate,          // serializza lo stato corrente
                aggregate.getVersion()
            );
        }
    }
}

// Tabella snapshot
// CREATE TABLE aggregate_snapshots (
//   aggregate_id VARCHAR(36) NOT NULL,
//   version      BIGINT NOT NULL,
//   state        JSONB NOT NULL,
//   created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//   PRIMARY KEY (aggregate_id, version)
// );

Het beheren van de uiteindelijke consistentie

De belangrijkste uitdaging van Event Sourcing + CQRS is inconsistentie venster: nadat een opdracht met succes is uitgevoerd, wordt het leesmodel nog niet bijgewerkt (De projectie verwerkt de gebeurtenis nog steeds). Een cliënt die direct daarna leest Mogelijk ziet u uw wijzigingen niet.

Er zijn verschillende strategieën om deze situatie te beheersen:

Strategie 1: Polling met versiecontrole

// Il client aspetta che il read model raggiunga la versione attesa
// Utile per operazioni sequenziali: "aspetta che l'ordine sia visibile prima di procedere"

async function waitForProjection(ordineId, expectedVersion, maxWaitMs = 5000) {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
        const ordine = await readModel.getOrdine(ordineId);

        if (ordine && ordine.version >= expectedVersion) {
            return ordine;  // Projection aggiornata
        }

        await sleep(100);  // Aspetta 100ms e riprova
    }

    throw new ProjectionTimeoutError(
        `Timeout: read model non aggiornato dopo ${maxWaitMs}ms per ordine ${ordineId}`
    );
}

Strategie 2: Geef de versie terug in de opdrachtreactie

// Command Handler restituisce la versione dell'evento prodotto
// Il client la usa per il version check o per il polling

@PostMapping("/ordini")
public ResponseEntity<CommandResponse> effettuaOrdine(@RequestBody EffettuaOrdineCommand cmd) {
    OrdineAggregate aggregate = ordineRepository.load(cmd.getOrdineId());
    aggregate.effettuaOrdine(cmd);
    ordineRepository.save(aggregate);

    return ResponseEntity.ok(CommandResponse.builder()
        .aggregateId(cmd.getOrdineId())
        .version(aggregate.getVersion())  // la versione dopo il salvataggio
        .message("Ordine effettuato con successo")
        .build());
}

Strategie 3: Synchrone projectie voor kritieke gegevens

// Per alcune proiezioni critiche, aggiorna il read model nella stessa transazione
// del salvataggio degli eventi (sacrificando un po' di disaccoppiamento)
// Usato quando il client deve assolutamente vedere i dati immediatamente

@Transactional
public void saveWithSyncProjection(OrdineAggregate aggregate) {
    // 1. Salva eventi nell'Event Store
    List<DomainEvent> uncommitted = aggregate.getUncommittedEvents();
    eventStore.appendEvents(aggregate.getOrdineId(), uncommitted, ...);

    // 2. Aggiorna il read model nella STESSA transazione (sincrono)
    uncommitted.forEach(event -> {
        if (event instanceof OrdineEffettuatoEvent e) {
            ordiniReadRepo.save(OrdineReadModel.from(e));
        }
    });

    // 3. Pubblica gli eventi sul bus per le projection asincrone degli altri servizi
    eventBus.publish(uncommitted);

    aggregate.clearUncommittedEvents();
}

Projectie opnieuw opbouwen

Een van de grote voordelen van Event Sourcing is de mogelijkheid om opnieuw op te bouwen elke projectie helemaal opnieuw door alle gebeurtenissen vanaf het begin van de evenementenwinkel opnieuw te lezen. Hierdoor kunt u met terugwerkende kracht nieuwe leesmodellen toevoegen of bugs oplossen in bestaande projecties.

// ProjectionRebuilder.java - Ricostruisce una projection da zero
public class ProjectionRebuilder {

    private final EventStore eventStore;

    public void rebuild(ProjectionHandler handler, String fromAggregateType) {
        System.out.println("Avvio rebuild projection: " + handler.getClass().getSimpleName());

        // 1. Svuota il read model corrente
        handler.reset();

        // 2. Legge tutti gli eventi in ordine cronologico
        // (implementazione dipende dall'event store specifico)
        AtomicLong processed = new AtomicLong(0);

        eventStore.streamAllEvents(fromAggregateType, event -> {
            handler.handle(event);
            long count = processed.incrementAndGet();
            if (count % 1000 == 0) {
                System.out.println("Processati " + count + " eventi...");
            }
        });

        System.out.println("Rebuild completato: " + processed.get() + " eventi processati");
    }
}

Beste praktijken en antipatronen

Antipatroon: gebeurtenissen uit het verleden veranderen

Evenementen in de Event Store wel onveranderlijk. Bewerk ze nooit na het schrijven. Als een gebeurtenisschema moet worden gewijzigd, maakt u een nieuwe versie van de gebeurtenis (OrdineEffettuatoV2Event) en handel upcasting af in de deserializer. Het wijzigen van gebeurtenissen uit het verleden brengt de integriteit van het audittraject in gevaar en belemmert de herhaling.

  • Eén aggregaat per transactie: bewerk niet meerdere aggregaties in dezelfde werkeenheid. Gebruik Saga om geaggregeerde bewerkingen te organiseren.
  • Gebeurtenissen als een feit uit het verleden: Gebeurtenisnamen beschrijven iets dat al is gebeurd (OrdineEffettuatoEvent, Niet EffettuaOrdineEvent).
  • Idempotente projecties: Elke projectie-handler moet veilig meerdere keren kunnen worden aangeroepen met dezelfde gebeurtenis (minstens één keer levering).
  • Gebruik geen gebeurtenissen zoals Command: Het publiceren van een gebeurtenis mag niet rechtstreeks een opdracht op een ander aggregaat starten. Gebruik Saga of Process Manager.
  • Momentopnamebeleid op basis van het aantal gebeurtenissen: Maak elke 50-100 gebeurtenissen momentopnamen. Te frequent = serialisatie-overhead; te zeldzaam = langzame herhaling.

Volgende stappen in de serie

  • Artikel 5 – Saga-patroon: wanneer bij een bewerking meerdere aggregaten betrokken zijn of meerdere services, verwerkt het Saga Pattern gedistribueerde transacties zonder 2PC, met het compenseren van transacties in geval van mislukking.
  • Artikel 6 – AWS EventBridge: hoe u evenementen publiceert vanuit uw evenementenwinkel op EventBridge om Lambda, SQS en andere doelen op een serverloze manier te bereiken.

Link met andere series

  • Apache Kafka (serie 38): Kafka is de ideale Event Store en berichtenbus voor Event Sourcing in productie. De artikelen over Kafka Streams en Kafka Connect integreren direct met de hier beschreven patronen.
  • PostgreSQL AI en pgvector: het leesmodel van een projectie kan zijn een PostgreSQL-database met gespecialiseerde indexen, inclusief vectorkolommen voor gelijkenis semantiek op tekstuele beschrijvingen van bestellingen.