Problema erorii în Kafka

Spre deosebire de RabbitMQ sau SQS, unde mesajul este re-codificat automat în caz de eroare, Kafka are o semantică diferită: consumatorul comite compensații în mod explicit. Dacă consumatorul nu se angajează și repornește, va citi din nou aceleași mesaje. Acest lucru creează un risc real: un mesaj malformat (pilula otravitoare) poate bloca întregul grup de consumatori în buclă infinită, împiedicând procesarea mesajelor ulterioare din aceeași partiție.

Cele trei scenarii de eroare principale de tratat în Kafka sunt:

  • Erori tranzitorii: Serviciul în aval indisponibil temporar, rețea expiră — reîncercați are sens
  • Erori permanente: mesaj incorect, încălcarea regulilor de afaceri nerecuperabile — reîncercarea este inutilă
  • Erori de deserializare: Schema de încărcare utilă s-a schimbat într-un mod incompatibil - scenariul pilulei otrăvitoare

Semantica livrării și managementul erorilor

  • Cel mult-o dată: comite înainte de procesare, mesaje pierdute la blocare — nu utilizați niciodată în producție
  • Măcar o dată: se comite după procesarea cu succes, posibile duplicate la reîncercare — standard
  • Exact-o dată: Necesită tranzacții de consumator idempotent + Kafka — pentru cazuri de utilizare critice

Model 1: Coadă de scrisori moarte (DLQ)

La Coada de scrisori moarte este un subiect Kafka separat unde sunt trimise mesaje care nu pot fi procesate după un număr maxim de încercări. În loc să blocăm consumatorul sau să pierdem mesajul, îl trecem la un subiect de carantină pentru analize manuale sau reprocesări viitoare.

Convenția standard de denumire este {topic-originale}.DLT o {topic-originale}-dlq. Mesajul din DLQ ar trebui să includă mesajul original plus metadate despre eroare (stacktrace, număr de încercări, timestamp eșec) prin anteturile Kafka.

// KafkaDLQHandler.java - Pattern base per Dead Letter Queue
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Headers;

import java.time.Duration;
import java.util.*;

public class KafkaDLQHandler {

    private static final String SOURCE_TOPIC = "ordini-effettuati";
    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final int MAX_RETRY_ATTEMPTS = 3;

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;
    private final Map<String, Integer> retryCount = new HashMap<>();

    public KafkaDLQHandler(String bootstrapServers) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", "servizio-inventario-dlq");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put("auto.offset.reset", "earliest");
        this.consumer = new KafkaConsumer<>(consumerProps);

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", bootstrapServers);
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", "all");
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void processWithDLQ() {
        consumer.subscribe(List.of(SOURCE_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String messageKey = record.topic() + "-" + record.partition() + "-" + record.offset();

                try {
                    // Tenta l'elaborazione del messaggio
                    elaboraOrdine(record.value());

                    // Successo: rimuovi dal contatore retry e fai commit
                    retryCount.remove(messageKey);
                    consumer.commitSync();

                } catch (RecoverableException e) {
                    // Errore transitorio: incrementa contatore
                    int attempts = retryCount.getOrDefault(messageKey, 0) + 1;
                    retryCount.put(messageKey, attempts);

                    if (attempts >= MAX_RETRY_ATTEMPTS) {
                        // Troppi tentativi: manda in DLQ
                        sendToDLQ(record, e, attempts);
                        retryCount.remove(messageKey);
                        consumer.commitSync();
                    } else {
                        // Ritenta: non fare commit, il messaggio verra riletto
                        System.err.println("Tentativo " + attempts + "/" + MAX_RETRY_ATTEMPTS +
                            " fallito per offset " + record.offset() + ": " + e.getMessage());
                        sleep(calculateBackoff(attempts));
                    }

                } catch (PermanentException e) {
                    // Errore permanente: va direttamente in DLQ senza retry
                    sendToDLQ(record, e, 1);
                    consumer.commitSync();
                }
            }
        }
    }

    private void sendToDLQ(ConsumerRecord<String, String> originalRecord,
                           Exception error, int attempts) {
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            DLQ_TOPIC,
            originalRecord.key(),
            originalRecord.value()
        );

        // Arricchisci con headers per il debugging
        Headers headers = dlqRecord.headers();
        headers.add("dlq-original-topic", originalRecord.topic().getBytes());
        headers.add("dlq-original-partition",
            String.valueOf(originalRecord.partition()).getBytes());
        headers.add("dlq-original-offset",
            String.valueOf(originalRecord.offset()).getBytes());
        headers.add("dlq-error-message", error.getMessage().getBytes());
        headers.add("dlq-error-class", error.getClass().getName().getBytes());
        headers.add("dlq-retry-count", String.valueOf(attempts).getBytes());
        headers.add("dlq-failed-at",
            String.valueOf(System.currentTimeMillis()).getBytes());

        // Copia anche gli header originali
        originalRecord.headers().forEach(h ->
            headers.add("original-" + h.key(), h.value()));

        producer.send(dlqRecord, (metadata, ex) -> {
            if (ex != null) {
                System.err.println("CRITICO: impossibile scrivere in DLQ: " + ex.getMessage());
            } else {
                System.out.printf("Messaggio inviato a DLQ: topic=%s, offset=%d, errore=%s%n",
                    DLQ_TOPIC, metadata.offset(), error.getMessage());
            }
        });
    }

    private long calculateBackoff(int attempt) {
        // Exponential backoff: 1s, 2s, 4s, 8s, ...
        return (long) Math.min(Math.pow(2, attempt - 1) * 1000, 30000);
    }

    private void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

Model 2: Reîncercați cu Backoff exponențial

Il reîncercați cu backoff exponențial este modelul standard pentru tratarea erorilor tranzitorii: fiecare încercare eșuată crește timpul de așteptare înainte de următoarea încercare, evitând supraîncărcarea serviciul din aval aflat deja în dificultate. Adăuga frământare (zgomot aleatoriu) la backoff evită problema de turmă tunătoare: toți consumatorii repornesc simultan la secunda exactă.

// RetryWithBackoff.java - Strategia di retry con exponential backoff + jitter
import java.util.Random;
import java.util.function.Supplier;

public class RetryWithBackoff {

    private static final Random random = new Random();

    /**
     * Esegue l'operazione con retry esponenziale + jitter.
     *
     * @param operation  La lambda da eseguire
     * @param maxRetries Numero massimo di tentativi
     * @param baseDelayMs Delay base in millisecondi (default: 1000ms)
     * @param maxDelayMs Delay massimo in millisecondi (default: 30000ms)
     */
    public static <T> T execute(Supplier<T> operation,
                                 int maxRetries,
                                 long baseDelayMs,
                                 long maxDelayMs) throws Exception {
        int attempt = 0;
        Exception lastException = null;

        while (attempt < maxRetries) {
            try {
                return operation.get();
            } catch (RetryableException e) {
                lastException = e;
                attempt++;

                if (attempt >= maxRetries) {
                    throw new MaxRetriesExceededException(
                        "Superato il numero massimo di tentativi: " + maxRetries, e);
                }

                // Calcola delay con full jitter
                long delay = calculateJitteredDelay(attempt, baseDelayMs, maxDelayMs);
                System.err.printf("Tentativo %d/%d fallito. Prossimo retry tra %dms%n",
                    attempt, maxRetries, delay);

                Thread.sleep(delay);
            }
        }

        throw new MaxRetriesExceededException("Nessun tentativo riuscito", lastException);
    }

    /**
     * Full jitter: delay casuale tra 0 e il backoff esponenziale.
     * Evita il thundering herd distribuendo i retry nel tempo.
     */
    private static long calculateJitteredDelay(int attempt, long baseDelay, long maxDelay) {
        long exponentialDelay = (long) Math.min(Math.pow(2, attempt) * baseDelay, maxDelay);
        // Full jitter: random tra 0 e exponentialDelay
        return (long) (random.nextDouble() * exponentialDelay);
    }

    // Eccezioni custom per distinguere errori recuperabili da permanenti
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message, Throwable cause) { super(message, cause); }
    }

    public static class MaxRetriesExceededException extends Exception {
        public MaxRetriesExceededException(String message, Throwable cause) { super(message, cause); }
    }
}

// Utilizzo nel consumer Kafka
RetryWithBackoff.execute(
    () -> {
        chiamataServizioEsterno(record.value());
        return null;
    },
    maxRetries: 3,
    baseDelayMs: 1000,
    maxDelayMs: 30000
);

Model 3: Reîncercați subiectul (Reîncercare fără blocare)

Problema cu abordarea somnului la consumator este că blocați întreaga partiție: în timp ce așteptăm reîncercarea, nu sunt procesate alte mesaje din aceeași partiție, determinând creșterea decalajului consumatorului.

Modelul Reîncercați subiectul (o Reîncercați fără blocare) rezolvă această problemă: în loc să blocheze partiția, mesajul eșuat este mutat într-un subiect separat de reîncercare cu o întârziere configurată. Consumatorul principal continuă să proceseze mesaje noi. Spring Kafka 2.7+ implementează acest model în mod nativ.

// Struttura dei topic con retry non-bloccante
// Topic principale:   ordini-effettuati
// Topic retry 1:      ordini-effettuati-retry-1000    (delay 1s)
// Topic retry 2:      ordini-effettuati-retry-2000    (delay 2s)
// Topic retry 3:      ordini-effettuati-retry-5000    (delay 5s)
// Topic DLQ:          ordini-effettuati.DLT

// Con Spring Kafka @RetryableTopic
import org.springframework.kafka.annotation.*;
import org.springframework.kafka.retrytopic.*;

@Component
public class OrdineConsumerNonBlocking {

    @RetryableTopic(
        attempts = "4",              // 1 tentativo originale + 3 retry
        backoff = @Backoff(
            delay = 1000,
            multiplier = 2.0,
            maxDelay = 10000
        ),
        dltTopicSuffix = ".DLT",
        retryTopicSuffix = "-retry",
        // Non riprova per errori non recuperabili
        exclude = {
            DeserializationException.class,
            PermanentBusinessException.class
        }
    )
    @KafkaListener(topics = "ordini-effettuati", groupId = "servizio-inventario")
    public void consumeOrdine(ConsumerRecord<String, String> record) {
        // Spring Kafka gestisce automaticamente i retry e la DLQ
        elaboraOrdine(record.value());
    }

    // Listener per la DLQ: analisi e alerting
    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> record,
                          @Header KafkaHeaders.DLT_EXCEPTION_MESSAGE String errorMessage) {
        System.err.printf("Messaggio in DLQ: key=%s, errore=%s%n",
            record.key(), errorMessage);
        // Invia alert, log su monitoring, notifica operatori...
        alertingService.sendDLQAlert(record.topic(), record.key(), errorMessage);
    }
}

Modelul 4: Detectarea pilulelor otrăvitoare

Un pilula otravitoare este un mesaj care provoacă întotdeauna un accident de consum, indiferent de numărul de reîncercări. Cazul clasic este o eroare de deserializare: valoarea mesajului nu este în formatul așteptat (JSON corupt, schemă Avro incompatibilă).

Riscul pilulei otrăvitoare este bucla infinită: consumatorul eșuează, nu se comite, repornește, citește același mesaj, eșuează din nou. Aceasta blochează complet partiția. Principala apărare este folosirea a ErrorHandlingDeserializator care nu face excepție dar învelește sarcina coruptă într-un obiect de eroare gestionabil.

// Configurazione con ErrorHandlingDeserializer (Spring Kafka)
// Questo deserializer cattura l'eccezione invece di propagarla,
// permettendo al consumer di mandare il messaggio in DLQ

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Ordine> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "servizio-inventario");

        // ErrorHandlingDeserializer wrappa il deserializer originale
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ErrorHandlingDeserializer.class);

        // Il deserializer "reale" (delegato)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
            StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
            JsonDeserializer.class);

        // Target type per la deserializzazione JSON
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.Ordine");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Ordine> kafkaListenerContainerFactory(
        ConsumerFactory<String, Ordine> consumerFactory,
        KafkaTemplate<String, Ordine> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, Ordine> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // DeadLetterPublishingRecoverer: invia automaticamente in DLQ su DeserializationException
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", -1)),
            new FixedBackOff(1000L, 3L)  // 3 tentativi, 1s di attesa
        ));

        return factory;
    }
}

Reprocesare din DLQ

Un DLQ nu este un coș de gunoi permanent: este un spațiu de carantină din care sunt trimise mesajele acestea trebuie reprocesate după rezolvarea problemei. Există două abordări:

  • Reprocesare manuală: un operator inspectează mesajele din DLQ, identifică cauza, corectează problema (instalează o remediere, restabilește serviciul din aval), apoi retrimite mesajele în subiectul original.
  • Reprocesare automată: Un consumator separat citește periodic DLQ și încearcă să reproceseze mesajele, cu o politică de programare (de exemplu, la fiecare oră, după o anumită alertă).
// DLQReprocessor.java - Consumer della DLQ per reprocessing selettivo
public class DLQReprocessor {

    private static final String DLQ_TOPIC = "ordini-effettuati.DLT";
    private static final String SOURCE_TOPIC = "ordini-effettuati";

    /**
     * Rimanda in produzione i messaggi dalla DLQ filtrando per tipo di errore.
     * Utile dopo aver deployato un fix per un errore specifico.
     */
    public void reprocessByErrorType(String targetErrorClass) {
        Properties consumerProps = buildConsumerProps("dlq-reprocessor");
        Properties producerProps = buildProducerProps();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(List.of(DLQ_TOPIC));

            int reprocessed = 0, skipped = 0;
            ConsumerRecords<String, String> records;

            do {
                records = consumer.poll(Duration.ofSeconds(5));

                for (ConsumerRecord<String, String> dlqRecord : records) {
                    String errorClass = getHeader(dlqRecord, "dlq-error-class");
                    String originalTopic = getHeader(dlqRecord, "dlq-original-topic");

                    if (targetErrorClass.equals(errorClass)) {
                        // Rimanda al topic originale
                        ProducerRecord<String, String> reprocessRecord = new ProducerRecord<>(
                            originalTopic != null ? originalTopic : SOURCE_TOPIC,
                            dlqRecord.key(),
                            dlqRecord.value()
                        );
                        // Aggiungi header per tracciabilita del reprocessing
                        reprocessRecord.headers().add("reprocessed-from-dlq",
                            String.valueOf(System.currentTimeMillis()).getBytes());

                        producer.send(reprocessRecord);
                        reprocessed++;
                    } else {
                        skipped++;
                    }
                }

                consumer.commitSync();

            } while (!records.isEmpty());

            System.out.printf("Reprocessing completato: %d riprocessati, %d saltati%n",
                reprocessed, skipped);
        }
    }

    private String getHeader(ConsumerRecord<String, String> record, String key) {
        var header = record.headers().lastHeader(key);
        return header != null ? new String(header.value()) : null;
    }
}

Atenție: Ordinea mesajelor în DLQ

Când trimiteți mesaje din DLQ înapoi la subiectul inițial, ordinea relativă inițială se pierde la alte mesaje deja procesate cu succes. Pentru cazurile de utilizare în care ordinea este critică (de exemplu, actualizări secvențiale de stare), reprocesarea trebuie să țină cont de acest lucru: s-ar putea să fie nevoie să aplicați o logică de idempotnță în consumator pentru a le gestiona mesaje „vechi” care ajung după cele mai noi.

Cele mai bune practici pentru tratarea erorilor în Kafka

  • Deosebiți întotdeauna erorile tranzitorii de cele permanente: Folosiți excepții personalizate sau un tip de eroare enumerare pentru a evita reîncercări inutile asupra erorilor care nu se vor vindeca niciodată (de exemplu, încălcarea schemei).
  • DLQ este obligatoriu în producție: Orice consumator care nu are un DLQ poate pierdeți mesajele în tăcere sau rămâneți blocați în bucle. Nu este optional.
  • Monitorizați dimensiunea DLQ: O acumulare de mesaje în DLQ este un semn al unei probleme. Adăugați alertă activată kafka_consumer_group_partition_lag{topic="*.DLT"}.
  • Îmbogățiți mesajele DLQ cu metadate: marcaj de timp al eșecului, stacktrace, număr de încercări, subiect original și offset. Fără aceste date, depanarea este foarte dificilă.
  • Nu utilizați somnul în bucla principală a consumatorului: blochează partiția și provoacă întârziere. Folosiți modelul Reîncercați subiectul (neblocare) sau acceptați comiterea și DLQ.
  • Setați o retenție lungă pe DLQ: Mesajele din DLQ trebuie inspectate. Setați retention.ms la cel puțin 30 de zile (sau chiar compactarea dacă reprocesarea este idempotentă).

Următorii pași în serie

  • Articolul 11 ​​– Kafka în producție: închide seria cu ghidul operațional complet pentru dimensionarea clusterului, retenția optimă și configurarea factorului de replicare, și MirrorMaker 2 pentru recuperarea în caz de dezastru geografic.

Legătură cu alte serii

  • Arhitectură bazată pe evenimente – Coadă cu scrisori moarte în sisteme asincrone: același model DLQ se aplică SQS, SNS și altor sisteme de mesagerie. Articolul 708 din seria EDA acoperă DLQ în contextul AWS cu timeout și vizibilitate maxReceiveCount.
  • Kafka Exact-Once Semantics (articolul 4): pentru a elimina duplicatele cauzate Din reîncercări, API-ul tranzacțional Kafka permite garanții end-to-end exact o dată.