Kolejka niedostarczonych listów i odporność w systemach asynchronicznych
W każdym systemie asynchronicznym komunikaty kończą się niepowodzeniem. Zniekształcony ładunek, nieosiągalna usługa podrzędna, błąd w kodzie konsumenta — bez mechanizmu bezpieczeństwa wiadomości te zostaną utracone lub zablokować przetwarzanie kolejnych. Tam Kolejka martwych listów to ten mechanizm: przechwytuje nieprzetworzone wiadomości, izoluje je w osobnej kolejce i umożliwia analizę i ponowne przetwarzanie selektywny, bez przerywania głównego przepływu.
Problem: co dzieje się z wiadomościami, które kończą się niepowodzeniem
Asynchroniczne systemy przesyłania wiadomości, takie jak SQS, SNS, Kafka i EventBridge, używają tego wzorca dostawa przynajmniej raz: wiadomość została dostarczona przynajmniej raz, ale może zostać dostarczony wiele razy (duplikaty w przypadku ponownej próby). Stwarza to dwa krytyczne scenariusze:
- Błędy przejściowe: Usługa niższego szczebla jest tymczasowo niedostępna. Automatyczna ponowna próba rozwiązuje problem. Po kilku próbach wiadomość jest przetwarzana poprawnie.
- Błędy trwałe (trująca pigułka): wiadomość jest zniekształcona, zawiera dane które naruszają niezmienniki biznesowe lub kod konsumenta zawiera błąd. Ponowna próba nie pomaga: wiadomość będzie nadal nie działać w nieskończoność, potencjalnie zużywając zasoby blokowanie przetwarzania kolejnych wiadomości.
DLQ rozwiązuje drugi scenariusz: po konfigurowalnej liczbie nieudanych prób (maxReceiveCount w SQS,
MAX_RETRY_ATTEMPTS w Kafce) wiadomość zostaje przeniesiona do Kolejki Niedostarczonych Listów
gdzie można je poddać analizie i ponownemu przetworzeniu w kontrolowany sposób.
DLQ: Umowa o odporności
- Zerowa utrata danych: Żadne wiadomości nie są odrzucane w trybie cichym
- Izolacja problemu: Trujące pigułki nie blokują dobrych wiadomości
- Widoczność: Wiadomości w DLQ można sprawdzić pod kątem debugowania
- Kontrolowane przetwarzanie: Po rozwiązaniu problemu wiadomości są przetwarzane ponownie
DLQ w Amazon SQS
W SQS DLQ jest po prostu kolejną kolejką SQS skonfigurowaną jako miejsce docelowe komunikatów
które przekraczają maxReceiveCount. Mechanizm opiera się na limit czasu widoczności:
kiedy konsument otrzymuje wiadomość, staje się ona „niewidoczna” dla innych konsumentów na pewien czas
limitu czasu widoczności. Jeżeli w tym terminie nie zostanie ono usunięte (porażka Konsumenta lub upadłość),
SQS sprawia, że jest on ponownie widoczny przy kolejnej próbie.
Licznik ApproximateReceiveCount zwiększa się z każdym przyjęciem.
Kiedy dotrze maxReceiveCount, SQS przenosi wiadomość do skonfigurowanego DLQ.
# Configurazione DLQ per SQS con Terraform
# 1. Crea la DLQ (stessa tipologia della coda principale)
resource "aws_sqs_queue" "ordini_dlq" {
name = "ordini-queue-dlq"
message_retention_seconds = 1209600 # 14 giorni (massimo SQS)
visibility_timeout_seconds = 300 # 5 minuti per elaborare dalla DLQ
# CloudWatch alarm sulla DLQ
tags = {
Environment = "production"
Alert = "critical"
}
}
# 2. Crea la coda principale con redrive policy che punta alla DLQ
resource "aws_sqs_queue" "ordini" {
name = "ordini-queue"
visibility_timeout_seconds = 60 # 60s per elaborare ogni messaggio
receive_wait_time_seconds = 20 # long polling
message_retention_seconds = 345600 # 4 giorni
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.ordini_dlq.arn
maxReceiveCount = 3 # 3 tentativi falliti -> DLQ
})
}
# 3. CloudWatch Alarm: alert quando la DLQ ha messaggi
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
alarm_name = "ordini-dlq-not-empty"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = "60"
statistic = "Sum"
threshold = "0"
alarm_description = "CRITICO: messaggi in DLQ ordini"
dimensions = {
QueueName = aws_sqs_queue.ordini_dlq.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
Kontrola i ponowne przetwarzanie przez DLQ SQS
// DlqReprocessor.java - Riprocessa messaggi dalla DLQ SQS
import software.amazon.awssdk.services.sqs.*;
import software.amazon.awssdk.services.sqs.model.*;
public class SqsDlqReprocessor {
private final SqsClient sqsClient;
private final String dlqUrl;
private final String mainQueueUrl;
// Riprocessa tutti i messaggi dalla DLQ verso la coda principale
public void reprocessAll() {
int reprocessed = 0;
List<Message> messages;
do {
messages = receiveMessages(dlqUrl, 10);
for (Message message : messages) {
try {
// Analizza il messaggio per capire il tipo di errore
System.out.printf("Riprocesso messaggio: id=%s, receiveCount=%s%n",
message.messageId(),
message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)
);
// Rimanda alla coda principale (con delay opzionale)
sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(mainQueueUrl)
.messageBody(message.body())
.messageAttributes(message.messageAttributes())
.delaySeconds(0)
.build()
);
// Elimina dalla DLQ
sqsClient.deleteMessage(
DeleteMessageRequest.builder()
.queueUrl(dlqUrl)
.receiptHandle(message.receiptHandle())
.build()
);
reprocessed++;
} catch (Exception e) {
System.err.println("Errore reprocessing: " + e.getMessage());
// Non eliminare: rimane in DLQ
}
}
} while (!messages.isEmpty());
System.out.printf("Reprocessing completato: %d messaggi rimandati%n", reprocessed);
}
private List<Message> receiveMessages(String queueUrl, int maxMessages) {
return sqsClient.receiveMessage(
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(maxMessages)
.waitTimeSeconds(5)
.attributeNames(QueueAttributeName.ALL)
.build()
).messages();
}
}
DLQ w AWS Lambda: poziom funkcji a poziom kolejki
W AWS Lambda DLQ można skonfigurować na dwóch różnych poziomach z odrębną semantyką:
-
Kolejka SQS DLQ: Skonfigurowane w źródłowej kolejce SQS. Wiadomości zostały przeniesione
w DLQ, gdy SQS przekracza
maxReceiveCount. To się zdarza Zanim że Lambda jest wywoływana. Jest to zalecana konfiguracja dla Lambda + SQS. - Funkcja Lambda DLQ: skonfigurowany na samej Lambdzie (tylko dla wywołań asynchronicznych, nie do mapowania źródła zdarzeń za pomocą SQS). Wychwytuj błędy wywołania Lambda, a nie kolejkę.
# Terraform: Lambda con SQS event source e DLQ configurata sulla coda
resource "aws_lambda_function" "ordini_consumer" {
function_name = "ordini-consumer"
handler = "handler.lambda_handler"
runtime = "python3.12"
role = aws_iam_role.lambda_role.arn
timeout = 30 # 30 secondi per messaggio
# DLQ a livello di Lambda (solo per invocazioni async dirette)
dead_letter_config {
target_arn = aws_sqs_queue.lambda_dlq.arn
}
}
# SQS come event source per Lambda
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.ordini.arn
function_name = aws_lambda_function.ordini_consumer.arn
batch_size = 10
enabled = true
# Bisection: in caso di errore batch, prova prima con metà messaggi
# Aiuta a isolare il poison pill senza mandare tutti in DLQ
bisect_batch_on_function_error = true
# Report batch item failures: Lambda può indicare quali specifici
# messaggi nel batch hanno fallito (solo quelli vanno in DLQ)
function_response_types = ["ReportBatchItemFailures"]
}
ReportBatchItemFailures: szczegółowe DLQ dla partii
// Handler Lambda Python con batch item failures
// Solo i messaggi falliti vanno in DLQ, non l'intero batch
def lambda_handler(event, context):
"""
ReportBatchItemFailures: ritorna solo i message ID falliti.
SQS mandrà in DLQ solo quelli, non il batch intero.
"""
failed_items = []
for record in event['Records']:
message_id = record['messageId']
try:
# Elabora il messaggio
payload = json.loads(record['body'])
process_ordine(payload)
print(f"Successo: {message_id}")
except PermanentError as e:
# Errore permanente: vai in DLQ subito
print(f"PERMANENTE: {message_id} - {e}")
failed_items.append({'itemIdentifier': message_id})
except TransientError as e:
# Errore transitorio: riprova (non aggiungere a failed)
# SQS ritenterà l'intero batch se almeno uno fallisce
# Con ReportBatchItemFailures, solo i falliti vengono ritentati
print(f"TRANSITORIO: {message_id} - {e}")
failed_items.append({'itemIdentifier': message_id})
return {'batchItemFailures': failed_items}
DLQ w EventBridge
EventBridge ma swój własny poziom DLQ cel: jeśli dostarczenie zdarzenia
do celu (Lambda, SQS) kończy się niepowodzeniem po wszystkich skonfigurowanych ponownych próbach, zdarzenie jest wysyłane
w SQS DLQ określonym w dead_letter_config reguły.
# EventBridge DLQ per target Lambda
resource "aws_cloudwatch_event_target" "ordini_lambda" {
rule = aws_cloudwatch_event_rule.ordini.name
event_bus_name = aws_cloudwatch_event_bus.mioapp.name
arn = aws_lambda_function.ordini_consumer.arn
# Retry policy di EventBridge: quanti tentativi prima di DLQ
retry_policy {
maximum_event_age_in_seconds = 86400 # Riprova per max 24h
maximum_retry_attempts = 185 # ~exponential backoff su 24h
}
# DLQ per eventi non consegnati
dead_letter_config {
arn = aws_sqs_queue.eventbridge_dlq.arn
}
}
# L'evento in DLQ EventBridge include metadata di debug
# {
# "version": "1.0",
# "timestamp": "...",
# "requestId": "...",
# "condition": "...",
# "approximateInvokeCount": 185,
# "requestParameters": {
# "FunctionName": "ordini-consumer"
# },
# "responseParameters": {
# "statusCode": 500,
# "errorCode": "Lambda.ServiceException"
# },
# "originalEvent": { ... l'evento originale ... }
# }
Wzorzec zaawansowany: ponów próbę z progresywnym wycofywaniem przy użyciu opóźnienia SQS
SQS pozwala skonfigurować opóźnienie dla pojedynczej wiadomości (do 15 minut). W połączeniu z kolejką FIFO i grupa wiadomości, jest to możliwe do wdrożenia wykładniczy wzorzec ponawiania prób bez blokowania innych wiadomości:
// RetryWithSqsDelay.java - Retry progressivo con SQS message delay
public class SqsExponentialRetry {
private static final int MAX_ATTEMPTS = 5;
private static final int MAX_DELAY_SECONDS = 900; // 15 minuti (max SQS)
public void handleWithRetry(String queueUrl, Message sqsMessage) {
// Leggi il numero di tentativi corrente dal message attribute
int currentAttempt = Integer.parseInt(
sqsMessage.messageAttributes()
.getOrDefault("retryAttempt",
MessageAttributeValue.builder().stringValue("0").build())
.stringValue()
);
try {
processMessage(sqsMessage.body());
// Successo: elimina dalla coda
sqsClient.deleteMessage(...);
} catch (TransientException e) {
if (currentAttempt >= MAX_ATTEMPTS) {
// Troppi tentativi: manda in DLQ manuale
sendToManualDLQ(sqsMessage, e);
sqsClient.deleteMessage(...);
return;
}
// Calcola delay esponenziale (1s, 2s, 4s, 8s, 16s...)
int delaySeconds = (int) Math.min(
Math.pow(2, currentAttempt),
MAX_DELAY_SECONDS
);
// Rimanda il messaggio con delay e contatore incrementato
sqsClient.sendMessage(
SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(sqsMessage.body())
.delaySeconds(delaySeconds)
.messageAttributes(Map.of(
"retryAttempt", MessageAttributeValue.builder()
.stringValue(String.valueOf(currentAttempt + 1))
.dataType("Number")
.build()
))
.build()
);
// Elimina il messaggio originale (non usare la DLQ automatica)
sqsClient.deleteMessage(...);
}
}
}
Monitorowanie DLQ: niezbędne alerty
DLQ musi być aktywnie monitorowane. Komunikat w DLQ wskazuje na prawdziwy problem to wymaga uwagi. Metryki SQS do monitorowania w CloudWatch:
# CloudWatch Metric Alarms per DLQ - AWS CLI
# Alert: qualsiasi messaggio in DLQ (soglia 0)
aws cloudwatch put-metric-alarm \
--alarm-name "ordini-dlq-not-empty" \
--alarm-description "CRITICO: messaggi in DLQ ordini" \
--metric-name "ApproximateNumberOfMessagesVisible" \
--namespace "AWS/SQS" \
--dimensions Name=QueueName,Value=ordini-queue-dlq \
--period 60 \
--evaluation-periods 1 \
--statistic Sum \
--comparison-operator GreaterThanThreshold \
--threshold 0 \
--alarm-actions "arn:aws:sns:eu-west-1:123456:alerts-topic"
# Metriche importanti da monitorare su DLQ:
# - ApproximateNumberOfMessagesVisible: messaggi pronti per essere letti
# - ApproximateNumberOfMessagesNotVisible: messaggi in processing
# - NumberOfMessagesSent: rate di arrivo in DLQ (crescita = problema)
# - NumberOfMessagesDeleted: rate di reprocessing
Najlepsze praktyki dotyczące DLQ w systemach asynchronicznych
- DLQ obowiązkowe dla każdego konsumenta: W produkcji nie ma systemu asynchronicznego bez DLQ. W przypadku ich braku komunikaty zakończone niepowodzeniem zostaną utracone lub blokują przepływ.
- Monitoruj DLQ za pomocą alertów o zerowym progu: dowolna wiadomość w DLQ to oznaka problemu. Nie czekaj, aż zgromadzą się setki, zanim zareagujesz.
- Wzbogacaj wiadomości DLQ metadanymi: dodaj nagłówki lub atrybuty z typem błędu, śledzeniem stosu, liczbą ponownych prób i sygnaturą czasową niepowodzenia. Bez tego debugowanie danych jest prawie niemożliwe.
- Długie przechowywanie na DLQ: skonfiguruj przechowywanie przez 14 dni (maksymalny SQS) lub co najmniej 30 dni na platformie Kafka. Wiadomości w DLQ muszą być dostępne do odroczonej analizy.
- Regularnie testuj przygotowanie do ponownego użycia: DLQ jest bezużyteczne, jeśli nie wiesz jak ponownie przetwarzać wiadomości. Udokumentuj i przetestuj proces ponownego przetwarzania.
- Oddzielne typy błędów: Rozważ oddzielne DLQ dla trwałych błędów (uszkodzone ładunki) i błędy przejściowe (usługi nie działają). Strategia ponownego przetwarzania jest inna.
Anty-wzorzec: Zignoruj DLQ
Najbardziej niebezpiecznym wzorcem jest skonfigurowanie DLQ, a następnie zaprzestanie jego monitorowania. Wiadomości gromadzą się w milczeniu tygodniami, a potem ktoś to zauważa brakuje krytycznych danych. Zawsze ustawiaj ostrzeżenie w DLQ: to sieć bezpieczeństwa których nigdy nie należy ignorować.
Kolejne kroki w serii
- Artykuł 9 – Idempotencja u konsumentów: ponowne próby i ponowne przetwarzanie z DLQ może powodować duplikowanie wiadomości. Główną obroną jest wzór klucza idempotencji.
- Artykuł 10 – Wzór skrzynki nadawczej: upewnij się, że wydarzenie zostało opublikowane zawsze się to zdarza, nawet w przypadku awarii producenta, przy użyciu tabeli skrzynek nadawczych w bazie danych.
Połącz z innymi seriami
- SQS vs SNS vs EventBridge (artykuł 7): Każda usługa AWS ma swoją własną semantyka DLQ i ponownej próby. W tym artykule omówiono różnice konfiguracyjne między usługami.
- Kolejka niedostarczonych listów Kafka (artykuł 10 serii Kafka): wzór DLQ w Kafce z grupą konsumencką spróbuj ponownie tematu i przetworzenia go ma te same podstawy koncepcyjne, ale inna implementacja niż SQS.







