Sémantika Exactly-Once in Kafka: Transactions and Coordination
Sémantika přesně jednou je nejobtížnější na implementaci v distribuovaných systémech. Tato příručka vysvětluje, jak Kafka spravuje transakce mezi více tématy, rolemi transakčního koordinátora a dopady na propustnost.
Tři záruky dodání v Kafkovi
Než se pustíte do řešení přesně jednou, je nezbytné pochopit tři záruky dodání které může nabídnout systém distribuovaného zasílání zpráv. V případě výpadku sítě nebo zhroucení makléře, chování je radikálně odlišné:
Maximálně jednou: Zpráva je odeslána maximálně jednou. Pokud došlo k chybě, zpráva je ztracena bez opakování. Vhodné pouze pro metriky žádná kritika nebo velké objemy protokolů, kde je přijatelná určitá ztráta.
Alespoň jednou: zpráva je doručena alespoň jednou, ale v případě opakovaného pokusu mohou být duplikovány. A Kafkova záruka z prodlení a e pro většinu případů postačující, pokud jsou spotřebitelé idempotentní.
Přesně - jednou: Zpráva je doručena přesně jednou. Je to nejobtížněji implementovatelná záruka v distribuovaných systémech a stojí za to z hlediska latence a propustnosti.
Co se naučíte
- Jak idempotentní producent eliminuje duplikáty na úrovni producenta
- Transakční API: jak zabalit více zápisů do atomické transakce
- Role Transaction Coordinator v Kafka broker
- Přesně jednou end-to-end: čtení-proces-zápis s Kafka Streams
- Důsledky propustnosti a kdy je EOS skutečně potřeba
- Kompletní nastavení Java pro produkci s povoleným EOS
Krok 1: Idempotent Producer
První vrstva přesně-uncí a idempotentní výrobce, představeno v Kafkovi 0.11. Problém, který řeší: když výrobce pošle a zprávu a neobdrží ack od brokera (timeout, výpadek sítě), neví, zda byla zpráva přijata nebo ne. Opakovaný pokus proto může vytvořit duplikáty.
Idempotentní výrobce to řeší tak, že každému výrobci přidělí a ID producenta (PID) jedinečný a jeden pořadové číslo pro každou zprávu. Broker sleduje poslední přijaté pořadové číslo z každého PID a automaticky deduplikuje opakované pokusy.
// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE); // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)
// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order-123",
"{\"orderId\": \"123\", \"amount\": 99.99}"
);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Inviato: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
// Con idempotenza, i retry sono sicuri: nessun duplicato
System.err.println("Errore invio: " + e.getMessage());
} finally {
producer.close();
}
Krok 2: Transakční API
Idempotentní výrobce garantuje neduplikáty pro zápisy jediného výrobce. Ale co když o tom musím psát dvě témata zároveň atomově? Například čtení-proces-zápis, který musí:
- Číst z
topic-input - Napište výsledek na
topic-output - Potvrdit offset
__consumer_offsets
Pokud dojde k chybě mezi bodem 2 a bodem 3, byla zpráva zapsána na výstup ma offset není potvrzen: po restartu znovu zpracuje stejnou zprávu a zapíše duplikát. The Transakční API řeší to tím, že všechny obalí a tři kroky v atomové transakci.
// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");
// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();
// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
// INIZIA la transazione
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
// Processa il messaggio
String processed = processOrder(record.value());
// SCRIVI sull'output topic (dentro la transazione)
producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
}
// COMMITTA gli offset del consumer DENTRO la transazione
// Questo garantisce che il commit dell'offset e la scrittura sull'output
// siano atomici: o entrambi succedono, o nessuno
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
// COMMIT della transazione: ora sia gli offset che i messaggi output
// sono visibili ai consumer con isolation.level=read_committed
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Errori non recuperabili: termina il producer
producer.close();
throw e;
} catch (KafkaException e) {
// Errore recuperabile: abortisci la transazione e riprova
producer.abortTransaction();
// Il consumer rilegge i messaggi non committati alla prossima iterazione
}
}
Transakční koordinátor: Jak to funguje
Il Transakční koordinátor a součást brokera Kafka, která
řídí životní cyklus transakce. Každá transakce je zaznamenána
na speciální interní téma: __transaction_state (rozděleno podle
škálovatelnost, ve výchozím nastavení 50 oddílů).
# Fasi di una transazione Kafka:
# 1. initTransactions() chiamato dal producer
# -> Transaction Coordinator assegna epoch al transactional.id
# -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
# - Termina la vecchia transazione (producer fencing)
# - Garantisce che una sola istanza sia attiva
# 2. beginTransaction() - solo lato producer (no network call)
# 3. send() per ogni messaggio
# -> Producer invia al partition leader con il PID e epoch
# -> Partition leader scrive in un "transaction buffer" (non ancora visibile)
# 4. sendOffsetsToTransaction()
# -> Transaction Coordinator registra quali consumer group offset
# fanno parte di questa transazione
# 5. commitTransaction()
# -> Producer invia PREPARE_COMMIT al Transaction Coordinator
# -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
# -> TC notifica tutti i partition leader coinvolti
# -> Partition leader scrivono COMMIT marker sul log
# -> I messaggi diventano visibili ai consumer read_committed
# -> TC scrive COMPLETE_COMMIT su __transaction_state
# 6. abortTransaction()
# -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
# -> Consumer read_committed non vedono mai i messaggi abortiti
# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
--list
# TransactionalId Producer Epoch State LastModifiedTime
# order-processor-0 5 Complete 2026-03-20 10:30:00
Producent Fencing: Ochrana před zombie instancemi
# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
# (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!
# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio
# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
EOS End-to-End s Kafka Streams
Kafkovy proudy (viz článek 06 série) podporuje EOS
nativně s jedinou konfigurací: processing.guarantee=exactly_once_v2.
Knihovna automaticky spravuje transakčního producenta, spotřebitele
read_committed a provedení kompenzací ve stejné transakci.
// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200); // default 100ms
StreamsBuilder builder = new StreamsBuilder();
// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");
inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> enrichOrder(value))
.peek((key, value) -> log.info("Processed order: {}", key))
.to("orders-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// close() con timeout attende il completamento delle transazioni in corso
streams.close(Duration.ofSeconds(10));
}));
streams.start();
Důsledky na propustnost a latenci
Přesně jednou není zadarmo. Zde jsou náklady kvantifikované v typických benchmarkech:
# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):
# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s
# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)
# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch
# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione
# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer
Kdy používat EOS a kdy jej nepoužívat
Kontrolní seznam: EOS a nezbytné?
- Použijte EOS pro: platby, finanční převody, aktualizace zásob, kritické stavové události (objednávka vytvořena/zrušena) kde duplikát způsobuje skutečné obchodní problémy.
- Není potřeba EOS pro: analytické protokoly, sledování metrik, události kliknutí/zobrazení, u kterých je akceptovatelná určitá duplicita nebo ztráta. Použijte alespoň jednou s idempotentními spotřebiteli.
- Alternativa k EOS: Udělejte ze spotřebitelů idempotentní (zkontrolujte, zda již jste zprávu zpracovali prostřednictvím jedinečného ID v databázi) + alespoň jednou. Často jednodušší a efektivnější než EOS na úrovni Kafky.
Poznámka: EOS nepokrývá vnější vedlejší účinky
Přesně jednou v Kafce zaručuje sémantiku pouze v rámci Kafkova clusteru. Pokud váš spotřebitel zapisuje do externí databáze (PostgreSQL, Redis atd.), Kafka EOS za tyto operace nic neručí. Pro hybridní systémy (Kafka + externí databáze), potřebujete další vzory jako Transakční Outbox nebo Saga.
Závěry
Sémantika přesně jednou v Kafkovi je jednou z nejsofistikovanějších implementací v historii distribuovaných systémů. Idempotentní výrobce eliminuje duplikáty na úrovni sítě Transakční API zaručuje atomicitu mezi více tématy, a Kafka Streams to vše zapouzdřuje do jediné konfigurace. Náklady v latenci a propustnosti a reálné, ale ovladatelné se správnými optimalizacemi.
Kompletní série: Apache Kafka
- článek 01 — Apache Kafka Fundamentals: Témata, oddíly a skupiny spotřebitelů
- článek 02 — KRaft in Kafka 4.0: Goodbye ZooKeeper
- článek 03 — Advanced Kafka Producer and Consumer: Acks, Idempotency and Retry
- článek 04 (tento) — Sémantika přesně jednou v Kafkovi: Transakce a koordinace
- článek 05 — Registr schémat: Avro, Protobuf a Schema Evolution
- článek 06 — Kafka Streams: KTable a Windowing
- článek 07 — Kafka Connect: Debezium CDC a integrace DB
- článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase







