Kolejka niedostarczonych listów i obsługa błędów w Kafce
W systemie rozproszonym komunikaty kończą się niepowodzeniem. Uszkodzona deserializacja, logika biznesowa zgłaszająca wyjątek, usługa niższego szczebla nieosiągalna – we wszystkich tych przypadkach konsument musi zdecydować, co zrobić. W tym przewodniku omówiono Podstawowe wzorce obsługi błędów w Kafce: Kolejka niedostarczonych listów, wykładnicza ponowna próba z wycofywaniem, wykrywanie trujących pigułek i strategie ponownego przetwarzania.
Problem błędów w Kafce
W przeciwieństwie do RabbitMQ lub SQS, gdzie wiadomość jest automatycznie ponownie kodowana w przypadku błędu, Kafka ma inną semantykę: konsument jawnie dokonuje przesunięć. Jeśli konsument nie zobowiąże się i uruchomi się ponownie, ponownie przeczyta te same wiadomości. Stwarza to realne ryzyko: źle sformułowana wiadomość (trująca pigułka) może zablokować całą grupę odbiorców w nieskończonej pętli, uniemożliwiając przetwarzanie kolejnych wiadomości w tej samej partycji.
Trzy główne scenariusze błędów, które należy obsłużyć w Kafce, to:
- Błędy przejściowe: Usługa podrzędna chwilowo niedostępna, przekroczono limit czasu sieci — ponowna próba ma sens
- Błędy trwałe: zniekształcona wiadomość, nieodwracalne naruszenie reguł biznesowych — ponowna próba jest bezużyteczna
- Błędy deserializacji: Schemat ładunku zmienił się w niezgodny sposób – scenariusz z trującą pigułką
Semantyka dostarczania i zarządzanie błędami
- Co najwyżej raz: zatwierdzenie przed przetworzeniem, wiadomości utracone w przypadku awarii — nigdy nie używaj w środowisku produkcyjnym
- Przynajmniej raz: zatwierdzenie po pomyślnym przetworzeniu, możliwe duplikaty przy ponownej próbie — standard
- Dokładnie-raz: Wymaga idempotentnego konsumenta + transakcji Kafka — w krytycznych przypadkach użycia
Wzorzec 1: Kolejka niedostarczonych listów (DLQ)
La Kolejka martwych listów jest to osobny temat Kafki, w którym wysyłane są wiadomości które nie mogą zostać przetworzone po maksymalnej liczbie prób. Zamiast blokować konsumenta lub gubić wiadomość, przenosimy ją do tematu kwarantanny do ręcznej analizy lub przyszłego ponownego przetwarzania.
Standardowa konwencja nazewnictwa to {topic-originale}.DLT o {topic-originale}-dlq.
Wiadomość w DLQ powinna zawierać oryginalną wiadomość oraz metadane dotyczące błędu
(śledzenie stosu, liczba prób, sygnatura czasowa niepowodzenia) za pośrednictwem nagłówków Kafki.
// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;
import java.time.Duration;
import java.util.*;
public class KafkaDLQHandler {
private static final String SOURCE_TOPIC = "ordini-effettuati";
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final int MAX_RETRY_ATTEMPTS = 3;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private final Map<String, Integer> retryCount = new HashMap<>();
public KafkaDLQHandler(String bootstrapServers) {
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "servizio-inventario-dlq");
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(consumerProps);
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
this.producer = new KafkaProducer<>(producerProps);
}
public void processWithDLQ() {
consumer.subscribe(List.of(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();
try {
// Tenta l'elaborazione del messaggio
elaboraOrdine(record.value());
// Successo: rimuovi dal contatore retry e fai commit
retryCount.remove(messageKey);
consumer.commitSync();
} catch (RecoverableException e) {
// Errore transitorio: incrementa contatore
int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
retryCount.put(messageKey, attempts);
if (attempts >= MAX_RETRY_ATTEMPTS) {
// Troppi tentativi: manda in DLQ
sendToDLQ(record, e, attempts);
retryCount.remove(messageKey);
consumer.commitSync();
} else {
// Ritenta: non fare commit, il messaggio verra riletto
System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
" fallito per offset " + record.offset() + ": " + e.getMessage());
sleep(calculateBackoff(attempts));
}
} catch (PermanentException e) {
// Errore permanente: va direttamente in DLQ senza retry
sendToDLQ(record, e, 1);
consumer.commitSync();
}
}
}
}
private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
Exception error, int attempts) {
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
originalRecord.key(),
originalRecord.value()
);
// Arricchisci con headers per il debugging
Headers headers = dlqRecord.headers();
headers.add("dlq-original-topic", originalRecord.topic().getBytes());
headers.add("dlq-original-partition",
String.valueOf(originalRecord.partition()).getBytes());
headers.add("dlq-original-offset",
String.valueOf(originalRecord.offset()).getBytes());
headers.add("dlq-error-message", error.getMessage().getBytes());
headers.add("dlq-error-class", error.getClass().getName().getBytes());
headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
headers.add("dlq-failed-at",
String.valueOf(System.currentTimeMillis()).getBytes());
// Copia anche gli header originali
originalRecord.headers().forEach(h ->
headers.add("original-" + h.key(), h.value()));
producer.send(dlqRecord, (metadata, ex) -> {
if (ex != null) {
System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
} else {
System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
DLQ_TOPIC, metadata.offset(), error.getMessage());
}
});
}
private long calculateBackoff(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, ...
return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
}
private void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
Wzorzec 2: Ponów próbę z wykładniczym wycofaniem
Il spróbuj ponownie z wykładniczym wycofaniem to standardowy wzorzec obsługi błędów przejściowych: każda nieudana próba wydłuża czas oczekiwania przed następną próbą, unikając przeciążenia świadczenie usług na niższym szczeblu łańcucha dostaw już znajduje się w trudnej sytuacji. Dodać drganie (losowy hałas) przy backoffie pozwala uniknąć problemu grzmiące stado: wszyscy odbiorcy uruchamiający się ponownie jednocześnie co do sekundy.
// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;
public class RetryWithBackoff {
private static final Random random = new Random();
/**
* Esegue l'operazione con retry esponenziale + jitter.
*
* @param operation La lambda da eseguire
* @param maxRetries Numero massimo di tentativi
* @param baseDelayMs Delay base in millisecondi (default: 1000ms)
* @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
*/
public static <T> T execute(Supplier<T> operation,
int maxRetries,
long baseDelayMs,
long maxDelayMs) throws Exception {
int attempt = 0;
Exception lastException = null;
while (attempt < maxRetries) {
try {
return operation.get();
} catch (RetryableException e) {
lastException = e;
attempt++;
if (attempt >= maxRetries) {
throw new MaxRetriesExceededException(
"Superato il numero massimo di tentativi: " + maxRetries, e);
}
// Calcola delay con full jitter
long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
attempt, maxRetries, delay);
Thread.sleep(delay);
}
}
throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
}
/**
* Full jitter: delay casuale tra 0 e il backoff esponenziale.
* Evita il thundering herd distribuendo i retry nel tempo.
*/
private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
// Full jitter: random tra 0 e exponentialDelay
return (long) (random.nextDouble() * exponentialDelay);
}
// Eccezioni custom per distinguere errori recuperabili da permanenti
public static class RetryableException extends RuntimeException {
public RetryableException(String message, Throwable cause) { super(message, cause); }
}
public static class MaxRetriesExceededException extends Exception {
public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
}
}
// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
() -> {
chiamataServizioEsterno(record.value());
return null;
},
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000
);
Wzorzec 3: Ponów temat (ponowna próba nieblokująca)
Problem z podejściem do snu u konsumenta polega na tym, że zablokuj całą partycję: podczas oczekiwania na ponowną próbę nie są przetwarzane żadne inne wiadomości z tej samej partycji, powodując wzrost opóźnień konsumenckich.
Wzór Ponów temat (o Ponowna próba bez blokowania) rozwiązuje ten problem: zamiast blokować partycję, komunikat o niepowodzeniu jest przenoszony do osobnego tematu dotyczącego ponawiania prób ze skonfigurowanym opóźnieniem. Główny konsument w dalszym ciągu przetwarza nowe wiadomości. Spring Kafka 2.7+ implementuje ten wzorzec natywnie.
// Struttura dei topic con retry non-bloccante
// Topic principale: ordini-effettuati
// Topic retry 1: ordini-effettuati-retry-1000 (delay 1s)
// Topic retry 2: ordini-effettuati-retry-2000 (delay 2s)
// Topic retry 3: ordini-effettuati-retry-5000 (delay 5s)
// Topic DLQ: ordini-effettuati.DLT
// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;
@Component
public class OrdineConsumerNonBlocking {
@RetryableTopic(
attempts = "4", // 1 tentativo originale + 3 retry
backoff = @Backoff(
delay = 1000,
multiplier = 2.0,
maxDelay = 10000
),
dltTopicSuffix = ".DLT",
retryTopicSuffix = "-retry",
// Non riprova per errori non recuperabili
exclude = {
DeserializationException.class,
PermanentBusinessException.class
}
)
@KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
public void consumeOrdine(ConsumerRecord<String, String> record) {
// Spring Kafka gestisce automaticamente i retry e la DLQ
elaboraOrdine(record.value());
}
// Listener per la DLQ: analisi e alerting
@DltHandler
public void handleDlt(ConsumerRecord<String, String> record,
@Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
record.key(), errorMessage);
// Invia alert, log su monitoring, notifica operatori...
alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
}
}
Wzór 4: Wykrywanie trujących pigułek
Un trująca pigułka to wiadomość, która zawsze powoduje krach konsumencki, niezależnie od liczby ponownych prób. Klasycznym przypadkiem jest błąd deserializacji: wartość wiadomości nie jest w oczekiwanym formacie (uszkodzony JSON, niezgodny schemat Avro).
Ryzyko zatrucia pigułką to nieskończona pętla: konsument ponosi porażkę, nie angażuje się, uruchamia ponownie, czyta tę samą wiadomość, ponownie kończy się niepowodzeniem. Spowoduje to całkowite zablokowanie partycji. Główną obroną jest użycie Obsługa błędówDeserializer co nie rzuca wyjątku ale otacza uszkodzony ładunek w zarządzalnym obiekcie błędu.
// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Ordine> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");
// ErrorHandlingDeserializer wrappa il deserializer originale
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ErrorHandlingDeserializer.class);
// Il deserializer "reale" (delegato)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
JsonDeserializer.class);
// Target type per la deserializzazione JSON
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
ConsumerFactory<String, Ordine> consumerFactory,
KafkaTemplate<String, Ordine> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
new FixedBackOff(1000L, 3L) // 3 tentativi, 1s di attesa
));
return factory;
}
}
Ponowne przetwarzanie z DLQ
DLQ nie jest stałym koszem na śmieci: jest to miejsce kwarantanny, z którego wysyłane są wiadomości należy je ponownie przetworzyć po rozwiązaniu problemu. Istnieją dwa podejścia:
- Ręczna obróbka: operator sprawdza wiadomości w DLQ, identyfikuje przyczynę, naprawia problem (wdraża poprawkę, przywraca usługę podrzędną), a następnie ponownie wysyła komunikaty w oryginalnym temacie.
- Automatyczne ponowne przetwarzanie: Oddzielny konsument okresowo czyta DLQ i próbuje do ponownego przetwarzania wiadomości, zgodnie z polityką harmonogramu (np. co godzinę, po określonym alercie).
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {
private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
private static final String SOURCE_TOPIC = "ordini-effettuati";
/**
* Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
* Utile dopo aver deployato un fix per un errore specifico.
*/
public void reprocessByErrorType(String targetErrorClass) {
Properties consumerProps = buildConsumerProps("dlq-reprocessor");
Properties producerProps = buildProducerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
consumer.subscribe(List.of(DLQ_TOPIC));
int reprocessed = 0, skipped = 0;
ConsumerRecords<String, String> records;
do {
records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> dlqRecord : records) {
String errorClass = getHeader(dlqRecord, "dlq-error-class");
String originalTopic = getHeader(dlqRecord, "dlq-original-topic");
if (targetErrorClass.equals(errorClass)) {
// Rimanda al topic originale
ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
originalTopic != null ? originalTopic : SOURCE_TOPIC,
dlqRecord.key(),
dlqRecord.value()
);
// Aggiungi header per tracciabilita del reprocessing
reprocessRecord.headers().add("reprocessed-from-dlq",
String.valueOf(System.currentTimeMillis()).getBytes());
producer.send(reprocessRecord);
reprocessed++;
} else {
skipped++;
}
}
consumer.commitSync();
} while (!records.isEmpty());
System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
reprocessed, skipped);
}
}
private String getHeader(ConsumerRecord<String, String> record, String key) {
var header = record.headers().lastHeader(key);
return header != null ? new String(header.value()) : null;
}
}
Uwaga: Kolejność komunikatów w DLQ
Kiedy wysyłasz wiadomości z DLQ z powrotem do pierwotnego tematu, pierwotna kolejność względna zostaje utracona do innych wiadomości, które zostały już pomyślnie przetworzone. Do zastosowań, w których porządek ma kluczowe znaczenie (np. sekwencyjne aktualizacje stanu), ponowne przetwarzanie musi uwzględniać następujące kwestie: może być konieczne zastosowanie logiki idempotencji u konsumenta do obsługi „stare” wiadomości, które przychodzą po nowszych.
Najlepsze praktyki dotyczące obsługi błędów w Kafce
- Zawsze odróżniaj błędy przejściowe od trwałych: Użyj niestandardowych wyjątków lub wyliczenia typu błędu aby uniknąć bezużytecznych ponownych prób w przypadku błędów, które nigdy nie zostaną naprawione (np. naruszenie schematu).
- DLQ jest obowiązkowe w produkcji: Każdy konsument, który nie ma DLQ, może to zrobić gubisz wiadomości po cichu lub utkniesz w pętli. To nie jest opcjonalne.
-
Monitoruj rozmiar DLQ: Nagromadzenie wiadomości w DLQ jest oznaką problemu.
Dodaj powiadomienie włączone
kafka_consumer_group_partition_lag{topic="*.DLT"}. - Wzbogacaj wiadomości DLQ metadanymi: znacznik czasu niepowodzenia, śledzenie stosu, liczba prób, oryginalny temat i przesunięcie. Bez tego debugowanie danych jest bardzo trudne.
- Nie używaj trybu uśpienia w głównej pętli konsumenckiej: blokuje partycję i powoduje opóźnienie. Użyj wzorca Ponów temat (nieblokującego) lub zgódź się na zatwierdzenie i DLQ.
- Ustaw długie przechowywanie na DLQ: Wiadomości w DLQ wymagają sprawdzenia. Ustaw plik retencji ms na co najmniej 30 dni (lub nawet kompresję, jeśli ponowne przetwarzanie jest idempotentne).
Kolejne kroki w serii
- Artykuł 11 – Kafka w produkcji: zamyka serię z przewodnikiem operacyjnym kompletne w zakresie rozmiaru klastra, optymalnej konfiguracji czynników retencji i replikacji, oraz MirrorMaker 2 do odzyskiwania danych po awarii geograficznej.
Połącz z innymi seriami
- Architektura sterowana zdarzeniami – kolejka niedostarczonych listów w systemach asynchronicznych: ten sam wzorzec DLQ dotyczy SQS, SNS i innych systemów przesyłania wiadomości. Artykuł 708 serii EDA obejmuje DLQ w kontekście AWS z widocznością timeout i maxReceiveCount.
- Semantyka dokładnie raz Kafki (artykuł 4): aby wyeliminować powstałe duplikaty W przypadku ponownych prób transakcyjny interfejs API platformy Kafka umożliwia kompleksową gwarancję dokładnie raz.







