Exactly-Once-semantiek in Kafka: transacties en coördinatie
Exact-once-semantiek is het moeilijkst te implementeren in gedistribueerde systemen. In deze handleiding wordt uitgelegd hoe Kafka transacties tussen meerdere onderwerpen beheert, de rol van de transactiecoördinator en de implicaties voor de doorvoer.
De drie leveringsgaranties in Kafka
Voordat u precies één keer aan de slag gaat, is het essentieel om de drie leveringsgaranties te begrijpen die een gedistribueerd berichtensysteem kan bieden. In het geval van een netwerkstoring of een crash van een makelaar, het gedrag is radicaal anders:
Hoogstens één keer: Het bericht wordt maximaal één keer verzonden. Als er is een fout opgetreden, het bericht gaat verloren zonder opnieuw te proberen. Alleen geschikt voor statistieken geen kritiek of logbestanden met een hoog volume waarin enig verlies acceptabel is.
Minstens één keer: het bericht wordt minstens één keer afgeleverd, maar kan bij een nieuwe poging worden gedupliceerd. En de standaardgarantie van Kafka en e in de meeste gevallen voldoende als de consumenten idempotent zijn.
Precies één keer: Het bericht wordt precies één keer afgeleverd. Het is de moeilijkste garantie om te implementeren in gedistribueerde systemen en er zijn kosten aan verbonden in termen van latentie en doorvoer.
Wat je gaat leren
- Hoe de idempotente producent duplicaten op producentenniveau elimineert
- De Transactionele API: hoe u meerdere schrijfbewerkingen in een atomaire transactie kunt verwerken
- De rol van de Transactiecoördinator bij de Kafka-makelaar
- Precies één keer end-to-end: lezen-process-schrijven met Kafka Streams
- De implicaties voor de doorvoer en wanneer EOS echt nodig is
- Voltooi de Java-installatie voor productie met EOS ingeschakeld
Stap 1: De Idempotente Producent
De eerste laag van precies ounces en de idempotente producent, geïntroduceerd in Kafka 0.11. Het probleem dat het oplost: wanneer een producent een bericht en ontvangt geen bevestiging van de makelaar (time-out, netwerkstoring), hij weet niet of het bericht is ontvangen of niet. Bij een nieuwe poging kunnen er dus duplicaten ontstaan.
De idempotente producent lost dit op door elke producent een Producent-ID (PID) uniek en één volgnummer voor elk bericht. De makelaar houdt het laatst ontvangen volgnummer bij van elke PID en ontdubbelt automatisch de nieuwe pogingen.
// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE); // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)
// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order-123",
"{\"orderId\": \"123\", \"amount\": 99.99}"
);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Inviato: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
// Con idempotenza, i retry sono sicuri: nessun duplicato
System.err.println("Errore invio: " + e.getMessage());
} finally {
producer.close();
}
Stap 2: De Transactionele API
De idempotente producent garandeert dat er geen duplicaten zijn voor de schrijfsels van een enkele producent. Maar wat als ik erover moet schrijven? twee onderwerpen tegelijk atomair? Bijvoorbeeld een lees-proces-schrijf dat:
- Lees van
topic-input - Schrijf het resultaat op
topic-output - Verbind de compensatie aan
__consumer_offsets
Als het tussen punt 2 en punt 3 crasht, is het bericht naar uitvoer ma geschreven de offset is niet vastgelegd: bij het opnieuw opstarten wordt hetzelfde bericht opnieuw verwerkt en geschreven een duplicaat. De Transactionele API lost dit op door iedereen te omhullen en drie stappen in een atomaire transactie.
// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");
// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();
// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
// INIZIA la transazione
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
// Processa il messaggio
String processed = processOrder(record.value());
// SCRIVI sull'output topic (dentro la transazione)
producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
}
// COMMITTA gli offset del consumer DENTRO la transazione
// Questo garantisce che il commit dell'offset e la scrittura sull'output
// siano atomici: o entrambi succedono, o nessuno
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
// COMMIT della transazione: ora sia gli offset che i messaggi output
// sono visibili ai consumer con isolation.level=read_committed
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Errori non recuperabili: termina il producer
producer.close();
throw e;
} catch (KafkaException e) {
// Errore recuperabile: abortisci la transazione e riprova
producer.abortTransaction();
// Il consumer rilegge i messaggi non committati alla prossima iterazione
}
}
De transactiecoördinator: hoe het werkt
Il Transactiecoördinator en een onderdeel van de Kafka-makelaar die
beheert de transactielevenscyclus. Elke transactie wordt geregistreerd
over een speciaal intern onderwerp: __transaction_state (gepartitioneerd door
schaalbaarheid, standaard 50 partities).
# Fasi di una transazione Kafka:
# 1. initTransactions() chiamato dal producer
# -> Transaction Coordinator assegna epoch al transactional.id
# -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
# - Termina la vecchia transazione (producer fencing)
# - Garantisce che una sola istanza sia attiva
# 2. beginTransaction() - solo lato producer (no network call)
# 3. send() per ogni messaggio
# -> Producer invia al partition leader con il PID e epoch
# -> Partition leader scrive in un "transaction buffer" (non ancora visibile)
# 4. sendOffsetsToTransaction()
# -> Transaction Coordinator registra quali consumer group offset
# fanno parte di questa transazione
# 5. commitTransaction()
# -> Producer invia PREPARE_COMMIT al Transaction Coordinator
# -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
# -> TC notifica tutti i partition leader coinvolti
# -> Partition leader scrivono COMMIT marker sul log
# -> I messaggi diventano visibili ai consumer read_committed
# -> TC scrive COMPLETE_COMMIT su __transaction_state
# 6. abortTransaction()
# -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
# -> Consumer read_committed non vedono mai i messaggi abortiti
# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
--list
# TransactionalId Producer Epoch State LastModifiedTime
# order-processor-0 5 Complete 2026-03-20 10:30:00
Producentschermen: bescherming tegen zombie-instanties
# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
# (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!
# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio
# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
EOS end-to-end met Kafka-streams
Kafka-stromen (zie artikel 06 van de serie) ondersteunt EOS
native met een enkele configuratie: processing.guarantee=exactly_once_v2.
De bibliotheek beheert automatisch de transactionele producent, de consument
read_committed en het vastleggen van de compensaties in dezelfde transactie.
// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200); // default 100ms
StreamsBuilder builder = new StreamsBuilder();
// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");
inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> enrichOrder(value))
.peek((key, value) -> log.info("Processed order: {}", key))
.to("orders-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// close() con timeout attende il completamento delle transazioni in corso
streams.close(Duration.ofSeconds(10));
}));
streams.start();
Implicaties voor doorvoer en latentie
Precies één keer is niet gratis. Dit zijn de kosten gekwantificeerd in typische benchmarks:
# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):
# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s
# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)
# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch
# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione
# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer
Wanneer moet u EOS gebruiken en wanneer niet?
Checklist: EOS en noodzakelijk?
- Gebruik EOS voor: betalingen, financiële overdrachten, voorraadupdates, kritieke statusgebeurtenissen (order aangemaakt/geannuleerd) waarbij een duplicaat echte bedrijfsproblemen veroorzaakt.
- EOS is niet nodig voor: analyselogboeken, monitoringstatistieken, klik/bekijk evenementen waarbij enige duplicatie of verlies acceptabel is. Gebruik minstens één keer met idempotente consumenten.
- Alternatief voor EOS: consumenten idempotent maken (controleer of je hebt het bericht al verwerkt via een uniek ID in de database) + minimaal één keer. Vaak eenvoudiger en efficiënter dan EOS op Kafka-niveau.
Let op: EOS dekt geen externe bijwerkingen
Exactly-once in Kafka garandeert semantiek alleen binnen het Kafka-cluster. Als uw consument naar een externe database schrijft (PostgreSQL, Redis, enz.), Kafka EOS garandeert voor deze handelingen niets. Voor hybride systemen (Kafka + externe database), je hebt extra patronen nodig zoals Transactionele Outbox of Saga.
Conclusies
De precies-eens-semantiek in Kafka is een van de meest geavanceerde implementaties in de geschiedenis van gedistribueerde systemen. De idempotente producent elimineert duplicaten op netwerkniveau garandeert de Transactionele API atomiciteit tussen meerdere onderwerpen, en Kafka Streams vat dit alles samen in één enkele configuratie. De kosten in latentie en doorvoer en reëel maar beheersbaar met de juiste optimalisaties.
De complete serie: Apache Kafka
- Artikel 01 - Apache Kafka Fundamentals: onderwerpen, partities en consumentengroepen
- Artikel 02 — KRaft in Kafka 4.0: Tot ziens ZooKeeper
- Artikel 03 - Geavanceerde Kafka-producent en consument: Acks, Idempotency en Retry
- Artikel 04 (dit) - Exactly Once Semantiek in Kafka: transacties en coördinatie
- Artikel 05 — Schemaregister: Avro, Protobuf en Schema-evolutie
- Artikel 06 — Kafka-streams: KTable en Windowing
- Artikel 07 — Kafka Connect: Debezium CDC- en DB-integratie
- Artikel 08 — Kafka + Apache Flink: realtime pijplijnanalyse







