Semantyka dokładnie raz w Kafce: transakcje i koordynacja
Semantyka dokładnie raz jest najtrudniejsza do wdrożenia w systemach rozproszonych. W tym przewodniku wyjaśniono, w jaki sposób Kafka zarządza transakcjami między wieloma tematami i rolą koordynatora transakcji i wpływ na przepustowość.
Trzy gwarancje dostawy w Kafce
Zanim zajmiesz się tym dokładnie raz, ważne jest, aby zrozumieć trzy gwarancje dostawy jakie może zaoferować rozproszony system przesyłania wiadomości. W przypadku awarii sieci lub awaria brokera, zachowanie jest radykalnie odmienne:
Co najwyżej raz: Wiadomość jest wysyłana maksymalnie raz. Jeśli wystąpił błąd, wiadomość zostanie utracona bez ponownej próby. Nadaje się tylko do metryk żadnej krytyki ani logów o dużej objętości, gdzie akceptowalna jest pewna strata.
Przynajmniej raz: wiadomość została dostarczona przynajmniej raz, ale mogą zostać zduplikowane w przypadku ponownej próby. I domyślna gwarancja Kafki i e wystarczające w większości przypadków, jeśli konsumenci są idempotentni.
Dokładnie-raz: Wiadomość jest dostarczana dokładnie raz. Jest to gwarancja najtrudniejsza do wdrożenia w systemach rozproszonych i wiąże się z kosztami pod względem opóźnień i przepustowości.
Czego się nauczysz
- Jak idempotentny producent eliminuje duplikaty na poziomie producenta
- Transakcyjny interfejs API: jak zawijać wiele zapisów w transakcji atomowej
- Rola Koordynatora Transakcji w brokerze Kafka
- Dokładnie raz od końca do końca: odczyt, proces i zapis za pomocą strumieni Kafka
- Wpływ na przepustowość i kiedy EOS jest naprawdę potrzebny
- Zakończ konfigurację Java na potrzeby produkcyjne z włączoną obsługą EOS
Krok 1: Idempotentny producent
Pierwsza warstwa dokładnie uncji i idempotentny producent, wprowadzone w Kafce 0.11. Problem rozwiązuje: gdy producent wysyła plik wiadomość i nie otrzymuje potwierdzenia od brokera (przekroczenie limitu czasu, awaria sieci), nie wie czy wiadomość została odebrana, czy nie. Ponowna próba może zatem spowodować utworzenie duplikatów.
Idempotentny producent rozwiązuje ten problem, przypisując każdemu producentowi a Identyfikator producenta (PID) wyjątkowy i jedyny numer kolejny dla każdej wiadomości. Broker śledzi ostatni otrzymany numer kolejny z każdego PID i automatycznie deduplikuje ponowne próby.
// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE); // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)
// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order-123",
"{\"orderId\": \"123\", \"amount\": 99.99}"
);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Inviato: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
// Con idempotenza, i retry sono sicuri: nessun duplicato
System.err.println("Errore invio: " + e.getMessage());
} finally {
producer.close();
}
Krok 2: Transakcyjne API
Idempotentny producent gwarantuje brak duplikatów zapisów jednego producenta. Ale co jeśli mam o tym pisać dwa tematy jednocześnie atomowo? Na przykład operacja odczytu-procesu-zapisu, która musi:
- Przeczytaj z
topic-input - Zapisz wynik
topic-output - Zatwierdź przesunięcie do
__consumer_offsets
Jeśli nastąpi awaria między punktem 2 i punktem 3, wiadomość została zapisana na wyjściu ma przesunięcie nie zostało zatwierdzone: po ponownym uruchomieniu ponownie przetwarza ten sam komunikat i zapisuje duplikat. The Transakcyjne API rozwiązuje ten problem, otaczając wszystkich i trzy kroki w transakcji atomowej.
// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");
// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();
// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
// INIZIA la transazione
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
// Processa il messaggio
String processed = processOrder(record.value());
// SCRIVI sull'output topic (dentro la transazione)
producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
}
// COMMITTA gli offset del consumer DENTRO la transazione
// Questo garantisce che il commit dell'offset e la scrittura sull'output
// siano atomici: o entrambi succedono, o nessuno
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
// COMMIT della transazione: ora sia gli offset che i messaggi output
// sono visibili ai consumer con isolation.level=read_committed
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Errori non recuperabili: termina il producer
producer.close();
throw e;
} catch (KafkaException e) {
// Errore recuperabile: abortisci la transazione e riprova
producer.abortTransaction();
// Il consumer rilegge i messaggi non committati alla prossima iterazione
}
}
Koordynator transakcji: jak to działa
Il Koordynator Transakcji i komponent brokera Kafka, który
zarządza cyklem życia transakcji. Każda transakcja jest rejestrowana
na specjalny temat wewnętrzny: __transaction_state (podzielone przez
skalowalność, domyślnie 50 partycji).
# Fasi di una transazione Kafka:
# 1. initTransactions() chiamato dal producer
# -> Transaction Coordinator assegna epoch al transactional.id
# -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
# - Termina la vecchia transazione (producer fencing)
# - Garantisce che una sola istanza sia attiva
# 2. beginTransaction() - solo lato producer (no network call)
# 3. send() per ogni messaggio
# -> Producer invia al partition leader con il PID e epoch
# -> Partition leader scrive in un "transaction buffer" (non ancora visibile)
# 4. sendOffsetsToTransaction()
# -> Transaction Coordinator registra quali consumer group offset
# fanno parte di questa transazione
# 5. commitTransaction()
# -> Producer invia PREPARE_COMMIT al Transaction Coordinator
# -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
# -> TC notifica tutti i partition leader coinvolti
# -> Partition leader scrivono COMMIT marker sul log
# -> I messaggi diventano visibili ai consumer read_committed
# -> TC scrive COMPLETE_COMMIT su __transaction_state
# 6. abortTransaction()
# -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
# -> Consumer read_committed non vedono mai i messaggi abortiti
# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
--list
# TransactionalId Producer Epoch State LastModifiedTime
# order-processor-0 5 Complete 2026-03-20 10:30:00
Producent Fencing: Ochrona przed instancjami zombie
# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
# (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!
# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio
# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
EOS kompleksowo ze strumieniami Kafka
Strumienie Kafki (patrz artykuł 06 serii) obsługuje EOS
natywnie z pojedynczą konfiguracją: processing.guarantee=exactly_once_v2.
Biblioteka automatycznie zarządza producentem transakcyjnym, czyli konsumentem
read_committed i dokonanie offsetu w tej samej transakcji.
// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200); // default 100ms
StreamsBuilder builder = new StreamsBuilder();
// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");
inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> enrichOrder(value))
.peek((key, value) -> log.info("Processed order: {}", key))
.to("orders-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// close() con timeout attende il completamento delle transazioni in corso
streams.close(Duration.ofSeconds(10));
}));
streams.start();
Wpływ na przepustowość i opóźnienia
Dokładnie-raz nie jest darmowy. Oto koszty określone ilościowo w typowych benchmarkach:
# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):
# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s
# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)
# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch
# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione
# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer
Kiedy używać EOS, a kiedy nie
Lista kontrolna: EOS i konieczne?
- Użyj EOS-a na: płatności, przelewy finansowe, aktualizacje zapasów, zdarzenia o statusie krytycznym (zamówienie utworzone/anulowane) gdzie duplikat powoduje realne problemy biznesowe.
- Nie ma potrzeby stosowania EOS-u dla: logów analitycznych, metryk monitorujących, kliknij/wyświetl wydarzenia, w przypadku których dopuszczalne jest powielanie lub utrata. Użyj przynajmniej raz z idempotentnymi konsumentami.
- Alternatywa dla EOS-a: uczyń konsumentów idempotentnymi (sprawdź, czy przetworzyłeś już wiadomość poprzez unikalny identyfikator w bazie danych) + co najmniej raz. Często prostsze i wydajniejsze niż EOS na poziomie Kafki.
Uwaga: EOS nie obejmuje zewnętrznych skutków ubocznych
Dokładnie raz w Kafce gwarantuje semantykę tylko w obrębie klastra Kafki. Jeśli Twój konsument pisze do zewnętrznej bazy danych (PostgreSQL, Redis itp.), Kafka EOS nie gwarantuje niczego za te operacje. Do systemów hybrydowych (Kafka + zewnętrzna baza danych), potrzebujesz dodatkowych wzorców, np Transakcyjna skrzynka nadawcza lub Saga.
Wnioski
Semantyka dokładnie raz w Kafce jest jedną z najbardziej wyrafinowanych implementacji w historii systemów rozproszonych. Idempotentny producent eliminuje duplikaty na poziomie sieci Transactional API gwarantuje atomowość pomiędzy wieloma tematami, a Kafka Streams zamyka to wszystko w jednej konfiguracji. Koszt pod względem opóźnień i przepustowości i jest rzeczywisty, ale możliwy do zarządzania przy odpowiednich optymalizacjach.
Cała seria: Apache Kafka
- Artykuł 01 — Podstawy Apache Kafka: tematy, partycje i grupy konsumentów
- Artykuł 02 — KRaft w Kafce 4.0: Żegnaj ZooStrażniku
- Artykuł 03 — Zaawansowany producent i konsument Kafki: potwierdzenia, idempotencja i ponowna próba
- Artykuł 04 (ten) — Semantyka dokładnie raz w Kafce: transakcje i koordynacja
- Artykuł 05 — Rejestr schematów: Avro, Protobuf i Schema Evolution
- Artykuł 06 — Strumienie Kafki: KTable i Windowing
- Artykuł 07 — Kafka Connect: Integracja Debezium CDC i DB
- Artykuł 08 — Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym







