Problem: co dzieje się z wiadomościami, które kończą się niepowodzeniem

Asynchroniczne systemy przesyłania wiadomości, takie jak SQS, SNS, Kafka i EventBridge, używają tego wzorca dostawa przynajmniej raz: wiadomość została dostarczona przynajmniej raz, ale może zostać dostarczony wiele razy (duplikaty w przypadku ponownej próby). Stwarza to dwa krytyczne scenariusze:

  1. Błędy przejściowe: Usługa niższego szczebla jest tymczasowo niedostępna. Automatyczna ponowna próba rozwiązuje problem. Po kilku próbach wiadomość jest przetwarzana poprawnie.
  2. Błędy trwałe (trująca pigułka): wiadomość jest zniekształcona, zawiera dane które naruszają niezmienniki biznesowe lub kod konsumenta zawiera błąd. Ponowna próba nie pomaga: wiadomość będzie nadal nie działać w nieskończoność, potencjalnie zużywając zasoby blokowanie przetwarzania kolejnych wiadomości.

DLQ rozwiązuje drugi scenariusz: po konfigurowalnej liczbie nieudanych prób (maxReceiveCount w SQS, MAX_RETRY_ATTEMPTS w Kafce) wiadomość zostaje przeniesiona do Kolejki Niedostarczonych Listów gdzie można je poddać analizie i ponownemu przetworzeniu w kontrolowany sposób.

DLQ: Umowa o odporności

  • Zerowa utrata danych: Żadne wiadomości nie są odrzucane w trybie cichym
  • Izolacja problemu: Trujące pigułki nie blokują dobrych wiadomości
  • Widoczność: Wiadomości w DLQ można sprawdzić pod kątem debugowania
  • Kontrolowane przetwarzanie: Po rozwiązaniu problemu wiadomości są przetwarzane ponownie

DLQ w Amazon SQS

W SQS DLQ jest po prostu kolejną kolejką SQS skonfigurowaną jako miejsce docelowe komunikatów które przekraczają maxReceiveCount. Mechanizm opiera się na limit czasu widoczności: kiedy konsument otrzymuje wiadomość, staje się ona „niewidoczna” dla innych konsumentów na pewien czas limitu czasu widoczności. Jeżeli w tym terminie nie zostanie ono usunięte (porażka Konsumenta lub upadłość), SQS sprawia, że jest on ponownie widoczny przy kolejnej próbie.

Licznik ApproximateReceiveCount zwiększa się z każdym przyjęciem. Kiedy dotrze maxReceiveCount, SQS przenosi wiadomość do skonfigurowanego DLQ.

# Configurazione DLQ per SQS con Terraform

# 1. Crea la DLQ (stessa tipologia della coda principale)
resource "aws_sqs_queue" "ordini_dlq" {
  name                       = "ordini-queue-dlq"
  message_retention_seconds  = 1209600  # 14 giorni (massimo SQS)
  visibility_timeout_seconds = 300      # 5 minuti per elaborare dalla DLQ

  # CloudWatch alarm sulla DLQ
  tags = {
    Environment = "production"
    Alert       = "critical"
  }
}

# 2. Crea la coda principale con redrive policy che punta alla DLQ
resource "aws_sqs_queue" "ordini" {
  name                       = "ordini-queue"
  visibility_timeout_seconds = 60       # 60s per elaborare ogni messaggio
  receive_wait_time_seconds  = 20       # long polling
  message_retention_seconds  = 345600   # 4 giorni

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.ordini_dlq.arn
    maxReceiveCount     = 3  # 3 tentativi falliti -> DLQ
  })
}

# 3. CloudWatch Alarm: alert quando la DLQ ha messaggi
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
  alarm_name          = "ordini-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = "60"
  statistic           = "Sum"
  threshold           = "0"
  alarm_description   = "CRITICO: messaggi in DLQ ordini"

  dimensions = {
    QueueName = aws_sqs_queue.ordini_dlq.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

Kontrola i ponowne przetwarzanie przez DLQ SQS

// DlqReprocessor.java - Riprocessa messaggi dalla DLQ SQS
import software.amazon.awssdk.services.sqs.*;
import software.amazon.awssdk.services.sqs.model.*;

public class SqsDlqReprocessor {

    private final SqsClient sqsClient;
    private final String dlqUrl;
    private final String mainQueueUrl;

    // Riprocessa tutti i messaggi dalla DLQ verso la coda principale
    public void reprocessAll() {
        int reprocessed = 0;
        List<Message> messages;

        do {
            messages = receiveMessages(dlqUrl, 10);

            for (Message message : messages) {
                try {
                    // Analizza il messaggio per capire il tipo di errore
                    System.out.printf("Riprocesso messaggio: id=%s, receiveCount=%s%n",
                        message.messageId(),
                        message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)
                    );

                    // Rimanda alla coda principale (con delay opzionale)
                    sqsClient.sendMessage(
                        SendMessageRequest.builder()
                            .queueUrl(mainQueueUrl)
                            .messageBody(message.body())
                            .messageAttributes(message.messageAttributes())
                            .delaySeconds(0)
                            .build()
                    );

                    // Elimina dalla DLQ
                    sqsClient.deleteMessage(
                        DeleteMessageRequest.builder()
                            .queueUrl(dlqUrl)
                            .receiptHandle(message.receiptHandle())
                            .build()
                    );

                    reprocessed++;

                } catch (Exception e) {
                    System.err.println("Errore reprocessing: " + e.getMessage());
                    // Non eliminare: rimane in DLQ
                }
            }

        } while (!messages.isEmpty());

        System.out.printf("Reprocessing completato: %d messaggi rimandati%n", reprocessed);
    }

    private List<Message> receiveMessages(String queueUrl, int maxMessages) {
        return sqsClient.receiveMessage(
            ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(maxMessages)
                .waitTimeSeconds(5)
                .attributeNames(QueueAttributeName.ALL)
                .build()
        ).messages();
    }
}

DLQ w AWS Lambda: poziom funkcji a poziom kolejki

W AWS Lambda DLQ można skonfigurować na dwóch różnych poziomach z odrębną semantyką:

  • Kolejka SQS DLQ: Skonfigurowane w źródłowej kolejce SQS. Wiadomości zostały przeniesione w DLQ, gdy SQS przekracza maxReceiveCount. To się zdarza Zanim że Lambda jest wywoływana. Jest to zalecana konfiguracja dla Lambda + SQS.
  • Funkcja Lambda DLQ: skonfigurowany na samej Lambdzie (tylko dla wywołań asynchronicznych, nie do mapowania źródła zdarzeń za pomocą SQS). Wychwytuj błędy wywołania Lambda, a nie kolejkę.
# Terraform: Lambda con SQS event source e DLQ configurata sulla coda

resource "aws_lambda_function" "ordini_consumer" {
  function_name = "ordini-consumer"
  handler       = "handler.lambda_handler"
  runtime       = "python3.12"
  role          = aws_iam_role.lambda_role.arn
  timeout       = 30  # 30 secondi per messaggio

  # DLQ a livello di Lambda (solo per invocazioni async dirette)
  dead_letter_config {
    target_arn = aws_sqs_queue.lambda_dlq.arn
  }
}

# SQS come event source per Lambda
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn = aws_sqs_queue.ordini.arn
  function_name    = aws_lambda_function.ordini_consumer.arn
  batch_size       = 10
  enabled          = true

  # Bisection: in caso di errore batch, prova prima con metà messaggi
  # Aiuta a isolare il poison pill senza mandare tutti in DLQ
  bisect_batch_on_function_error = true

  # Report batch item failures: Lambda può indicare quali specifici
  # messaggi nel batch hanno fallito (solo quelli vanno in DLQ)
  function_response_types = ["ReportBatchItemFailures"]
}

ReportBatchItemFailures: szczegółowe DLQ dla partii

// Handler Lambda Python con batch item failures
// Solo i messaggi falliti vanno in DLQ, non l'intero batch

def lambda_handler(event, context):
    """
    ReportBatchItemFailures: ritorna solo i message ID falliti.
    SQS mandrà in DLQ solo quelli, non il batch intero.
    """
    failed_items = []

    for record in event['Records']:
        message_id = record['messageId']
        try:
            # Elabora il messaggio
            payload = json.loads(record['body'])
            process_ordine(payload)
            print(f"Successo: {message_id}")

        except PermanentError as e:
            # Errore permanente: vai in DLQ subito
            print(f"PERMANENTE: {message_id} - {e}")
            failed_items.append({'itemIdentifier': message_id})

        except TransientError as e:
            # Errore transitorio: riprova (non aggiungere a failed)
            # SQS ritenterà l'intero batch se almeno uno fallisce
            # Con ReportBatchItemFailures, solo i falliti vengono ritentati
            print(f"TRANSITORIO: {message_id} - {e}")
            failed_items.append({'itemIdentifier': message_id})

    return {'batchItemFailures': failed_items}

DLQ w EventBridge

EventBridge ma swój własny poziom DLQ cel: jeśli dostarczenie zdarzenia do celu (Lambda, SQS) kończy się niepowodzeniem po wszystkich skonfigurowanych ponownych próbach, zdarzenie jest wysyłane w SQS DLQ określonym w dead_letter_config reguły.

# EventBridge DLQ per target Lambda
resource "aws_cloudwatch_event_target" "ordini_lambda" {
  rule           = aws_cloudwatch_event_rule.ordini.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_lambda_function.ordini_consumer.arn

  # Retry policy di EventBridge: quanti tentativi prima di DLQ
  retry_policy {
    maximum_event_age_in_seconds = 86400  # Riprova per max 24h
    maximum_retry_attempts       = 185    # ~exponential backoff su 24h
  }

  # DLQ per eventi non consegnati
  dead_letter_config {
    arn = aws_sqs_queue.eventbridge_dlq.arn
  }
}

# L'evento in DLQ EventBridge include metadata di debug
# {
#   "version": "1.0",
#   "timestamp": "...",
#   "requestId": "...",
#   "condition": "...",
#   "approximateInvokeCount": 185,
#   "requestParameters": {
#     "FunctionName": "ordini-consumer"
#   },
#   "responseParameters": {
#     "statusCode": 500,
#     "errorCode": "Lambda.ServiceException"
#   },
#   "originalEvent": { ... l'evento originale ... }
# }

Wzorzec zaawansowany: ponów próbę z progresywnym wycofywaniem przy użyciu opóźnienia SQS

SQS pozwala skonfigurować opóźnienie dla pojedynczej wiadomości (do 15 minut). W połączeniu z kolejką FIFO i grupa wiadomości, jest to możliwe do wdrożenia wykładniczy wzorzec ponawiania prób bez blokowania innych wiadomości:

// RetryWithSqsDelay.java - Retry progressivo con SQS message delay
public class SqsExponentialRetry {

    private static final int MAX_ATTEMPTS = 5;
    private static final int MAX_DELAY_SECONDS = 900;  // 15 minuti (max SQS)

    public void handleWithRetry(String queueUrl, Message sqsMessage) {
        // Leggi il numero di tentativi corrente dal message attribute
        int currentAttempt = Integer.parseInt(
            sqsMessage.messageAttributes()
                .getOrDefault("retryAttempt",
                    MessageAttributeValue.builder().stringValue("0").build())
                .stringValue()
        );

        try {
            processMessage(sqsMessage.body());
            // Successo: elimina dalla coda
            sqsClient.deleteMessage(...);

        } catch (TransientException e) {
            if (currentAttempt >= MAX_ATTEMPTS) {
                // Troppi tentativi: manda in DLQ manuale
                sendToManualDLQ(sqsMessage, e);
                sqsClient.deleteMessage(...);
                return;
            }

            // Calcola delay esponenziale (1s, 2s, 4s, 8s, 16s...)
            int delaySeconds = (int) Math.min(
                Math.pow(2, currentAttempt),
                MAX_DELAY_SECONDS
            );

            // Rimanda il messaggio con delay e contatore incrementato
            sqsClient.sendMessage(
                SendMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageBody(sqsMessage.body())
                    .delaySeconds(delaySeconds)
                    .messageAttributes(Map.of(
                        "retryAttempt", MessageAttributeValue.builder()
                            .stringValue(String.valueOf(currentAttempt + 1))
                            .dataType("Number")
                            .build()
                    ))
                    .build()
            );

            // Elimina il messaggio originale (non usare la DLQ automatica)
            sqsClient.deleteMessage(...);
        }
    }
}

Monitorowanie DLQ: niezbędne alerty

DLQ musi być aktywnie monitorowane. Komunikat w DLQ wskazuje na prawdziwy problem to wymaga uwagi. Metryki SQS do monitorowania w CloudWatch:

# CloudWatch Metric Alarms per DLQ - AWS CLI

# Alert: qualsiasi messaggio in DLQ (soglia 0)
aws cloudwatch put-metric-alarm \
  --alarm-name "ordini-dlq-not-empty" \
  --alarm-description "CRITICO: messaggi in DLQ ordini" \
  --metric-name "ApproximateNumberOfMessagesVisible" \
  --namespace "AWS/SQS" \
  --dimensions Name=QueueName,Value=ordini-queue-dlq \
  --period 60 \
  --evaluation-periods 1 \
  --statistic Sum \
  --comparison-operator GreaterThanThreshold \
  --threshold 0 \
  --alarm-actions "arn:aws:sns:eu-west-1:123456:alerts-topic"

# Metriche importanti da monitorare su DLQ:
# - ApproximateNumberOfMessagesVisible: messaggi pronti per essere letti
# - ApproximateNumberOfMessagesNotVisible: messaggi in processing
# - NumberOfMessagesSent: rate di arrivo in DLQ (crescita = problema)
# - NumberOfMessagesDeleted: rate di reprocessing

Najlepsze praktyki dotyczące DLQ w systemach asynchronicznych

  • DLQ obowiązkowe dla każdego konsumenta: W produkcji nie ma systemu asynchronicznego bez DLQ. W przypadku ich braku komunikaty zakończone niepowodzeniem zostaną utracone lub blokują przepływ.
  • Monitoruj DLQ za pomocą alertów o zerowym progu: dowolna wiadomość w DLQ to oznaka problemu. Nie czekaj, aż zgromadzą się setki, zanim zareagujesz.
  • Wzbogacaj wiadomości DLQ metadanymi: dodaj nagłówki lub atrybuty z typem błędu, śledzeniem stosu, liczbą ponownych prób i sygnaturą czasową niepowodzenia. Bez tego debugowanie danych jest prawie niemożliwe.
  • Długie przechowywanie na DLQ: skonfiguruj przechowywanie przez 14 dni (maksymalny SQS) lub co najmniej 30 dni na platformie Kafka. Wiadomości w DLQ muszą być dostępne do odroczonej analizy.
  • Regularnie testuj przygotowanie do ponownego użycia: DLQ jest bezużyteczne, jeśli nie wiesz jak ponownie przetwarzać wiadomości. Udokumentuj i przetestuj proces ponownego przetwarzania.
  • Oddzielne typy błędów: Rozważ oddzielne DLQ dla trwałych błędów (uszkodzone ładunki) i błędy przejściowe (usługi nie działają). Strategia ponownego przetwarzania jest inna.

Anty-wzorzec: Zignoruj ​​DLQ

Najbardziej niebezpiecznym wzorcem jest skonfigurowanie DLQ, a następnie zaprzestanie jego monitorowania. Wiadomości gromadzą się w milczeniu tygodniami, a potem ktoś to zauważa brakuje krytycznych danych. Zawsze ustawiaj ostrzeżenie w DLQ: to sieć bezpieczeństwa których nigdy nie należy ignorować.

Kolejne kroki w serii

  • Artykuł 9 – Idempotencja u konsumentów: ponowne próby i ponowne przetwarzanie z DLQ może powodować duplikowanie wiadomości. Główną obroną jest wzór klucza idempotencji.
  • Artykuł 10 – Wzór skrzynki nadawczej: upewnij się, że wydarzenie zostało opublikowane zawsze się to zdarza, nawet w przypadku awarii producenta, przy użyciu tabeli skrzynek nadawczych w bazie danych.

Połącz z innymi seriami

  • SQS vs SNS vs EventBridge (artykuł 7): Każda usługa AWS ma swoją własną semantyka DLQ i ponownej próby. W tym artykule omówiono różnice konfiguracyjne między usługami.
  • Kolejka niedostarczonych listów Kafka (artykuł 10 serii Kafka): wzór DLQ w Kafce z grupą konsumencką spróbuj ponownie tematu i przetworzenia go ma te same podstawy koncepcyjne, ale inna implementacja niż SQS.