Tři záruky dodání v Kafkovi

Než se pustíte do řešení přesně jednou, je nezbytné pochopit tři záruky dodání které může nabídnout systém distribuovaného zasílání zpráv. V případě výpadku sítě nebo zhroucení makléře, chování je radikálně odlišné:

Maximálně jednou: Zpráva je odeslána maximálně jednou. Pokud došlo k chybě, zpráva je ztracena bez opakování. Vhodné pouze pro metriky žádná kritika nebo velké objemy protokolů, kde je přijatelná určitá ztráta.

Alespoň jednou: zpráva je doručena alespoň jednou, ale v případě opakovaného pokusu mohou být duplikovány. A Kafkova záruka z prodlení a e pro většinu případů postačující, pokud jsou spotřebitelé idempotentní.

Přesně - jednou: Zpráva je doručena přesně jednou. Je to nejobtížněji implementovatelná záruka v distribuovaných systémech a stojí za to z hlediska latence a propustnosti.

Co se naučíte

  • Jak idempotentní producent eliminuje duplikáty na úrovni producenta
  • Transakční API: jak zabalit více zápisů do atomické transakce
  • Role Transaction Coordinator v Kafka broker
  • Přesně jednou end-to-end: čtení-proces-zápis s Kafka Streams
  • Důsledky propustnosti a kdy je EOS skutečně potřeba
  • Kompletní nastavení Java pro produkci s povoleným EOS

Krok 1: Idempotent Producer

První vrstva přesně-uncí a idempotentní výrobce, představeno v Kafkovi 0.11. Problém, který řeší: když výrobce pošle a zprávu a neobdrží ack od brokera (timeout, výpadek sítě), neví, zda byla zpráva přijata nebo ne. Opakovaný pokus proto může vytvořit duplikáty.

Idempotentní výrobce to řeší tak, že každému výrobci přidělí a ID producenta (PID) jedinečný a jeden pořadové číslo pro každou zprávu. Broker sleduje poslední přijaté pořadové číslo z každého PID a automaticky deduplikuje opakované pokusy.

// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all");          // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE);   // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)

// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",
    "order-123",
    "{\"orderId\": \"123\", \"amount\": 99.99}"
);

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Inviato: partition=%d, offset=%d%n",
        metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // Con idempotenza, i retry sono sicuri: nessun duplicato
    System.err.println("Errore invio: " + e.getMessage());
} finally {
    producer.close();
}

Krok 2: Transakční API

Idempotentní výrobce garantuje neduplikáty pro zápisy jediného výrobce. Ale co když o tom musím psát dvě témata zároveň atomově? Například čtení-proces-zápis, který musí:

  1. Číst z topic-input
  2. Napište výsledek na topic-output
  3. Potvrdit offset __consumer_offsets

Pokud dojde k chybě mezi bodem 2 a bodem 3, byla zpráva zapsána na výstup ma offset není potvrzen: po restartu znovu zpracuje stejnou zprávu a zapíše duplikát. The Transakční API řeší to tím, že všechny obalí a tři kroky v atomové transakci.

// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");

// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();

// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (records.isEmpty()) continue;

    try {
        // INIZIA la transazione
        producer.beginTransaction();

        for (ConsumerRecord<String, String> record : records) {
            // Processa il messaggio
            String processed = processOrder(record.value());

            // SCRIVI sull'output topic (dentro la transazione)
            producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
        }

        // COMMITTA gli offset del consumer DENTRO la transazione
        // Questo garantisce che il commit dell'offset e la scrittura sull'output
        // siano atomici: o entrambi succedono, o nessuno
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());

        // COMMIT della transazione: ora sia gli offset che i messaggi output
        // sono visibili ai consumer con isolation.level=read_committed
        producer.commitTransaction();

    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Errori non recuperabili: termina il producer
        producer.close();
        throw e;
    } catch (KafkaException e) {
        // Errore recuperabile: abortisci la transazione e riprova
        producer.abortTransaction();
        // Il consumer rilegge i messaggi non committati alla prossima iterazione
    }
}

Transakční koordinátor: Jak to funguje

Il Transakční koordinátor a součást brokera Kafka, která řídí životní cyklus transakce. Každá transakce je zaznamenána na speciální interní téma: __transaction_state (rozděleno podle škálovatelnost, ve výchozím nastavení 50 oddílů).

# Fasi di una transazione Kafka:

# 1. initTransactions() chiamato dal producer
#    -> Transaction Coordinator assegna epoch al transactional.id
#    -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
#       - Termina la vecchia transazione (producer fencing)
#       - Garantisce che una sola istanza sia attiva

# 2. beginTransaction() - solo lato producer (no network call)

# 3. send() per ogni messaggio
#    -> Producer invia al partition leader con il PID e epoch
#    -> Partition leader scrive in un "transaction buffer" (non ancora visibile)

# 4. sendOffsetsToTransaction()
#    -> Transaction Coordinator registra quali consumer group offset
#       fanno parte di questa transazione

# 5. commitTransaction()
#    -> Producer invia PREPARE_COMMIT al Transaction Coordinator
#    -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
#    -> TC notifica tutti i partition leader coinvolti
#    -> Partition leader scrivono COMMIT marker sul log
#    -> I messaggi diventano visibili ai consumer read_committed
#    -> TC scrive COMPLETE_COMMIT su __transaction_state

# 6. abortTransaction()
#    -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
#    -> Consumer read_committed non vedono mai i messaggi abortiti

# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
  --list
# TransactionalId     Producer Epoch  State        LastModifiedTime
# order-processor-0   5               Complete     2026-03-20 10:30:00

Producent Fencing: Ochrana před zombie instancemi

# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
#   (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!

# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio

# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"

EOS End-to-End s Kafka Streams

Kafkovy proudy (viz článek 06 série) podporuje EOS nativně s jedinou konfigurací: processing.guarantee=exactly_once_v2. Knihovna automaticky spravuje transakčního producenta, spotřebitele read_committed a provedení kompenzací ve stejné transakci.

// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
    StreamsConfig.EXACTLY_ONCE_V2);

// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200);  // default 100ms

StreamsBuilder builder = new StreamsBuilder();

// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");

inputStream
    .filter((key, value) -> value != null && !value.isEmpty())
    .mapValues(value -> enrichOrder(value))
    .peek((key, value) -> log.info("Processed order: {}", key))
    .to("orders-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);

// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // close() con timeout attende il completamento delle transazioni in corso
    streams.close(Duration.ofSeconds(10));
}));

streams.start();

Důsledky na propustnost a latenci

Přesně jednou není zadarmo. Zde jsou náklady kvantifikované v typických benchmarkech:

# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):

# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s

# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)

# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch

# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione

# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);          # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer

Kdy používat EOS a kdy jej nepoužívat

Kontrolní seznam: EOS a nezbytné?

  • Použijte EOS pro: platby, finanční převody, aktualizace zásob, kritické stavové události (objednávka vytvořena/zrušena) kde duplikát způsobuje skutečné obchodní problémy.
  • Není potřeba EOS pro: analytické protokoly, sledování metrik, události kliknutí/zobrazení, u kterých je akceptovatelná určitá duplicita nebo ztráta. Použijte alespoň jednou s idempotentními spotřebiteli.
  • Alternativa k EOS: Udělejte ze spotřebitelů idempotentní (zkontrolujte, zda již jste zprávu zpracovali prostřednictvím jedinečného ID v databázi) + alespoň jednou. Často jednodušší a efektivnější než EOS na úrovni Kafky.

Poznámka: EOS nepokrývá vnější vedlejší účinky

Přesně jednou v Kafce zaručuje sémantiku pouze v rámci Kafkova clusteru. Pokud váš spotřebitel zapisuje do externí databáze (PostgreSQL, Redis atd.), Kafka EOS za tyto operace nic neručí. Pro hybridní systémy (Kafka + externí databáze), potřebujete další vzory jako Transakční Outbox nebo Saga.

Závěry

Sémantika přesně jednou v Kafkovi je jednou z nejsofistikovanějších implementací v historii distribuovaných systémů. Idempotentní výrobce eliminuje duplikáty na úrovni sítě Transakční API zaručuje atomicitu mezi více tématy, a Kafka Streams to vše zapouzdřuje do jediné konfigurace. Náklady v latenci a propustnosti a reálné, ale ovladatelné se správnými optimalizacemi.

Kompletní série: Apache Kafka

  • článek 01 — Apache Kafka Fundamentals: Témata, oddíly a skupiny spotřebitelů
  • článek 02 — KRaft in Kafka 4.0: Goodbye ZooKeeper
  • článek 03 — Advanced Kafka Producer and Consumer: Acks, Idempotency and Retry
  • článek 04 (tento) — Sémantika přesně jednou v Kafkovi: Transakce a koordinace
  • článek 05 — Registr schémat: Avro, Protobuf a Schema Evolution
  • článek 06 — Kafka Streams: KTable a Windowing
  • článek 07 — Kafka Connect: Debezium CDC a integrace DB
  • článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase