Architektura EventBridge: magistrala, reguła, cel

EventBridge jest podzielony na trzy podstawowe koncepcje:

  • Autobus imprezowy: kanał odbierający zdarzenia. Każde konto AWS ma domyślna magistrala zdarzeń (odbiera zdarzenia z usług AWS, takich jak EC2, S3, RDS), wiele niestandardowych autobusów (niestandardowe autobusy eventowe) dla zdarzeń aplikacyjnych, np autobus na imprezę partnerską do integracji SaaS (Zendesk, Datadog, Salesforce).
  • Reguła zdarzenia: reguła z a wzór zdarzenia (filtr JSON), który określa które zdarzenia kierować do jakich celów. Pojedyncza reguła może mieć maksymalnie 5 celów.
  • Cel: Miejsce docelowe wydarzenia. Obsługuje funkcje Lambda, SQS, SNS, Step, API Gateway, Kinesis, EventBridge Bus z innych kont i ponad 20 innych miejsc docelowych.

Struktura zdarzenia EventBridge

{
  "version": "0",
  "id": "12345678-1234-1234-1234-123456789012",
  "source": "com.mioapp.ordini",
  "account": "123456789012",
  "time": "2026-03-20T10:30:00Z",
  "region": "eu-west-1",
  "detail-type": "OrdineEffettuato",
  "detail": {
    "ordineId": "ord-abc123",
    "clienteId": "cli-xyz789",
    "totale": 149.99,
    "stato": "IN_ATTESA_PAGAMENTO",
    "categoria": "ELETTRONICA"
  }
}

Pole detail zawiera ładunek aplikacji. source e detail-type określić rodzaj zdarzenia.

Publikuj wydarzenia na EventBridge

// EventBridgePublisher.java - Pubblica eventi custom su EventBridge
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;

public class EventBridgePublisher {

    private final EventBridgeClient eventBridgeClient;
    private final ObjectMapper objectMapper;
    private static final String EVENT_BUS_NAME = "mioapp-production";

    public EventBridgePublisher() {
        this.eventBridgeClient = EventBridgeClient.builder()
            .region(Region.EU_WEST_1)
            .build();
        this.objectMapper = new ObjectMapper();
    }

    public void publishOrdineEffettuato(OrdineEffettuatoEvent event) throws Exception {
        String detailJson = objectMapper.writeValueAsString(event);

        PutEventsRequestEntry entry = PutEventsRequestEntry.builder()
            .source("com.mioapp.ordini")
            .detailType("OrdineEffettuato")
            .detail(detailJson)
            .eventBusName(EVENT_BUS_NAME)
            // TraceHeader per X-Ray distributed tracing (opzionale)
            .traceHeader("Root=1-63441c4a-abcdef")
            .build();

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entry)
                .build()
        );

        if (response.failedEntryCount() > 0) {
            response.entries().forEach(e -> {
                if (e.errorCode() != null) {
                    throw new EventPublishException(
                        "Errore pubblicazione evento: " + e.errorCode() + " - " + e.errorMessage()
                    );
                }
            });
        }
    }

    // Batch publish (max 10 eventi per chiamata, 256 KB totale)
    public void publishBatch(List<DomainEvent> events) throws Exception {
        List<PutEventsRequestEntry> entries = new ArrayList<>();

        for (DomainEvent event : events) {
            entries.add(PutEventsRequestEntry.builder()
                .source("com.mioapp." + event.getAggregateType())
                .detailType(event.getClass().getSimpleName())
                .detail(objectMapper.writeValueAsString(event))
                .eventBusName(EVENT_BUS_NAME)
                .build());
        }

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entries)
                .build()
        );

        System.out.printf("Pubblicati %d/%d eventi. Falliti: %d%n",
            entries.size() - response.failedEntryCount(),
            entries.size(),
            response.failedEntryCount());
    }
}

Wzorce zdarzeń: routing oparty na treści

Sercem EventBridge jest routing oparty na treści: filtr reguł zdarzenia na podstawie zawartości ładunku JSON, a nie tylko typu zdarzenia. Wzorce obsługują dopasowywanie przedrostków, przyrostków, określonych wartości, zakresów liczbowych i warunków ujemnych.

# Esempi di Event Pattern per il routing

# Pattern 1: Tutti gli ordini di importo alto (sopra 500 EUR)
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "totale": [{ "numeric": [">", 500] }]
  }
}

# Pattern 2: Ordini con prodotti della categoria ELETTRONICA o GAMING
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "categoria": ["ELETTRONICA", "GAMING"]
  }
}

# Pattern 3: Ordini di clienti premium con pagamento completato
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato", "PagamentoConfermato"],
  "detail": {
    "tipoCliente": ["PREMIUM", "VIP"],
    "stato": [{ "prefix": "PAGA" }]
  }
}

# Pattern 4: Qualsiasi evento di errore da tutti i servizi dell'app
{
  "source": [{ "prefix": "com.mioapp." }],
  "detail-type": [{ "suffix": "Failed" }]
}

# Pattern 5: Esclusione (not): tutti gli ordini ECCETTO i test
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "ambiente": [{ "anything-but": ["test", "staging"] }]
  }
}

Utwórz regułę za pomocą CloudFormation/Terraform

# Terraform: event bus, rule e target Lambda

# Custom Event Bus
resource "aws_cloudwatch_event_bus" "mioapp" {
  name = "mioapp-production"
}

# Rule: ordini ad alto valore verso Lambda di VIP handling
resource "aws_cloudwatch_event_rule" "ordini_vip" {
  name           = "ordini-vip-handler"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = ["com.mioapp.ordini"]
    detail-type = ["OrdineEffettuato"]
    detail = {
      totale     = [{ numeric = [">", 500] }]
      tipoCliente = ["PREMIUM", "VIP"]
    }
  })

  description = "Instrada ordini VIP alto valore alla Lambda dedicata"
}

# Target: Lambda VIP handler
resource "aws_cloudwatch_event_target" "ordini_vip_lambda" {
  rule           = aws_cloudwatch_event_rule.ordini_vip.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_lambda_function.vip_handler.arn

  # Retry policy per il target
  retry_policy {
    maximum_event_age_in_seconds = 3600  # 1 ora
    maximum_retry_attempts       = 3
  }

  # Dead letter queue per gli eventi che non vengono consegnati
  dead_letter_config {
    arn = aws_sqs_queue.eventbridge_dlq.arn
  }
}

# Rule: tutti gli errori verso SQS per analisi
resource "aws_cloudwatch_event_rule" "tutti_errori" {
  name           = "tutti-errori-sqs"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = [{ prefix = "com.mioapp." }]
    detail-type = [{ suffix = "Failed" }]
  })
}

resource "aws_cloudwatch_event_target" "errori_sqs" {
  rule           = aws_cloudwatch_event_rule.tutti_errori.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_sqs_queue.errori_queue.arn
}

Rejestr schematów i wykrywanie schematów

EventBridge zawiera jeden Rejestr schematów wbudowany: może wykryć automatycznie wzorce ze zdarzeń, które przechodzą w autobusie (schemat odkrywania) i zachować definicje schematu do walidacji i generowania kodu.

Główną zaletą jest generowanie kodu: zaczynając od odkrytego schematu, EventBridge automatycznie generuje klasy Java, TypeScript lub Python odpowiadające ładunkowi zdarzenia.

# Abilitare schema discovery su un event bus (AWS CLI)
aws schemas create-discoverer \
  --source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --description "Auto-discovery per mioapp-production"

# Elencare gli schemi scoperti
aws schemas list-schemas \
  --registry-name discovered-schemas

# Scaricare il codice generato per Java
aws schemas get-code-binding-source \
  --registry-name discovered-schemas \
  --schema-name "com.mioapp.ordini@OrdineEffettuato" \
  --language "java8" \
  --schema-version "1" \
  --output text > OrdineEffettuatoEvent.java

Archiwum wydarzeń i powtórka

Jedną z najpotężniejszych funkcji EventBridge jestArchiwum wydarzeń: automatycznie archiwizuje wszystkie zdarzenia przesyłane magistralą przez konfigurowalny okres, pozwalając powtórna rozgrywka wydarzeń z przeszłości (przydatne przy rekonstrukcji projekcji, debugowanie problemów produkcyjnych lub testowanie nowych konsumentów).

# Creare un archivio per il bus degli ordini
aws events create-archive \
  --archive-name mioapp-ordini-archive \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --retention-days 90 \
  --event-pattern '{
    "source": ["com.mioapp.ordini"],
    "detail-type": ["OrdineEffettuato", "PagamentoConfermato"]
  }'

# Replay degli eventi archiviati (utile per rebuild di read model)
aws events start-replay \
  --replay-name rebuild-read-model-20260320 \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:archive/mioapp-ordini-archive \
  --event-start-time "2026-01-01T00:00:00Z" \
  --event-end-time "2026-03-20T23:59:59Z" \
  --destination '{
    "Arn": "arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production",
    "FilterArns": [
      "arn:aws:events:eu-west-1:123456789012:rule/mioapp-production/ordini-vip-handler"
    ]
  }'

# Monitorare il replay
aws events describe-replay \
  --replay-name rebuild-read-model-20260320

Konsument EventBridge Lambda

// Handler Lambda Java per eventi EventBridge
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrdiniVipHandler implements RequestHandler<ScheduledEvent, String> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final VipNotificationService notificationService = new VipNotificationService();

    @Override
    public String handleRequest(ScheduledEvent event, Context context) {
        context.getLogger().log("Evento ricevuto: " + event.getDetailType());

        try {
            // Deserializza il detail field
            OrdineEffettuatoEvent ordine = objectMapper.convertValue(
                event.getDetail(),
                OrdineEffettuatoEvent.class
            );

            context.getLogger().log(String.format(
                "Ordine VIP: %s, cliente: %s, totale: %.2f",
                ordine.getOrdineId(),
                ordine.getClienteId(),
                ordine.getTotale()
            ));

            // Invia notifica personalizzata al cliente VIP
            notificationService.sendVipOrderConfirmation(
                ordine.getClienteId(),
                ordine.getOrdineId(),
                ordine.getTotale()
            );

            return "SUCCESS";

        } catch (Exception e) {
            context.getLogger().log("ERRORE: " + e.getMessage());
            // Rilancia per triggherare il retry di EventBridge
            throw new RuntimeException("Elaborazione fallita", e);
        }
    }
}

// TypeScript handler per Lambda Node.js
// export const handler = async (event: EventBridgeEvent<'OrdineEffettuato', OrdineDetail>) => {
//   const { ordineId, clienteId, totale } = event.detail;
//   await sendVipNotification(clienteId, ordineId, totale);
//   return { statusCode: 200 };
// };

Najlepsze praktyki dotyczące EventBridge

  • Niestandardowa magistrala zdarzeń dla każdego środowiska: Używaj oddzielnych autobusów do produkcji, testowania i programowania. Nie wysyłaj zdarzeń testowych do magistrali produkcyjnej.
  • Zawsze używaj DLQ dla celów: Skonfiguruj kolejkę niedostarczonych wiadomości SQS dla każdego celu tak, aby nie przegapić wydarzeń w przypadku awarii konsumenckiej.
  • Idempotencja u konsumentów Lambda: EventBridge gwarantuje dostawę przynajmniej raz. Lambda musi obsługiwać podwójny odbiór tego samego zdarzenia.
  • Wersjonowanie zdarzeń: Zawsze dodawaj pole schemaVersion w szczegółach. EventBridge nie ma wbudowanego mechanizmu wersjonowania: obsłuż to w ładunku.
  • Archiwum zdarzeń dla każdej produkcji autobusu: zawsze konfiguruj archiwum z przechowywaniem co najmniej 30 dni. Powtórka może uratować sytuację w przypadku błędów u konsumentów.

Kolejne kroki w serii

  • Artykuł 7 – SQS kontra SNS kontra EventBridge: przewodnik dotyczący podejmowania decyzji dotyczących wyboru odpowiednią usługę przesyłania wiadomości AWS dla każdego konkretnego przypadku użycia.
  • Artykuł 8 – Kolejka niedostarczonych listów i odporność: Skonfiguruj poprawnie DLQ dla EventBridge, SQS i Lambda do obsługi komunikatów o błędach bez utraty danych.

Połącz z innymi seriami

  • Wzór Sagi (artykuł 5): EventBridge to idealna magistrala komunikatów dla Choreography Saga na AWS: Każda usługa publikuje zdarzenia w EventBridge i innych usługi reagują poprzez skonfigurowane reguły routingu.
  • Apache Kafka (seria 38): dla systemów lokalnych lub hybrydowych, Kafka i EventBridge mogą współistnieć: Kafka do wysokoprzepustowego przesyłania wiadomości wewnętrznych, EventBridge do integracji z zewnętrznymi usługami AWS i SaaS.