Amazon SQS: Coada de mesaje

Amazon Simple Queue Service (SQS) este o coadă de mesaje gestionată: producătorii trimit mesaje la coadă, consumatorii le preiau prin sondaj. Semantica de bază sunt punct la punct: Un mesaj în coadă este citit de câte un consumator (primul care îl ia îl „ascunde” celorlalți prin intermediul timeout de vizibilitate).

Coadă standard vs coadă FIFO

SQS oferă două moduri:

  • Coadă standard: Debit nelimitat, livrare cel puțin o dată (dubluri posibile), comandă cu cel mai bun efort (nu este garantată). Pentru cazuri de utilizare cu volum mare unde duplicatele sunt gestionabile din partea consumatorului.
  • Coada FIFO: livrare exactă o dată (deduplicare automată prin MessageDeduplicationId), sortare garantată pentru grup de mesaje, debit limitat la 3.000 msg/s per cerere (cu dozare). Pentru cazurile de utilizare în care comanda și duplicatele sunt critice (de exemplu, tranzacții financiare).
// SQS Producer Java con AWS SDK v2
import software.amazon.awssdk.services.sqs.*;
import software.amazon.awssdk.services.sqs.model.*;

public class SqsProducer {

    private final SqsClient sqsClient = SqsClient.builder()
        .region(Region.EU_WEST_1)
        .build();

    // Standard Queue: invio semplice
    public void sendToStandardQueue(String queueUrl, String messageBody) {
        SendMessageResponse response = sqsClient.sendMessage(
            SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(messageBody)
                // Delay di consegna (0-900 secondi)
                .delaySeconds(0)
                // Attributi del messaggio per filtraggio (SNS subscription filter)
                .messageAttributes(Map.of(
                    "eventType", MessageAttributeValue.builder()
                        .stringValue("OrdineCreato")
                        .dataType("String")
                        .build()
                ))
                .build()
        );
        System.out.println("Messaggio inviato: " + response.messageId());
    }

    // FIFO Queue: richiede MessageGroupId e MessageDeduplicationId
    public void sendToFifoQueue(String queueUrl, String ordineId, String payload) {
        sqsClient.sendMessage(
            SendMessageRequest.builder()
                .queueUrl(queueUrl)  // url deve terminare in .fifo
                .messageBody(payload)
                // Group: tutti i messaggi dello stesso ordine vengono elaborati in ordine
                .messageGroupId("ordine-" + ordineId)
                // Deduplication: prevenzione duplicati (valido 5 minuti)
                .messageDeduplicationId(ordineId + "-" + System.currentTimeMillis())
                .build()
        );
    }
}

Consumatorul SQS: Intervalul pentru sondaj și vizibilitate

// SQS Consumer con Long Polling e gestione DLQ
public class SqsConsumer {

    private final SqsClient sqsClient = SqsClient.builder()
        .region(Region.EU_WEST_1).build();

    public void poll(String queueUrl) {
        while (true) {
            // Long polling: aspetta fino a 20 secondi per nuovi messaggi
            // Riduce le chiamate vuote (e i costi) rispetto al short polling
            ReceiveMessageResponse response = sqsClient.receiveMessage(
                ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(10)        // max 10 per chiamata
                    .waitTimeSeconds(20)             // long polling
                    .visibilityTimeout(30)           // 30s per elaborare
                    .messageAttributeNames("All")
                    .build()
            );

            for (Message message : response.messages()) {
                try {
                    processMessage(message.body());

                    // Successo: elimina dalla coda
                    sqsClient.deleteMessage(
                        DeleteMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(message.receiptHandle())
                            .build()
                    );

                } catch (Exception e) {
                    // Non eliminare: SQS rirenderà visibile il messaggio
                    // dopo il visibility timeout. Dopo maxReceiveCount tentativi
                    // finisce nella DLQ configurata
                    System.err.println("Errore, il messaggio tornera visibile: " + e.getMessage());
                }
            }
        }
    }
}

Amazon SNS: Fan-Out Pub/Sub

Serviciul de notificare simplă Amazon (SNS) implementează modelul publica/aboneaza-te: un producător postează un mesaj către a subiecte SNS, iar SNS îl oferă în paralel tuturor membrii (abonat). Un subiect SNS poate avea mii de abonați: Lambda, SQS, punct final HTTP, e-mail, SMS, push mobil.

Modelul SNS + SQS (SNS Fan Out) este unul dintre cele mai comune modele în arhitecturile AWS: SNS livrează același eveniment către mai multe cozi SQS în paralel, și fiecare coadă deservește un consumator diferit (un microserviciu diferit).

# SNS + SQS Fan-Out: un evento raggiunge 3 servizi in parallelo

# Struttura:
# SNS Topic "ordini-topic"
#   |--- SQS "inventario-queue" --- Lambda inventario
#   |--- SQS "pagamenti-queue"  --- Lambda pagamenti
#   |--- SQS "notifiche-queue"  --- Lambda notifiche email

# Terraform
resource "aws_sns_topic" "ordini" {
  name = "ordini-topic"
}

resource "aws_sqs_queue" "inventario" {
  name = "inventario-queue"
  visibility_timeout_seconds = 60
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.inventario_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sns_topic_subscription" "ordini_inventario" {
  topic_arn = aws_sns_topic.ordini.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.inventario.arn

  # Filter policy: questa subscription riceve SOLO OrdineCreato
  filter_policy = jsonencode({
    eventType = ["OrdineCreato"]
  })
}
// SNS Publisher Java
import software.amazon.awssdk.services.sns.*;
import software.amazon.awssdk.services.sns.model.*;

public class SnsPublisher {

    private final SnsClient snsClient = SnsClient.builder()
        .region(Region.EU_WEST_1).build();

    public void publishOrdineCreato(String topicArn, OrdineCreato event) throws Exception {
        String messageJson = new ObjectMapper().writeValueAsString(event);

        PublishResponse response = snsClient.publish(
            PublishRequest.builder()
                .topicArn(topicArn)
                .message(messageJson)
                // Message attributes per subscription filter
                .messageAttributes(Map.of(
                    "eventType", MessageAttributeValue.builder()
                        .stringValue("OrdineCreato")
                        .dataType("String")
                        .build()
                ))
                // Subject visibile solo nelle email
                .subject("Nuovo ordine: " + event.getOrdineId())
                .build()
        );
        System.out.println("SNS Message ID: " + response.messageId());
    }
}

Comparația completă

Caracteristică SQS SNS EventBridge
Model Punct la punct (coadă) Fan-out (pub/sub) Dirijare bazată pe conținut
Destinatari 1 consumator la un moment dat Toate înregistrate în paralel Ținte după regulă (1-5)
Filtra Nimeni (ia totul) Atributele mesajelor (limitate) Model de eveniment pe orice câmp JSON
Triere FIFO garantat (coadă FIFO) No No
Deduplicarea Da (coada FIFO) No No
Max. debitului Nelimitat (Standard) Nelimitat 10 milioane de evenimente/e pe regiune
Latența Milisecunde (sondaj) < 1s (apăsare) < 500 ms
Registrul Schemei No No Da (integrat)
Arhiva evenimentelor/Reluare No No Si
Integrarea partenerilor SaaS No No Da (35+ parteneri)
Cost pe milion de mesaje ~0,40 USD ~0,50 USD ~1,00 USD

Ghid de decizie: Când să folosiți Care

Utilizați SQS când:

  • Trebuie să echilibrarea sarcinii între mai mulți consumatori de același tip (de exemplu, 5 Lambda care procesează aceeași coadă)
  • Mesajul trebuie elaborat exact o dată (coada FIFO)
  • Trebuie să contrapresiune: Mesajele se adună în coadă atunci când consumatorii sunt lenți
  • Vrei intarziere la livrare (până la 15 minute pentru un singur mesaj)
  • Integrați sisteme vechi care utilizează modelul tradițional de coadă

Utilizați SNS atunci când:

  • Același eveniment trebuie să ajungă mai multe sisteme în paralel (model de evantai)
  • Trebuie să trimiteți notificări (e-mail, SMS, push mobil, webhook HTTP)
  • Combinați SNS + SQS pentru a avea atât fan-out, cât și buffering pentru fiecare consumator

Utilizați EventBridge când:

  • Trebuie să rutare inteligentă pe baza conținutului încărcăturii utile
  • Integrați cu Servicii AWS (EC2, S3, RDS) sau Parteneri SaaS (Zendesk, Shopify, Stripe)
  • O vrei Registrul Schemei și guvernarea schemei
  • Trebuie să Arhiva evenimentelor și reluare
  • Construiește a autobuz pentru evenimente corporative cu rutare centralizată

SNS + SQS + Model Lambda: Stiva completă

# Architettura tipica per un microservizio event-driven su AWS
#
# Client HTTP
#    |
#    v
# API Gateway / Lambda "ordini-api"
#    | pubblica su
#    v
# SNS Topic "ordini-events"
#    |-- filtra "OrdineCreato" --> SQS "inventario-queue" --> Lambda "inventario-handler"
#    |-- filtra "OrdineCreato" --> SQS "notifiche-queue"  --> Lambda "email-notifier"
#    |-- tutti i tipi         --> SQS "audit-queue"       --> Lambda "audit-logger"
#
# Ogni SQS ha la sua DLQ per messaggi non elaborabili
# Le Lambda usano le SQS come event source (SQS trigger)

# Configurazione Lambda con SQS trigger
resource "aws_lambda_event_source_mapping" "inventario" {
  event_source_arn = aws_sqs_queue.inventario.arn
  function_name    = aws_lambda_function.inventario_handler.arn
  batch_size       = 10         # processa 10 messaggi alla volta
  enabled          = true

  # Bisection on error: in caso di errore batch, divide il batch a meta
  # per identificare il messaggio problematico (evita il poison pill)
  bisect_batch_on_function_error = true

  # Finestra di aggregazione (utile per batch processing)
  maximum_batching_window_in_seconds = 5
}

EventBridge + SQS: cel mai bun din ambele lumi

În multe arhitecturi, EventBridge și SQS sunt complementare: EventBridge face rutarea inteligentă, SQS se ocupă de tamponarea și echilibrarea încărcăturii. Un eveniment sosește pe EventBridge, este direcționat către coada SQS corespunzătoare, iar consumatorul Lambda îl procesează din coadă cu reîncercare și DLQ.

# EventBridge --> SQS (con transform di input opzionale)
resource "aws_cloudwatch_event_target" "ordini_to_sqs" {
  rule           = aws_cloudwatch_event_rule.ordini_vip.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_sqs_queue.vip_orders.arn

  # Input transformer: ristruttura il payload prima di inviarlo a SQS
  # Utile per estrarre solo i campi necessari
  input_transformer {
    input_paths = {
      ordineId  = "$.detail.ordineId"
      clienteId = "$.detail.clienteId"
      totale    = "$.detail.totale"
    }
    input_template = <<-EOF
    {
      "ordineId": "<ordineId>",
      "clienteId": "<clienteId>",
      "totale": "<totale>",
      "processato_da": "eventbridge-vip-rule"
    }
    EOF
  }
}

Următorii pași din serie

  • Articolul 8 – Coada de scrisori moarte și rezistență: configurația corectă DLQ pentru SQS, SNS și EventBridge este esențial pentru a evita pierderea mesajelor. Timeout de vizibilitate, maxReceiveCount și modelul de reprocesare.

Legătură cu alte serii

  • AWS EventBridge (articolul 6): analiza aprofundată a tiparelor de evenimente, registrul de schemă și arhiva evenimentelor, toate detaliile de configurare EventBridge.
  • Apache Kafka (Seria 38): pentru cazuri de utilizare on-premise sau cu debit foarte mare (>10 milioane msg/s), Kafka este superior SQS/SNS. Alegerea depinde de furnizorul de cloud, prin debit și necesitatea reluării evenimentului.