Kafka'da Tam Bir Kez Anlambilim: İşlemler ve Koordinasyon
Tam olarak bir kez anlambilimi, dağıtılmış sistemlerde uygulanması en zor olanıdır. Bu kılavuzda Kafka'nın birden çok konu arasındaki işlemleri nasıl yönettiği, rol İşlem koordinatörünün rolü ve verim üzerindeki etkileri.
Kafka'da Teslimin Üç Garantisi
Tam olarak bir kez uğraşmadan önce, üç teslimat garantisini anlamak önemlidir. dağıtılmış bir mesajlaşma sisteminin sunabileceği. Ağ arızası durumunda veya komisyoncunun çökmesi durumunda davranış kökten farklıdır:
En fazla bir kez: Mesaj en fazla bir kez gönderilir. Eğer bir hata varsa, mesaj tekrar denenmeden kaybolur. Yalnızca metrikler için uygundur bir miktar kaybın kabul edilebilir olduğu durumlarda eleştiri veya yüksek hacimli kayıt yok.
En az bir kez: mesaj en az bir kez iletildi ancak yeniden deneme durumunda çoğaltılabilir. Ve Kafka'nın temerrüt garantisi ve e Tüketicilerin idempotent olması durumunda çoğu durumda yeterlidir.
Tam olarak bir kez: Mesaj tam olarak bir kez iletilir. Dağıtılmış sistemlerde uygulanması en zor garantidir ve bir maliyeti vardır. gecikme ve verim açısından.
Ne Öğreneceksiniz
- İdempotent üretici, üretici düzeyinde kopyaları nasıl ortadan kaldırır?
- İşlem API'si: atomik bir işlemde birden fazla yazmanın nasıl sarılacağı
- Kafka komisyoncusunda İşlem Koordinatörünün rolü
- Tam olarak bir kez uçtan uca: Kafka Akışları ile okuma-işleme-yazma
- Üretimin etkileri ve EOS'a gerçekten ne zaman ihtiyaç duyulduğu
- EOS etkinken üretim için Java kurulumunu tamamlayın
Adım 1: Idempotent Yapımcı
Tam onsluk ilk katman ve iktidarsız yapımcı, Kafka 0.11'de tanıtıldı. Çözdüğü sorun: Bir yapımcı bir mesajı alır ve komisyoncudan onay almaz (zaman aşımı, ağ hatası), mesajın alınıp alınmadığı. Bu nedenle yeniden deneme kopyalar oluşturabilir.
İdempotent üretici bunu her üreticiye bir görev atayarak çözer. Üretici Kimliği (PID) benzersiz ve bir sıra numarası her mesaj için. Broker alınan son sıra numarasını takip eder her PID'den gelir ve yeniden denemeleri otomatik olarak tekilleştirir.
// Configurazione producer idempotente in Java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ABILITA producer idempotente
// Questo richiede automaticamente: acks=all, retries=MAX_INT, max.in.flight.requests.per.connection <= 5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Queste impostazioni vengono forzate automaticamente con idempotenza:
// props.put(ProducerConfig.ACKS_CONFIG, "all"); // auto
// props.put(ProducerConfig.RETRIES_CONFIG, MAX_VALUE); // auto
// props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // auto (max)
// Timeout per la risposta del broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// L'uso e identico a un producer non-idempotente
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
"order-123",
"{\"orderId\": \"123\", \"amount\": 99.99}"
);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Inviato: partition=%d, offset=%d%n",
metadata.partition(), metadata.offset());
} catch (ExecutionException | InterruptedException e) {
// Con idempotenza, i retry sono sicuri: nessun duplicato
System.err.println("Errore invio: " + e.getMessage());
} finally {
producer.close();
}
Adım 2: İşlemsel API
İdempotent üretici, tek bir üreticinin yazılarının kopyalanmayacağını garanti eder. Peki ya bunun hakkında yazmam gerekirse iki konu aynı anda atomik olarak mı? Örneğin, bir okuma-işlem-yazma işlemi şunları sağlamalıdır:
- Şuradan okuyun:
topic-input - Sonucu yaz
topic-output - Ofseti şuna taahhüt et:
__consumer_offsets
2. nokta ile 3. nokta arasında çökerse, mesaj çıktı ma'ya yazıldı. ofset kaydedilmez: yeniden başlatıldığında aynı mesajı yeniden işler ve yazar bir kopya. İşlemsel API herkesi sararak bu sorunu çözer ve atomik bir işlemde üç adım.
// Producer transazionale completo
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// OBBLIGATORIO per transazioni: transactional.id univoco per ogni istanza producer
// Formato consigliato: {app-name}-{partition-number}
// IMPORTANTE: due producer con lo stesso transactional.id non possono essere attivi
// contemporaneamente (il secondo termina la transazione del primo - "fencing")
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-0");
// Il producer transazionale abilita automaticamente l'idempotenza
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// OBBLIGATORIO: inizializza le transazioni PRIMA di qualsiasi operazione
// Questo registra il producer con il Transaction Coordinator
producer.initTransactions();
// Consumer per leggere l'input
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
// CRITICO per EOS: disabilita il commit automatico degli offset
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Leggi solo messaggi "committed" (non messaggi in transazione in corso)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("orders-input"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
try {
// INIZIA la transazione
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
// Processa il messaggio
String processed = processOrder(record.value());
// SCRIVI sull'output topic (dentro la transazione)
producer.send(new ProducerRecord<>("orders-processed", record.key(), processed));
}
// COMMITTA gli offset del consumer DENTRO la transazione
// Questo garantisce che il commit dell'offset e la scrittura sull'output
// siano atomici: o entrambi succedono, o nessuno
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsetsToCommit, consumer.groupMetadata());
// COMMIT della transazione: ora sia gli offset che i messaggi output
// sono visibili ai consumer con isolation.level=read_committed
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Errori non recuperabili: termina il producer
producer.close();
throw e;
} catch (KafkaException e) {
// Errore recuperabile: abortisci la transazione e riprova
producer.abortTransaction();
// Il consumer rilegge i messaggi non committati alla prossima iterazione
}
}
İşlem Koordinatörü: Nasıl Çalışır?
Il İşlem Koordinatörü ve Kafka komisyoncusunun bir bileşeni
işlem yaşam döngüsünü yönetir. Her işlem kayıt altına alınır
özel bir iç konu hakkında: __transaction_state (bölümlendirilmiş
ölçeklenebilirlik, varsayılan olarak 50 bölüm).
# Fasi di una transazione Kafka:
# 1. initTransactions() chiamato dal producer
# -> Transaction Coordinator assegna epoch al transactional.id
# -> Se c'era una transazione in corso con lo stesso ID (producer precedente):
# - Termina la vecchia transazione (producer fencing)
# - Garantisce che una sola istanza sia attiva
# 2. beginTransaction() - solo lato producer (no network call)
# 3. send() per ogni messaggio
# -> Producer invia al partition leader con il PID e epoch
# -> Partition leader scrive in un "transaction buffer" (non ancora visibile)
# 4. sendOffsetsToTransaction()
# -> Transaction Coordinator registra quali consumer group offset
# fanno parte di questa transazione
# 5. commitTransaction()
# -> Producer invia PREPARE_COMMIT al Transaction Coordinator
# -> TC scrive PREPARE_COMMIT su __transaction_state (durable)
# -> TC notifica tutti i partition leader coinvolti
# -> Partition leader scrivono COMMIT marker sul log
# -> I messaggi diventano visibili ai consumer read_committed
# -> TC scrive COMPLETE_COMMIT su __transaction_state
# 6. abortTransaction()
# -> TC scrive ABORT marker: i messaggi in buffer sono ignorati
# -> Consumer read_committed non vedono mai i messaggi abortiti
# Verificare lo stato delle transazioni (Kafka CLI):
kafka-transactions.sh --bootstrap-server kafka-1:9092 \
--list
# TransactionalId Producer Epoch State LastModifiedTime
# order-processor-0 5 Complete 2026-03-20 10:30:00
Yapımcı Eskrim: Zombi Örneklerinden Koruma
# Il fencing protegge da scenari di split-brain:
# - Producer A inizia una transazione (transactional.id="proc-0", epoch=5)
# - Producer A va in pausa (GC pause, network partition)
# - Il sistema avvia Producer B con lo stesso transactional.id
# - Producer B chiama initTransactions(): TC bumpa epoch a 6, invalida A
# - Producer A riprende e prova a fare commit: errore ProducerFencedException
# (epoch 5 < epoch corrente 6)
# - Producer A non puo piu committare: safe!
# Implicazione pratica:
# Non usare lo stesso transactional.id per piu istanze simultanee
# In Kubernetes: usa il pod name come parte del transactional.id
# Ma: se un pod muore e un nuovo pod prende lo stesso name,
# il nuovo pod "recinta" correttamente il vecchio
# Configurazione consigliata per Kubernetes:
TRANSACTIONAL_ID="${APP_NAME}-${KAFKA_PARTITION}-${POD_NAME}"
# Oppure usa l'indice del pod nel StatefulSet:
TRANSACTIONAL_ID="${APP_NAME}-${POD_INDEX}"
EOS Kafka Akışlarıyla Uçtan Uca
Kafka Akışları (bkz. serinin 06. Maddesi) EOS'u destekliyor
yerel olarak tek bir yapılandırmayla: processing.guarantee=exactly_once_v2.
Kütüphane, işlemsel üreticiyi ve tüketiciyi otomatik olarak yönetir
read_committed ve mahsupların aynı işlemde gerçekleştirilmesi.
// Kafka Streams con EOS abilitato
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// EOS v2: piu efficiente di v1 (introdotto in Kafka 2.5)
// v2 riduce il numero di transazioni necessarie rispetto a v1
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Con EOS, il commit interval deve essere maggiore (overhead transazionale)
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 200); // default 100ms
StreamsBuilder builder = new StreamsBuilder();
// Pipeline EOS: ogni record viene processato esattamente una volta
KStream<String, String> inputStream = builder.stream("orders-raw");
inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> enrichOrder(value))
.peek((key, value) -> log.info("Processed order: {}", key))
.to("orders-enriched");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
// Gestione graceful shutdown (importante per transazioni)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// close() con timeout attende il completamento delle transazioni in corso
streams.close(Duration.ofSeconds(10));
}));
streams.start();
Verim ve Gecikme Üzerindeki Etkiler
Tam olarak bir kez ücretsiz değil. Tipik karşılaştırmalı değerlendirmelerde ölçülen maliyetler şunlardır:
# Benchmark approssimativo (Kafka 3.x/4.x su hardware standard):
# Throughput (messaggi/sec, payload 1KB):
# At-least-once (acks=all, no transactions):
# Producer: ~200.000 msg/s
# Consumer: ~500.000 msg/s
# Exactly-once con Transactional API:
# Producer: ~100.000 msg/s (-50%)
# Consumer (read_committed): ~400.000 msg/s (-20%)
# Latenza P99 aggiuntiva:
# initTransactions(): una tantum al startup (~100ms)
# beginTransaction() -> commitTransaction(): +5-15ms per batch
# Overhead dipende da: numero partizioni coinvolte, dimensione batch
# Ottimizzazioni per ridurre l'overhead EOS:
# 1. Aumenta linger.ms e batch.size per ridurre il numero di transazioni
# 2. Aumenta commit.interval.ms in Kafka Streams
# 3. Usa EXACTLY_ONCE_V2 (non v1): riduce le transazioni del 50%
# 4. Minimizza il numero di partizioni per transazione
# Configurazioni consigliate per EOS in produzione:
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); # accoda per 5ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); # batch 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); # 64MB buffer
EOS Ne Zaman Kullanılmalı ve Ne Zaman Kullanılmamalı
Kontrol Listesi: EOS ve Gerekli mi?
- EOS'u kullanın şunlar için: ödemeler, mali transferler, envanter güncellemeleri, kritik durum olayları (sipariş oluşturuldu/iptal edildi) bir kopyanın gerçek iş sorunlarına neden olduğu yer.
- EOS'a gerek yok Şunlar için: analiz günlükleri, ölçümleri izleme, bazı kopyaların veya kayıpların kabul edilebilir olduğu olayları tıklayın/görüntüleyin. Kullanım en az bir kez idempotent tüketicilerle.
- EOS'a alternatif: Tüketicileri bağımsız hale getirin (kontrol edin) mesajı veritabanındaki benzersiz bir kimlik aracılığıyla zaten işlediniz) + en az bir kez. Kafka düzeyinde genellikle EOS'tan daha basit ve daha verimlidir.
Lütfen unutmayın: EOS harici yan etkileri kapsamaz
Tam olarak bir kez Kafka'da anlambilimi yalnızca Kafka kümesi içinde garanti eder. Tüketiciniz harici bir veritabanına (PostgreSQL, Redis vb.) yazıyorsa, Kafka EOS bu işlemlere ilişkin hiçbir garanti vermez. Hibrit sistemler için (Kafka + harici veritabanı), aşağıdaki gibi ek kalıplara ihtiyacınız vardır: İşlemsel Giden Kutusu veya Saga.
Sonuçlar
Kafka'daki tam olarak bir kez anlambilimi en karmaşık uygulamalardan biridir dağıtılmış sistemlerin tarihinde. İdempotent üretici kopyaları ortadan kaldırır Ağ düzeyinde, Transactional API birden fazla konu arasında atomikliği garanti eder, ve Kafka Streams tüm bunları tek bir konfigürasyonda kapsıyor. Maliyet gecikme ve verimlilik açısından gerçektir ancak doğru optimizasyonlarla yönetilebilir.
Serinin Tamamı: Apache Kafka
- Madde 01 — Apache Kafka Temelleri: Konular, Bölümler ve Tüketici Grupları
- Madde 02 — Kafka 4.0'da KRaft: Elveda ZooKeeper
- Madde 03 — İleri Kafka Üreticisi ve Tüketicisi: Acks, Idempotency ve Retry
- Madde 04 (bu) — Kafka'da Tam Bir Kez Anlambilim: İşlemler ve Koordinasyon
- Madde 05 — Şema Kaydı: Avro, Protobuf ve Schema Evolution
- Madde 06 — Kafka Akışları: KTable ve Pencereleme
- Madde 07 — Kafka Connect: Debezium CDC ve Veritabanı Entegrasyonu
- Madde 08 — Kafka + Apache Flink: Gerçek Zamanlı İşlem Hattı Analitiği







