Fronta nedoručených dopisů a zpracování chyb v Kafkovi
V distribuovaném systému zprávy selhávají. Zkorumpovaná deseralizace, obchodní logika vyvolá výjimku, navazující služba nedosažitelná – ve všech těchto případech se musí spotřebitel rozhodnout, co dělat. Tato příručka pokrývá Základní vzorce pro zpracování chyb v Kafkovi: Fronta mrtvého dopisu, exponenciální opakování s couváním, detekce otravných pilulek a strategie přepracování.
Problém chyby u Kafky
Na rozdíl od RabbitMQ nebo SQS, kde je zpráva v případě chyby automaticky překódována, Kafka má odlišnou sémantiku: spotřebitel se výslovně dopouští kompenzací. Pokud se spotřebitel nezaváže a restartuje, přečte znovu stejné zprávy. To vytváří skutečné riziko: špatně naformátovaná zpráva (jedovatá pilulka) může blokovat celou skupinu spotřebitelů v nekonečné smyčce, zabránění zpracování následných zpráv ve stejném oddílu.
Tři hlavní chybové scénáře, které je třeba v Kafce zpracovat, jsou:
- Přechodné chyby: Služba downstream dočasně nedostupná, časový limit sítě – opakování má smysl
- Trvalé chyby: chybná zpráva, porušení obchodních pravidel, které nelze obnovit – opakování je zbytečné
- Chyby deseralizace: Schéma užitečného zatížení se změnilo nekompatibilním způsobem — scénář jedovaté pilulky
Sémantika doručení a správa chyb
- Maximálně jednou: potvrzení před zpracováním, zprávy ztracené při havárii – nikdy nepoužívejte v produkci
- Alespoň jednou: potvrzení po úspěšném zpracování, možné duplikáty při opakování — standardní
- Přesně - jednou: Vyžaduje idempotentní spotřebitele + transakce Kafka — pro kritické případy použití
Vzor 1: Fronta nedoručených dopisů (DLQ)
La Fronta mrtvých dopisů je to samostatné Kafkovo téma, kam se posílají zprávy které se nepodaří zpracovat po maximálním počtu pokusů. Místo abychom zablokovali spotřebitele nebo zprávu ztratili, přesuneme ji do tématu karantény pro ruční analýzu nebo budoucí přepracování.
Standardní konvence pojmenování je {topic-originale}.DLT o {topic-originale}-dlq.
Zpráva v DLQ by měla obsahovat původní zprávu plus metadata o chybě
(stacktrace, počet pokusů, časové razítko selhání) prostřednictvím hlaviček Kafka.
// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Tenta l'elaborazione del messaggio
elaboraOrdine(record.value());
// Successo: rimuovi dal contatore retry e fai commit
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Errore transitorio: incrementa contatore
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Troppi tentativi: manda in DLQ
sendToDLQ(record, e, attempts);
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Ritenta: non fare commit, il messaggio verra riletto
System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" fallito per offset " + record.offset() + ": " + e.getMessage());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Errore permanente: va direttamente in DLQ senza retry
sendToDLQ(record, e, 1);
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Arricchisci con headers per il debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", error.getMessage().getBytes());
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Copia anche gli header originali
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
} else {
System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Vzor 2: Opakujte pokus s exponenciálním stažením
Il opakujte s exponenciálním stažením je standardní vzor pro řešení přechodných chyb: každý neúspěšný pokus prodlužuje čekací dobu před dalším pokusem, čímž se zabrání přetížení navazující služba již má potíže. Přidat nervozita (náhodný hluk) na backoff se vyhýbá problému hřmící stádo: všechny spotřebiče se restartují současně na přesnou vteřinu.
// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Esegue l'operazione con retry esponenziale + jitter.
*
* @param operation La lambda da eseguire
* @param maxRetries Numero massimo di tentativi
* @param baseDelayMs Delay base in millisecondi (default: 1000ms)
* @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Superato il numero massimo di tentativi: " + maxRetries, e);
}
// Calcola delay con full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: delay casuale tra 0 e il backoff esponenziale.
* Evita il thundering herd distribuendo i retry nel tempo.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random tra 0 e exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Eccezioni custom per distinguere errori recuperabili da permanenti
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000
);
Vzor 3: Opakovat téma (neblokující opakování)
Problém spánkového přístupu u spotřebitele je v tom uzamknout celý oddíl: zatímco čekáme na opakování, žádné další zprávy ze stejného oddílu se nezpracovávají, způsobuje růst spotřebitelského zpoždění.
Vzor Opakovat téma (o Neblokující opakování) řeší tento problém: místo uzamčení oddílu se zpráva o selhání přesune do samostatného tématu opakování s nakonfigurovaným zpožděním. Primární spotřebitel pokračuje ve zpracování nových zpráv. Spring Kafka 2.7+ tento vzor nativně implementuje.
// Struttura dei topic con retry non-bloccante
// Topic principale: ordini-effettuati
// Topic retry 1: ordini-effettuati-retry-1000 (delay 1s)
// Topic retry 2: ordini-effettuati-retry-2000 (delay 2s)
// Topic retry 3: ordini-effettuati-retry-5000 (delay 5s)
// Topic DLQ: ordini-effettuati.DLT
// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 tentativo originale + 3 retry
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Non riprova per errori non recuperabili
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka gestisce automaticamente i retry e la DLQ
elaboraOrdine(record.value());
}
// Listener per la DLQ: analisi e alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
record.key(), errorMessage);
// Invia alert, log su monitoring, notifica operatori...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
Vzor 4: Detekce pilulky jedu
Un jedovatá pilulka je to zpráva, která vždy způsobí selhání spotřebitele, bez ohledu na počet opakování. Klasickým případem je chyba deseralizace: hodnota zprávy není v očekávaném formátu (poškozený JSON, nekompatibilní schéma Avro).
Rizikem jedovaté pilulky je nekonečná smyčka: spotřebitel selže, nezaváže se, restartuje, přečte stejnou zprávu, znovu selže. Tím se oddíl zcela uzamkne. Hlavní obranou je použití a ErrorHandlingDeserializer který nehází žádnou výjimku ale zabalí poškozený datový obsah do zvládnutelného chybového objektu.
// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wrappa il deserializer originale
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// Il deserializer "reale" (delegato)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type per la deserializzazione JSON
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 tentativi, 1s di attesa
));
return factory;
}
}
Přepracování z DLQ
DLQ není trvalý odpadkový koš: je to karanténní prostor, ze kterého jsou odesílány zprávy musí být po vyřešení problému znovu zpracovány. Existují dva přístupy:
- Ruční přepracování: operátor kontroluje zprávy v DLQ, identifikuje příčinu, opraví problém (nasadí opravu, obnoví následnou službu) a poté znovu odešle zprávy v původním tématu.
- Automatické přepracování: Samostatný spotřebitel pravidelně čte DLQ a zkouší k opětovnému zpracování zpráv pomocí zásady plánování (např. každou hodinu, po konkrétním upozornění).
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
* Utile dopo aver deployato un fix per un errore specifico.
*/
public void reprocessByErrorType(String targetErrorClass) {
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Rimanda al topic originale
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Aggiungi header per tracciabilita del reprocessing
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
Pozor: Pořadí zpráv v DLQ
Když odešlete zprávy z DLQ zpět do původního tématu, původní relativní pořadí se ztratí na další již úspěšně zpracované zprávy. Pro případy použití, kde je pořadí kritické (např. sekvenční aktualizace stavu), opětovné zpracování musí vzít v úvahu: možná budete muset u spotřebitele použít určitou logiku idempotence, abyste to zvládli „staré“ zprávy, které přicházejí po novějších.
Nejlepší postupy pro řešení chyb ve službě Kafka
- Vždy rozlišujte přechodné chyby od trvalých: Použijte vlastní výjimky nebo výčet typu chyby abyste se vyhnuli zbytečným opakováním chyb, které se nikdy nevyléčí (např. porušení schématu).
- DLQ je ve výrobě povinné: Každý spotřebitel, který nemá DLQ, může tiše ztratit zprávy nebo uvíznout ve smyčce. Není to volitelné.
-
Sledujte velikost DLQ: Hromadění zpráv v DLQ je známkou problému.
Přidat upozornění na
kafka_consumer_group_partition_lag{topic="*.DLT"}. - Obohaťte zprávy DLQ o metadata: časové razítko selhání, stacktrace, počet pokusů, původní téma a offset. Bez těchto dat je ladění velmi obtížné.
- Nepoužívejte režim spánku v hlavní spotřebitelské smyčce: uzamkne oddíl a způsobí zpoždění. Použijte vzor Opakovat téma (neblokující) nebo souhlasíte s potvrzením a DLQ.
- Nastavte dlouhé uchování na DLQ: Zprávy v DLQ je třeba zkontrolovat. Nastavte retenci.ms alespoň na 30 dní (nebo dokonce zhutnění, pokud je opětovné zpracování idempotentní).
Další kroky v sérii
- Článek 11 – Kafka ve výrobě: uzavírá sérii s provozním průvodcem kompletní pro dimenzování clusteru, optimální uchování a konfiguraci faktoru replikace, a MirrorMaker 2 pro obnovu po geografické havárii.
Propojení s ostatními sériemi
- Architektura řízená událostmi – fronta nedoručených zpráv v asynchronních systémech: stejný vzor DLQ platí pro SQS, SNS a další systémy zasílání zpráv. Článek 708 z řady EDA pokrývá DLQ v kontextu AWS s časovým limitem a viditelností maxReceiveCount.
- Kafkova sémantika přesně jednou (článek 4): k odstranění způsobených duplicit Z opakovaných pokusů umožňuje transakční API Kafky přesně jednou end-to-end záruky.







