Amazon SQS: Kolejka wiadomości

Usługa prostej kolejki Amazon (SQS) to zarządzana kolejka komunikatów: producenci wysyłają wiadomości do kolejki, konsumenci pobierają je poprzez odpytywanie. Podstawowa semantyka jest taka punkt-punkt: Wiadomość w kolejce czyta go każdorazowo jeden konsument (pierwszy, który go weźmie, „ukryje” go przed pozostałymi za pośrednictwem limit czasu widoczności).

Kolejka standardowa a kolejka FIFO

SQS oferuje dwa tryby:

  • Standardowa kolejka: Nieograniczona przepustowość, dostawa co najmniej raz (możliwe duplikaty), zamówienie z największą starannością (bez gwarancji). Do zastosowań o dużej objętości gdzie duplikaty są możliwe do zarządzania po stronie konsumenta.
  • Kolejka FIFO: dostawa dokładnie raz (automatyczna deduplikacja poprzez MessageDeduplicationId), gwarantowane sortowanie dla grupa wiadomości, przepustowość ograniczona do 3000 msg/s na żądanie (z dozowaniem). Do zastosowań, w których kolejność i duplikaty są krytyczne (np. transakcje finansowe).
// SQS Producer Java con AWS SDK v2
import software.amazon.awssdk.services.sqs.*;
import software.amazon.awssdk.services.sqs.model.*;

public class SqsProducer {

    private final SqsClient sqsClient = SqsClient.builder()
        .region(Region.EU_WEST_1)
        .build();

    // Standard Queue: invio semplice
    public void sendToStandardQueue(String queueUrl, String messageBody) {
        SendMessageResponse response = sqsClient.sendMessage(
            SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(messageBody)
                // Delay di consegna (0-900 secondi)
                .delaySeconds(0)
                // Attributi del messaggio per filtraggio (SNS subscription filter)
                .messageAttributes(Map.of(
                    "eventType", MessageAttributeValue.builder()
                        .stringValue("OrdineCreato")
                        .dataType("String")
                        .build()
                ))
                .build()
        );
        System.out.println("Messaggio inviato: " + response.messageId());
    }

    // FIFO Queue: richiede MessageGroupId e MessageDeduplicationId
    public void sendToFifoQueue(String queueUrl, String ordineId, String payload) {
        sqsClient.sendMessage(
            SendMessageRequest.builder()
                .queueUrl(queueUrl)  // url deve terminare in .fifo
                .messageBody(payload)
                // Group: tutti i messaggi dello stesso ordine vengono elaborati in ordine
                .messageGroupId("ordine-" + ordineId)
                // Deduplication: prevenzione duplicati (valido 5 minuti)
                .messageDeduplicationId(ordineId + "-" + System.currentTimeMillis())
                .build()
        );
    }
}

Konsument SQS: Limit czasu odpytywania i widoczności

// SQS Consumer con Long Polling e gestione DLQ
public class SqsConsumer {

    private final SqsClient sqsClient = SqsClient.builder()
        .region(Region.EU_WEST_1).build();

    public void poll(String queueUrl) {
        while (true) {
            // Long polling: aspetta fino a 20 secondi per nuovi messaggi
            // Riduce le chiamate vuote (e i costi) rispetto al short polling
            ReceiveMessageResponse response = sqsClient.receiveMessage(
                ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(10)        // max 10 per chiamata
                    .waitTimeSeconds(20)             // long polling
                    .visibilityTimeout(30)           // 30s per elaborare
                    .messageAttributeNames("All")
                    .build()
            );

            for (Message message : response.messages()) {
                try {
                    processMessage(message.body());

                    // Successo: elimina dalla coda
                    sqsClient.deleteMessage(
                        DeleteMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(message.receiptHandle())
                            .build()
                    );

                } catch (Exception e) {
                    // Non eliminare: SQS rirenderà visibile il messaggio
                    // dopo il visibility timeout. Dopo maxReceiveCount tentativi
                    // finisce nella DLQ configurata
                    System.err.println("Errore, il messaggio tornera visibile: " + e.getMessage());
                }
            }
        }
    }
}

Amazon SNS: Fan-Out Pub/Sub

Usługa prostego powiadamiania Amazon (SNS) realizuje wzór publikuj/subskrybuj: producent wysyła wiadomość do a Tematy SNS, a SNS dostarcza je wszystkim równolegle członkowie (abonent). Temat SNS może mieć tysiące subskrybentów: Lambda, SQS, punkt końcowy HTTP, e-mail, SMS, push mobilny.

Wzorzec SNS + SQS (Fani SNS) to jeden z najczęstszych wzorców w architekturach AWS: SNS dostarcza to samo zdarzenie do wielu kolejek SQS równolegle, a każda kolejka obsługuje innego konsumenta (inną mikrousługę).

# SNS + SQS Fan-Out: un evento raggiunge 3 servizi in parallelo

# Struttura:
# SNS Topic "ordini-topic"
#   |--- SQS "inventario-queue" --- Lambda inventario
#   |--- SQS "pagamenti-queue"  --- Lambda pagamenti
#   |--- SQS "notifiche-queue"  --- Lambda notifiche email

# Terraform
resource "aws_sns_topic" "ordini" {
  name = "ordini-topic"
}

resource "aws_sqs_queue" "inventario" {
  name = "inventario-queue"
  visibility_timeout_seconds = 60
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.inventario_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sns_topic_subscription" "ordini_inventario" {
  topic_arn = aws_sns_topic.ordini.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventario.arn

  # Filter policy: questa subscription riceve SOLO OrdineCreato
  filter_policy = jsonencode({
    eventType = ["OrdineCreato"]
  })
}
// SNS Publisher Java
import software.amazon.awssdk.services.sns.*;
import software.amazon.awssdk.services.sns.model.*;

public class SnsPublisher {

    private final SnsClient snsClient = SnsClient.builder()
        .region(Region.EU_WEST_1).build();

    public void publishOrdineCreato(String topicArn, OrdineCreato event) throws Exception {
        String messageJson = new ObjectMapper().writeValueAsString(event);

        PublishResponse response = snsClient.publish(
            PublishRequest.builder()
                .topicArn(topicArn)
                .message(messageJson)
                // Message attributes per subscription filter
                .messageAttributes(Map.of(
                    "eventType", MessageAttributeValue.builder()
                        .stringValue("OrdineCreato")
                        .dataType("String")
                        .build()
                ))
                // Subject visibile solo nelle email
                .subject("Nuovo ordine: " + event.getOrdineId())
                .build()
        );
        System.out.println("SNS Message ID: " + response.messageId());
    }
}

Pełne porównanie

Charakterystyczny SQS SNS Most zdarzeń
Model Punkt-punkt (kolejka) Fan-out (pub/sub) Routing oparty na treści
Odbiorcy 1 konsument na raz Wszystko zarejestrowane równolegle Cele według reguły (1-5)
Filtr Nikt (bierze wszystko) Atrybuty wiadomości (ograniczone) Wzorzec zdarzenia w dowolnym polu JSON
Sortowanie Gwarantowane FIFO (kolejka FIFO) No No
Deduplikacja Tak (kolejka FIFO) No No
Maks. przepustowość Nieograniczony (standardowy) Nieograniczony 10 milionów zdarzeń/s na region
Utajenie Milisekundy (odpytywanie) < 1 s (naciśnięcie) < 500 ms
Rejestr schematów No No Tak (zintegrowany)
Archiwum wydarzeń/powtórka No No Si
Integracja partnerów SaaS No No Tak (ponad 35 partnerów)
Koszt za milion wiadomości ~0,40 USD ~0,50 USD ~1,00 USD

Przewodnik decyzyjny: kiedy używać którego

Użyj SQS, gdy:

  • Musisz równoważenie obciążenia pomiędzy wieloma konsumentami tego samego typu (np. 5 Lambd przetwarzających tę samą kolejkę)
  • Przesłanie wymaga dopracowania dokładnie raz (kolejka FIFO)
  • Musisz przeciwciśnienie: Wiadomości piętrzą się w kolejce, gdy konsumenci są powolni
  • Czy chcesz opóźnienie dostawy (do 15 minut na pojedynczą wiadomość)
  • Integrujesz starsze systemy, które korzystają z tradycyjnego modelu kolejki

Używaj SNS, gdy:

  • To samo wydarzenie musi dotrzeć wiele systemów równolegle (wzór wachlarzowy)
  • Musisz wysłać powiadomienia (e-mail, SMS, mobile push, webhook HTTP)
  • Połącz SNS + SQS, aby zapewnić każdemu konsumentowi zarówno rozwieranie, jak i buforowanie

Użyj EventBridge, gdy:

  • Musisz inteligentne trasowanie na podstawie zawartości ładunku
  • Zintegruj z usługi AWS-a (EC2, S3, RDS) lub Partnerzy SaaS (Zendesk, Shopify, Stripe)
  • Chcesz tego Rejestr schematów i zarządzanie systemem
  • Musisz Archiwum wydarzeń i powtórka
  • Zbuduj autobus na imprezę firmową ze scentralizowanym routingiem

SNS + SQS + wzór Lambda: kompletny stos

# Architettura tipica per un microservizio event-driven su AWS
#
# Client HTTP
#    |
#    v
# API Gateway / Lambda "ordini-api"
#    | pubblica su
#    v
# SNS Topic "ordini-events"
#    |-- filtra "OrdineCreato" --> SQS "inventario-queue" --> Lambda "inventario-handler"
#    |-- filtra "OrdineCreato" --> SQS "notifiche-queue"  --> Lambda "email-notifier"
#    |-- tutti i tipi         --> SQS "audit-queue"       --> Lambda "audit-logger"
#
# Ogni SQS ha la sua DLQ per messaggi non elaborabili
# Le Lambda usano le SQS come event source (SQS trigger)

# Configurazione Lambda con SQS trigger
resource "aws_lambda_event_source_mapping" "inventario" {
  event_source_arn = aws_sqs_queue.inventario.arn
  function_name    = aws_lambda_function.inventario_handler.arn
  batch_size       = 10         # processa 10 messaggi alla volta
  enabled          = true

  # Bisection on error: in caso di errore batch, divide il batch a meta
  # per identificare il messaggio problematico (evita il poison pill)
  bisect_batch_on_function_error = true

  # Finestra di aggregazione (utile per batch processing)
  maximum_batching_window_in_seconds = 5
}

EventBridge + SQS: to, co najlepsze z obu światów

W wielu architekturach EventBridge i SQS uzupełniają się: EventBridge wykonuje inteligentny routing, SQS zajmuje się buforowaniem i równoważeniem obciążenia. Zdarzenie dociera do EventBridge, jest kierowane do odpowiedniej kolejki SQS, a konsumencka Lambda przetwarza go z kolejki z ponowną próbą i DLQ.

# EventBridge --> SQS (con transform di input opzionale)
resource "aws_cloudwatch_event_target" "ordini_to_sqs" {
  rule           = aws_cloudwatch_event_rule.ordini_vip.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_sqs_queue.vip_orders.arn

  # Input transformer: ristruttura il payload prima di inviarlo a SQS
  # Utile per estrarre solo i campi necessari
  input_transformer {
    input_paths = {
      ordineId  = "$.detail.ordineId"
      clienteId = "$.detail.clienteId"
      totale    = "$.detail.totale"
    }
    input_template = <<-EOF
    {
      "ordineId": "<ordineId>",
      "clienteId": "<clienteId>",
      "totale": "<totale>",
      "processato_da": "eventbridge-vip-rule"
    }
    EOF
  }
}

Kolejne kroki w serii

  • Artykuł 8 – Kolejka niedostarczonych listów i odporność: prawidłowa konfiguracja DLQ dla SQS, SNS i EventBridge ma kluczowe znaczenie dla uniknięcia utraty wiadomości. Limit czasu widoczności, maxReceiveCount i wzorzec ponownego przetwarzania.

Połącz z innymi seriami

  • AWS EventBridge (artykuł 6): dogłębna analiza wzorców zdarzeń, rejestr schematów i archiwum zdarzeń, wszystkie szczegóły konfiguracji EventBridge.
  • Apache Kafka (seria 38): do zastosowań lokalnych lub zastosowań o bardzo dużej przepustowości (>10 milionów msg/s), Kafka jest lepszy od SQS/SNS. Wybór zależy od dostawcy chmury, przepustowością i koniecznością odtwarzania zdarzeń.