Semantică exact o dată în Kafka: tranzacții și coordonare
Semantica exact o dată este cel mai dificil de implementat în sistemele distribuite. Acest ghid explică modul în care Kafka gestionează tranzacțiile între mai multe subiecte, rolul a coordonatorului tranzacției și implicațiile asupra debitului.
Cele trei garanții de livrare în Kafka
Înainte de a aborda exact o dată, este esențial să înțelegeți cele trei garanții de livrare pe care le poate oferi un sistem de mesagerie distribuit. În cazul defecțiunii rețelei sau accidentul brokerului, comportamentul este radical diferit:
Cel mult-o dată: mesajul este trimis maximum o dată. Dacă există o eroare, mesajul se pierde fără a reîncerca. Potrivit numai pentru valori nicio critică sau jurnal de volum mare în cazul în care unele pierderi sunt acceptabile.
Măcar o dată: mesajul este livrat cel putin o data dar poate fi duplicat în caz de reîncercare. Și garanția implicită a lui Kafka și e suficient pentru majoritatea cazurilor dacă consumatorii sunt idempotenți.
Exact-o dată: Mesajul este livrat exact o dată. Este cea mai dificilă garanție de implementat în sistemele distribuite și are un cost în termeni de latență și debit.
Ce vei învăța
- Cum producătorul idempotent elimină duplicatele la nivel de producător
- API-ul tranzacțional: cum să încapsulați mai multe scrieri într-o tranzacție atomică
- Rolul Coordonatorului Tranzacțiilor în brokerul Kafka
- Exact o dată de la capăt la capăt: citire-procesare-scriere cu Kafka Streams
- Implicațiile de debit și când EOS este cu adevărat necesar
- Configurare Java completă pentru producție cu EOS activat
Pasul 1: Producătorul idempotent
Primul strat de exact-uncii și producător idempotent, introdus în Kafka 0.11. Problema pe care o rezolvă: când un producător trimite un mesaj și nu primește confirmarea de la broker (timeout, defecțiune de rețea), el nu știe dacă mesajul a fost primit sau nu. Prin urmare, reîncercarea poate crea duplicate.
Producătorul idempotent rezolvă acest lucru atribuind fiecărui producător a ID producător (PID) unic și unul numărul de ordine pentru fiecare mesaj. Brokerul ține evidența ultimului număr de secvență primit din fiecare PID și deduplica automat reîncercările.
// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE); // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)
// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order-123",
"{\"orderId\": \"123\", \"amount\": 99.99}"
);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Inviato: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
// Con idempotenza, i retry sono sicuri: nessun duplicato
System.err.println("Errore invio: " + e.getMessage());
} finally {
producer.close();
}
Pasul 2: API-ul tranzacțional
Producătorul idempotent garantează că nu sunt duplicate pentru scrierile unui singur producător. Dar dacă trebuie să scriu despre asta doua subiecte in acelasi timp atomic? De exemplu, un proces de citire-scriere care trebuie:
- Citiți din
topic-input - Scrie rezultatul pe
topic-output - Compensați compensarea
__consumer_offsets
Dacă se blochează între punctul 2 și punctul 3, mesajul a fost scris în ieșirea ma offset-ul nu este comis: la repornire, reprocesează același mesaj și scrie un duplicat. The API tranzacțional rezolvă acest lucru învăluind pe toată lumea și trei pași într-o tranzacție atomică.
// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");
// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();
// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
// INIZIA la transazione
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
// Processa il messaggio
String processed = processOrder(record.value());
// SCRIVI sull'output topic (dentro la transazione)
producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
}
// COMMITTA gli offset del consumer DENTRO la transazione
// Questo garantisce che il commit dell'offset e la scrittura sull'output
// siano atomici: o entrambi succedono, o nessuno
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
// COMMIT della transazione: ora sia gli offset che i messaggi output
// sono visibili ai consumer con isolation.level=read_committed
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Errori non recuperabili: termina il producer
producer.close();
throw e;
} catch (KafkaException e) {
// Errore recuperabile: abortisci la transazione e riprova
producer.abortTransaction();
// Il consumer rilegge i messaggi non committati alla prossima iterazione
}
}
Coordonatorul tranzacțiilor: cum funcționează
Il Coordonator tranzactie şi o componentă a brokerului Kafka care
gestionează ciclul de viață al tranzacției. Fiecare tranzacție este înregistrată
pe un subiect intern special: __transaction_state (compartit de
scalabilitate, 50 de partiții în mod implicit).
# Fasi di una transazione Kafka:
# 1. initTransactions() chiamato dal producer
# -> Transaction Coordinator assegna epoch al transactional.id
# -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
# - Termina la vecchia transazione (producer fencing)
# - Garantisce che una sola istanza sia attiva
# 2. beginTransaction() - solo lato producer (no network call)
# 3. send() per ogni messaggio
# -> Producer invia al partition leader con il PID e epoch
# -> Partition leader scrive in un "transaction buffer" (non ancora visibile)
# 4. sendOffsetsToTransaction()
# -> Transaction Coordinator registra quali consumer group offset
# fanno parte di questa transazione
# 5. commitTransaction()
# -> Producer invia PREPARE_COMMIT al Transaction Coordinator
# -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
# -> TC notifica tutti i partition leader coinvolti
# -> Partition leader scrivono COMMIT marker sul log
# -> I messaggi diventano visibili ai consumer read_committed
# -> TC scrive COMPLETE_COMMIT su __transaction_state
# 6. abortTransaction()
# -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
# -> Consumer read_committed non vedono mai i messaggi abortiti
# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
--list
# TransactionalId Producer Epoch State LastModifiedTime
# order-processor-0 5 Complete 2026-03-20 10:30:00
Producer Fencing: Protecție împotriva instanțelor zombie
# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
# (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!
# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio
# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
EOS end-to-end cu Kafka Streams
Kafka Streams (vezi articolul 06 din serie) acceptă EOS
nativ cu o singură configurație: processing.guarantee=exactly_once_v2.
Biblioteca gestionează automat producătorul tranzacțional, consumatorul
read_committed și comiterea compensațiilor în aceeași tranzacție.
// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200); // default 100ms
StreamsBuilder builder = new StreamsBuilder();
// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");
inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> enrichOrder(value))
.peek((key, value) -> log.info("Processed order: {}", key))
.to("orders-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// close() con timeout attende il completamento delle transazioni in corso
streams.close(Duration.ofSeconds(10));
}));
streams.start();
Implicații asupra debitului și a latenței
Exact-o dată nu este gratuit. Iată costurile cuantificate în benchmark-uri tipice:
# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):
# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s
# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)
# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch
# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione
# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer
Când să folosiți EOS și când să nu îl utilizați
Lista de verificare: EOS și este necesar?
- Utilizați EOS pentru: plăți, transferuri financiare, actualizări de inventar, evenimente de stare critică (comandă creată/anulată) unde un duplicat cauzează probleme reale de afaceri.
- Nu este nevoie de EOS pentru: jurnalele de analiză, valorile de monitorizare, faceți clic/vedeți evenimente în care sunt acceptabile dublarea sau pierderea. Utilizați cel puțin o dată cu consumatorii idempotenți.
- Alternativă la EOS: Faceți consumatorii idempotenți (verificați dacă ați procesat deja mesajul printr-un ID unic în baza de date) + cel puțin o dată. Adesea mai simplu și mai eficient decât EOS la nivelul Kafka.
Vă rugăm să rețineți: EOS nu acoperă efectele secundare externe
Exact-once in Kafka garantează semantica numai în cadrul clusterului Kafka. Dacă consumatorul dvs. scrie într-o bază de date externă (PostgreSQL, Redis etc.), Kafka EOS nu garantează nimic pentru aceste operațiuni. Pentru sisteme hibride (Kafka + bază de date externă), aveți nevoie de modele suplimentare precum Căsuță de ieșire tranzacțională sau Saga.
Concluzii
Semantica exact o dată din Kafka este una dintre cele mai sofisticate implementări în istoria sistemelor distribuite. Producătorul idempotent elimină duplicatele la nivel de rețea, API-ul tranzacțional garantează atomicitatea între mai multe subiecte, iar Kafka Streams încapsulează toate acestea într-o singură configurație. Costul în latență și debit și real, dar gestionabil cu optimizările potrivite.
Seria completă: Apache Kafka
- Articolul 01 — Fundamentele Apache Kafka: subiecte, partiții și grupuri de consumatori
- Articolul 02 — KRaft în Kafka 4.0: La revedere ZooKeeper
- Articolul 03 — Producător și consumator avansat Kafka: Acks, Idempotency și Retry
- Articolul 04 (acest) — Exact-Once Semantics în Kafka: Tranzacții și coordonare
- Articolul 05 — Schema Registry: Avro, Protobuf și Schema Evolution
- Articolul 06 — Fluxuri Kafka: KTable și Windowing
- Articolul 07 — Kafka Connect: Debezium CDC și integrare DB
- Articolul 08 — Kafka + Apache Flink: Pipeline Analytics în timp real







