Wachtrij voor dode letters en foutafhandeling in Kafka
In een gedistribueerd systeem mislukken berichten. Corrupte deserialisatie, bedrijfslogica veroorzaakt uitzonderingen, downstream-service onbereikbaar – in al deze gevallen moet de consument beslissen wat hij gaat doen. Deze gids behandelt de Fundamentele patronen voor foutafhandeling in Kafka: Dead Letter Queue, exponentiële nieuwe poging met uitstel, detectie van gifpillen en herverwerkingsstrategieën.
Het foutprobleem in Kafka
In tegenstelling tot RabbitMQ of SQS, waarbij het bericht automatisch opnieuw wordt gecodeerd in geval van een fout, Kafka heeft een andere semantiek: de consument verplicht zich expliciet tot compensatie. Als de consument zich niet verbindt en opnieuw opstart, zal het dezelfde berichten opnieuw lezen. Dit brengt een reëel risico met zich mee: een verkeerd opgemaakt bericht (gif pil) kan de hele consumentengroep in een oneindige lus blokkeren, voorkomen dat volgende berichten op dezelfde partitie worden verwerkt.
De drie belangrijkste foutscenario's die in Kafka moeten worden afgehandeld, zijn:
- Tijdelijke fouten: Downstream-service tijdelijk niet beschikbaar, netwerktime-out: opnieuw proberen is zinvol
- Permanente fouten: verkeerd opgemaakt bericht, niet-herstelbare overtreding van de bedrijfsregels - opnieuw proberen is nutteloos
- Deserialisatiefouten: Het payload-schema is op een onverenigbare manier veranderd: het gifpil-scenario
Leveringssemantiek en foutbeheer
- Hoogstens één keer: vastleggen vóór verwerking, berichten verloren bij crash - nooit gebruiken in productie
- Minstens één keer: vastleggen na succesvolle verwerking, mogelijke duplicaten bij nieuwe poging — standaard
- Precies één keer: Vereist idempotente consumenten- en Kafka-transacties — voor kritische gebruiksscenario's
Patroon 1: Wachtrij voor dode letters (DLQ)
La Wachtrij met dode letters het is een apart Kafka-topic waar berichten naartoe worden gestuurd die na een maximaal aantal pogingen niet kunnen worden verwerkt. In plaats van de consument te blokkeren of het bericht kwijt te raken, verplaatsen we het naar een quarantaineonderwerp voor handmatige analyse of toekomstige herverwerking.
De standaard naamgevingsconventie is {topic-originale}.DLT o {topic-originale}-dlq.
Het bericht in de DLQ moet het originele bericht bevatten, plus metagegevens over de fout
(stacktrace, aantal pogingen, tijdstempel van de fout) via Kafka-headers.
// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Tenta l'elaborazione del messaggio
elaboraOrdine(record.value());
// Successo: rimuovi dal contatore retry e fai commit
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Errore transitorio: incrementa contatore
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Troppi tentativi: manda in DLQ
sendToDLQ(record, e, attempts);
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Ritenta: non fare commit, il messaggio verra riletto
System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" fallito per offset " + record.offset() + ": " + e.getMessage());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Errore permanente: va direttamente in DLQ senza retry
sendToDLQ(record, e, 1);
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Arricchisci con headers per il debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", error.getMessage().getBytes());
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Copia anche gli header originali
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
} else {
System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Patroon 2: Probeer het opnieuw met exponentiële uitstel
Il probeer het opnieuw met exponentieel uitstel is het standaardpatroon voor het afhandelen van tijdelijke fouten: elke mislukte poging verlengt de wachttijd tot de volgende poging, waardoor overbelasting wordt voorkomen de downstreamdienst die al in moeilijkheden verkeert. Toevoegen rilling (willekeurige ruis) bij uitstel vermijdt het probleem van donderende kudde: alle verbruikers herstarten gelijktijdig tot op de exacte seconde.
// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Esegue l'operazione con retry esponenziale + jitter.
*
* @param operation La lambda da eseguire
* @param maxRetries Numero massimo di tentativi
* @param baseDelayMs Delay base in millisecondi (default: 1000ms)
* @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Superato il numero massimo di tentativi: " + maxRetries, e);
}
// Calcola delay con full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: delay casuale tra 0 e il backoff esponenziale.
* Evita il thundering herd distribuendo i retry nel tempo.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random tra 0 e exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Eccezioni custom per distinguere errori recuperabili da permanenti
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000
);
Patroon 3: Onderwerp opnieuw proberen (niet-blokkerende nieuwe poging)
Het probleem met de slaapbenadering bij de consument is dat vergrendel de hele partitie: terwijl we wachten op de nieuwe poging, worden er geen andere berichten van dezelfde partitie verwerkt, waardoor de consumentenvertraging toeneemt.
Het patroon Onderwerp opnieuw proberen (o Niet-blokkerende nieuwe poging) lost dit probleem op: in plaats van de partitie te vergrendelen, wordt het mislukte bericht verplaatst naar een afzonderlijk onderwerp voor opnieuw proberen met een geconfigureerde vertraging. De primaire consument blijft nieuwe berichten verwerken. Spring Kafka 2.7+ implementeert dit patroon native.
// Struttura dei topic con retry non-bloccante
// Topic principale: ordini-effettuati
// Topic retry 1: ordini-effettuati-retry-1000 (delay 1s)
// Topic retry 2: ordini-effettuati-retry-2000 (delay 2s)
// Topic retry 3: ordini-effettuati-retry-5000 (delay 5s)
// Topic DLQ: ordini-effettuati.DLT
// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 tentativo originale + 3 retry
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Non riprova per errori non recuperabili
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka gestisce automaticamente i retry e la DLQ
elaboraOrdine(record.value());
}
// Listener per la DLQ: analisi e alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
record.key(), errorMessage);
// Invia alert, log su monitoring, notifica operatori...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
Patroon 4: Detectie van gifpillen
Un gif pil het is een boodschap die altijd een consumentencrash veroorzaakt, ongeacht het aantal nieuwe pogingen. Het klassieke geval is een deserialisatiefout: de berichtwaarde heeft niet de verwachte indeling (corrupte JSON, incompatibel Avro-schema).
Het risico van de gifpil is de oneindige lus: de consument faalt, verbindt zich niet, start opnieuw op, leest hetzelfde bericht, mislukt opnieuw. Hierdoor wordt de partitie volledig vergrendeld. De belangrijkste verdediging is het gebruik van a ErrorHandlingDeserializer die geen uitzondering oplevert maar verpakt de corrupte lading in een beheersbaar foutobject.
// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wrappa il deserializer originale
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// Il deserializer "reale" (delegato)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type per la deserializzazione JSON
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 tentativi, 1s di attesa
));
return factory;
}
}
Herverwerking vanuit DLQ
Een DLQ is geen permanente prullenbak: het is een quarantaineruimte van waaruit berichten worden verzonden ze moeten opnieuw worden verwerkt nadat het probleem is opgelost. Er zijn twee benaderingen:
- Handmatige herverwerking: een operator inspecteert berichten in DLQ, identificeert de oorzaak, corrigeert het probleem (implementeer een oplossing, herstel de downstream-service) en verzendt vervolgens de berichten opnieuw in het oorspronkelijke onderwerp.
- Automatische herverwerking: Een afzonderlijke consument leest periodiek de DLQ en probeert het om berichten opnieuw te verwerken, met een planningsbeleid (bijvoorbeeld elk uur, na een specifieke waarschuwing).
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
* Utile dopo aver deployato un fix per un errore specifico.
*/
public void reprocessByErrorType(String targetErrorClass) {
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Rimanda al topic originale
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Aggiungi header per tracciabilita del reprocessing
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
Let op: Volgorde van berichten in de DLQ
Wanneer u berichten vanuit de DLQ terugstuurt naar het oorspronkelijke onderwerp, gaat de oorspronkelijke relatieve volgorde verloren naar andere berichten die al succesvol zijn verwerkt. Voor gebruiksscenario's waarbij volgorde van cruciaal belang is (bijvoorbeeld opeenvolgende statusupdates), moet bij de herverwerking hiermee rekening worden gehouden: Mogelijk moet u enige idempotency-logica in de consument toepassen om deze te kunnen verwerken "oude" berichten die na nieuwere aankomen.
Best practices voor foutafhandeling in Kafka
- Maak altijd onderscheid tussen tijdelijke fouten en permanente fouten: Gebruik aangepaste uitzonderingen of een opsomming van het fouttype om nutteloze nieuwe pogingen te voorkomen voor fouten die nooit zullen genezen (bijvoorbeeld een schema-schending).
- DLQ is verplicht in productie: Elke consument die geen DLQ heeft, kan dat berichten geruisloos kwijtraken of vastlopen in lussen. Het is niet optioneel.
-
Controleer de grootte van de DLQ: Een opeenstapeling van berichten in DLQ is een teken van een probleem.
Waarschuwing toevoegen aan
kafka_consumer_group_partition_lag{topic="*.DLT"}. - Verrijk DLQ-berichten met metadata: fouttijdstempel, stacktrace, aantal pogingen, origineel onderwerp en offset. Zonder deze gegevens is het debuggen erg moeilijk.
- Gebruik slaap niet in de belangrijkste consumentenlus: vergrendelt de partitie en veroorzaakt vertraging. Gebruik het Retry Topic-patroon (niet-blokkerend) of ga akkoord met commit en DLQ.
- Stel een lange retentie in op de DLQ: Berichten in DLQ moeten worden geïnspecteerd. Stel retentie.ms in op minimaal 30 dagen (of zelfs compactie als herverwerking idempotent is).
Volgende stappen in de serie
- Artikel 11 – Kafka in productie: sluit de serie af met de operationele gids compleet voor clustergrootte, optimale retentie en replicatiefactorconfiguratie, en MirrorMaker 2 voor geografisch noodherstel.
Link met andere series
- Gebeurtenisgestuurde architectuur – Dead Letter-wachtrij in asynchrone systemen: hetzelfde DLQ-patroon is van toepassing op SQS, SNS en andere berichtensystemen. Artikel 708 van de EDA-serie omvat DLQ in de AWS-context met time-out en maxReceiveCount-zichtbaarheid.
- Kafka Exactly-Once-semantiek (artikel 4): om de veroorzaakte duplicaten te elimineren Vanaf nieuwe pogingen zorgt de transactionele API van Kafka voor precies één keer end-to-end garanties.







