Kafka'da Teslim Edilmeyen Mektup Kuyruğu ve Hata Yönetimi
Dağıtılmış bir sistemde mesajlar başarısız olur. Bozuk seri durumdan çıkarma, iş mantığı atma istisnası, Alt hizmete ulaşılamıyor; tüm bu durumlarda tüketici ne yapacağına karar vermelidir. Bu kılavuz şunları kapsar: Kafka'da hata işlemenin temel kalıpları: Teslim Edilmeyen Mektup Kuyruğu, geri çekilme ile üstel yeniden deneme, zehirli hap tespiti ve yeniden işleme stratejileri.
Kafka'da Hata Sorunu
Hata durumunda mesajın otomatik olarak yeniden kodlandığı RabbitMQ veya SQS'den farklı olarak, Kafka'nın farklı bir anlamı vardır: Tüketici açıkça dengelemeleri taahhüt eder. Tüketici taahhütte bulunmazsa ve yeniden başlatıldığında aynı mesajları tekrar okuyacaktır. Bu gerçek bir risk yaratır: hatalı biçimlendirilmiş bir mesaj (zehir hapı) tüm tüketici grubunu sonsuz döngüde engelleyebilir, aynı bölümdeki sonraki mesajların işlenmesinin önlenmesi.
Kafka'da ele alınacak üç ana hata senaryosu şunlardır:
- Geçici hatalar: Aşağı akış hizmeti geçici olarak kullanılamıyor, ağ zaman aşımı — yeniden denemek mantıklıdır
- Kalıcı hatalar: hatalı biçimlendirilmiş mesaj, kurtarılamaz iş kuralı ihlali — yeniden denemek işe yaramaz
- Seri durumdan çıkarma hataları: Yük şeması uyumsuz bir şekilde değişti - zehirli hap senaryosu
Teslimat Semantiği ve Hata Yönetimi
- En fazla bir kez: işlemeden önce taahhüt edin, kilitlenme durumunda kaybolan mesajlar - asla üretimde kullanmayın
- En az bir kez: başarılı işlemden sonra işleme alın, yeniden denemede olası kopyalar - standart
- Tam olarak bir kez: Kritik kullanım durumları için bağımsız tüketici + Kafka işlemleri gerektirir
Desen 1: Teslim Edilmeyen Mektup Sırası (DLQ)
La Teslim Edilmeyen Mektup Sırası mesajların gönderildiği yer ayrı bir Kafka konusu Maksimum sayıda denemeden sonra işlenemeyenler. Tüketiciyi engellemek veya mesajı kaybetmek yerine karantina konusuna taşıyoruz manuel analiz veya gelecekte yeniden işleme için.
Standart adlandırma kuralı {topic-originale}.DLT o {topic-originale}-dlq.
DLQ'daki mesaj, orijinal mesajın yanı sıra hatayla ilgili meta verileri içermelidir
(stacktrace, deneme sayısı, başarısızlık zaman damgası) Kafka başlıkları aracılığıyla.
// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Tenta l'elaborazione del messaggio
elaboraOrdine(record.value());
// Successo: rimuovi dal contatore retry e fai commit
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Errore transitorio: incrementa contatore
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Troppi tentativi: manda in DLQ
sendToDLQ(record, e, attempts);
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Ritenta: non fare commit, il messaggio verra riletto
System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" fallito per offset " + record.offset() + ": " + e.getMessage());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Errore permanente: va direttamente in DLQ senza retry
sendToDLQ(record, e, 1);
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Arricchisci con headers per il debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", error.getMessage().getBytes());
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Copia anche gli header originali
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
} else {
System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Desen 2: Üstel Geri Alma ile Yeniden Deneme
Il üstel geri çekilmeyle yeniden dene geçici hataları işlemek için standart kalıptır: Her başarısız deneme, bir sonraki denemeden önceki bekleme süresini uzatarak aşırı yüklemeyi önler aşağı yöndeki hizmet zaten zorluk içinde. Eklemek titreme (rastgele gürültü) geri çekilme problemini önler gürleyen sürü: tüm tüketiciler aynı anda yeniden başlatılıyor tam saniyesine kadar.
// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Esegue l'operazione con retry esponenziale + jitter.
*
* @param operation La lambda da eseguire
* @param maxRetries Numero massimo di tentativi
* @param baseDelayMs Delay base in millisecondi (default: 1000ms)
* @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Superato il numero massimo di tentativi: " + maxRetries, e);
}
// Calcola delay con full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: delay casuale tra 0 e il backoff esponenziale.
* Evita il thundering herd distribuendo i retry nel tempo.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random tra 0 e exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Eccezioni custom per distinguere errori recuperabili da permanenti
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000
);
Desen 3: Konuyu Yeniden Deneme (Engellemeyen Yeniden Deneme)
Tüketicide uyku yaklaşımının sorunu şudur: tüm bölümü kilitle: yeniden denemeyi beklerken aynı bölümden başka hiçbir mesaj işlenmez, Tüketici gecikmesinde artışa neden oluyor.
Desen Konuyu Yeniden Dene (o Engellenmeyen Yeniden Deneme) bu sorunu çözer: başarısız olan mesaj, bölümü kilitlemek yerine ayrı bir yeniden deneme konusuna taşınır yapılandırılmış bir gecikmeyle. Birincil tüketici yeni mesajları işlemeye devam eder. Spring Kafka 2.7+ bu modeli yerel olarak uygular.
// Struttura dei topic con retry non-bloccante
// Topic principale: ordini-effettuati
// Topic retry 1: ordini-effettuati-retry-1000 (delay 1s)
// Topic retry 2: ordini-effettuati-retry-2000 (delay 2s)
// Topic retry 3: ordini-effettuati-retry-5000 (delay 5s)
// Topic DLQ: ordini-effettuati.DLT
// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 tentativo originale + 3 retry
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Non riprova per errori non recuperabili
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka gestisce automaticamente i retry e la DLQ
elaboraOrdine(record.value());
}
// Listener per la DLQ: analisi e alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
record.key(), errorMessage);
// Invia alert, log su monitoring, notifica operatori...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
Model 4: Zehirli Hap Tespiti
Un zehir hapı her zaman tüketicinin çökmesine neden olan bir mesajdır, yeniden deneme sayısından bağımsız olarak. Klasik durum seri durumdan çıkarma hatasıdır: mesaj değeri beklenen formatta değil (bozuk JSON, uyumsuz Avro şeması).
Zehir hapının riski sonsuz döngüdür: tüketici başarısız olur, taahhütte bulunmaz, yeniden başlar, aynı mesajı okur, yine başarısız olur. Bu, bölümü tamamen kilitler. Ana savunma bir kullanmaktır Hata İşlemeSeri Çözücü bu hiçbir istisna yaratmaz ancak bozuk veriyi yönetilebilir bir hata nesnesine sarar.
// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wrappa il deserializer originale
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// Il deserializer "reale" (delegato)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type per la deserializzazione JSON
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 tentativi, 1s di attesa
));
return factory;
}
}
DLQ'dan yeniden işleme
DLQ kalıcı bir çöp kutusu değildir: mesajların alınabileceği bir karantina alanıdır. sorun çözüldükten sonra yeniden işlenmeleri gerekir. İki yaklaşım vardır:
- Manuel yeniden işleme: Bir operatör DLQ'daki mesajları inceler, sebebini belirler, sorunu düzeltir (bir düzeltme uygular, aşağı akış hizmetini geri yükler), ardından mesajları yeniden gönderir orijinal konu içinde.
- Otomatik yeniden işleme: Ayrı bir tüketici periyodik olarak DLQ'yu okur ve mesajları bir planlama politikasıyla (örneğin her saat başı, belirli bir uyarıdan sonra) yeniden işlemek için.
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
* Utile dopo aver deployato un fix per un errore specifico.
*/
public void reprocessByErrorType(String targetErrorClass) {
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Rimanda al topic originale
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Aggiungi header per tracciabilita del reprocessing
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
Dikkat: DLQ'daki Mesaj Sırası
DLQ'dan orijinal konuya mesaj gönderdiğinizde orijinal göreli sıra kaybolur zaten başarıyla işlenen diğer iletilere. Sıranın kritik olduğu kullanım durumları için (örneğin sıralı durum güncellemeleri), yeniden işlemede şunu dikkate almak gerekir: işlemek için tüketicide bazı eksiklik mantığı uygulamanız gerekebilir Yeni mesajlardan sonra gelen "eski" mesajlar.
Kafka'da Hata İşleme İçin En İyi Uygulamalar
- Geçici hataları daima kalıcı olanlardan ayırın: Özel istisnalar veya hata türü numaralandırması kullanın Hiçbir zaman düzeltilmeyecek hatalar üzerinde (örneğin şema ihlali) gereksiz yeniden denemelerden kaçınmak için.
- Üretimde DLQ zorunludur: DLQ'su olmayan her tüketici şunları yapabilir: mesajları sessizce kaybedebilir veya döngülere takılıp kalabilirsiniz. İsteğe bağlı değil.
-
DLQ'nun boyutunu izleyin: DLQ'da mesajların birikmesi bir sorunun işaretidir.
Uyarı ekle
kafka_consumer_group_partition_lag{topic="*.DLT"}. - DLQ mesajlarını meta verilerle zenginleştirin: başarısızlık zaman damgası, yığın izleme, deneme sayısı, orijinal konu ve ofset. Bu veriler olmadan hata ayıklama çok zordur.
- Ana tüketici döngüsünde uykuyu kullanmayın: bölümü kilitler ve gecikmeye neden olur. Konuyu Yeniden Dene modelini kullanın (engellenmeyen) veya taahhüt etmeyi ve DLQ'yu kabul edin.
- DLQ'da uzun bir saklama süresi ayarlayın: DLQ'daki mesajların incelenmesi gerekir. Retansiyon.ms'yi en az 30 güne ayarlayın (hatta yeniden işleme etkisizse sıkıştırma bile).
Serideki Sonraki Adımlar
- Madde 11 – Üretimde Kafka: seriyi operasyonel kılavuzla kapatır Küme boyutlandırma, optimum saklama ve çoğaltma faktörü yapılandırması için eksiksiz, ve coğrafi felaket kurtarma için MirrorMaker 2.
Diğer Serilerle Bağlantı
- Olay Odaklı Mimari – Asenkron Sistemlerde Teslim Edilmeyen Mektup Sırası: aynı DLQ modeli SQS, SNS ve diğer mesajlaşma sistemleri için de geçerlidir. Madde 708 EDA serisi, zaman aşımı ve maxReceiveCount görünürlüğü ile AWS bağlamındaki DLQ'yu kapsar.
- Kafka Tam Olarak-Bir Kez Anlambilimi (Madde 4): neden olunan kopyaları ortadan kaldırmak için Kafka'nın işlemsel API'si, yeniden denemelerden tam olarak bir kez uçtan uca garantilere izin verir.







