EventBridge Architecture: Bus, Rule, Target

EventBridge je organizován do tří základních konceptů:

  • Event Bus: kanál, který přijímá události. Každý účet AWS má a výchozí sběrnice událostí (přijímá události ze služeb AWS jako EC2, S3, RDS), více vlastních sběrnic (autobusy na zakázku) pro aplikační akce, např autobus na partnerskou akci pro integrace SaaS (Zendesk, Datadog, Salesforce).
  • Pravidlo události: pravidlo s a vzor události (filtr JSON), který určuje které události směřovat ke kterým cílům. Jedno pravidlo může mít až 5 cílů.
  • Cíl: Cíl akce. Podporuje funkce Lambda, SQS, SNS, Step, API Gateway, Kinesis, EventBridge Bus z jiných účtů a více než 20 dalších destinací.

Struktura události EventBridge

{
  "version": "0",
  "id": "12345678-1234-1234-1234-123456789012",
  "source": "com.mioapp.ordini",
  "account": "123456789012",
  "time": "2026-03-20T10:30:00Z",
  "region": "eu-west-1",
  "detail-type": "OrdineEffettuato",
  "detail": {
    "ordineId": "ord-abc123",
    "clienteId": "cli-xyz789",
    "totale": 149.99,
    "stato": "IN_ATTESA_PAGAMENTO",
    "categoria": "ELETTRONICA"
  }
}

Pole detail obsahuje užitečné zatížení aplikace. source e detail-type identifikovat typ události.

Publikujte události na EventBridge

// EventBridgePublisher.java - Pubblica eventi custom su EventBridge
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;

public class EventBridgePublisher {

    private final EventBridgeClient eventBridgeClient;
    private final ObjectMapper objectMapper;
    private static final String EVENT_BUS_NAME = "mioapp-production";

    public EventBridgePublisher() {
        this.eventBridgeClient = EventBridgeClient.builder()
            .region(Region.EU_WEST_1)
            .build();
        this.objectMapper = new ObjectMapper();
    }

    public void publishOrdineEffettuato(OrdineEffettuatoEvent event) throws Exception {
        String detailJson = objectMapper.writeValueAsString(event);

        PutEventsRequestEntry entry = PutEventsRequestEntry.builder()
            .source("com.mioapp.ordini")
            .detailType("OrdineEffettuato")
            .detail(detailJson)
            .eventBusName(EVENT_BUS_NAME)
            // TraceHeader per X-Ray distributed tracing (opzionale)
            .traceHeader("Root=1-63441c4a-abcdef")
            .build();

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entry)
                .build()
        );

        if (response.failedEntryCount() > 0) {
            response.entries().forEach(e -> {
                if (e.errorCode() != null) {
                    throw new EventPublishException(
                        "Errore pubblicazione evento: " + e.errorCode() + " - " + e.errorMessage()
                    );
                }
            });
        }
    }

    // Batch publish (max 10 eventi per chiamata, 256 KB totale)
    public void publishBatch(List<DomainEvent> events) throws Exception {
        List<PutEventsRequestEntry> entries = new ArrayList<>();

        for (DomainEvent event : events) {
            entries.add(PutEventsRequestEntry.builder()
                .source("com.mioapp." + event.getAggregateType())
                .detailType(event.getClass().getSimpleName())
                .detail(objectMapper.writeValueAsString(event))
                .eventBusName(EVENT_BUS_NAME)
                .build());
        }

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entries)
                .build()
        );

        System.out.printf("Pubblicati %d/%d eventi. Falliti: %d%n",
            entries.size() - response.failedEntryCount(),
            entries.size(),
            response.failedEntryCount());
    }
}

Vzory událostí: Směrování podle obsahu

Srdcem EventBridge je směrování založené na obsahu: filtr pravidel události založené na obsahu datové části JSON, nikoli pouze na typu události. Vzory podporují shodu s předponami, příponami, specifickými hodnotami, číselnými rozsahy a zápornými podmínkami.

# Esempi di Event Pattern per il routing

# Pattern 1: Tutti gli ordini di importo alto (sopra 500 EUR)
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "totale": [{ "numeric": [">", 500] }]
  }
}

# Pattern 2: Ordini con prodotti della categoria ELETTRONICA o GAMING
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "categoria": ["ELETTRONICA", "GAMING"]
  }
}

# Pattern 3: Ordini di clienti premium con pagamento completato
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato", "PagamentoConfermato"],
  "detail": {
    "tipoCliente": ["PREMIUM", "VIP"],
    "stato": [{ "prefix": "PAGA" }]
  }
}

# Pattern 4: Qualsiasi evento di errore da tutti i servizi dell'app
{
  "source": [{ "prefix": "com.mioapp." }],
  "detail-type": [{ "suffix": "Failed" }]
}

# Pattern 5: Esclusione (not): tutti gli ordini ECCETTO i test
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "ambiente": [{ "anything-but": ["test", "staging"] }]
  }
}

Vytvořte pravidlo pomocí CloudFormation / Terraform

# Terraform: event bus, rule e target Lambda

# Custom Event Bus
resource "aws_cloudwatch_event_bus" "mioapp" {
  name = "mioapp-production"
}

# Rule: ordini ad alto valore verso Lambda di VIP handling
resource "aws_cloudwatch_event_rule" "ordini_vip" {
  name           = "ordini-vip-handler"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = ["com.mioapp.ordini"]
    detail-type = ["OrdineEffettuato"]
    detail = {
      totale     = [{ numeric = [">", 500] }]
      tipoCliente = ["PREMIUM", "VIP"]
    }
  })

  description = "Instrada ordini VIP alto valore alla Lambda dedicata"
}

# Target: Lambda VIP handler
resource "aws_cloudwatch_event_target" "ordini_vip_lambda" {
  rule           = aws_cloudwatch_event_rule.ordini_vip.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_lambda_function.vip_handler.arn

  # Retry policy per il target
  retry_policy {
    maximum_event_age_in_seconds = 3600  # 1 ora
    maximum_retry_attempts       = 3
  }

  # Dead letter queue per gli eventi che non vengono consegnati
  dead_letter_config {
    arn = aws_sqs_queue.eventbridge_dlq.arn
  }
}

# Rule: tutti gli errori verso SQS per analisi
resource "aws_cloudwatch_event_rule" "tutti_errori" {
  name           = "tutti-errori-sqs"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = [{ prefix = "com.mioapp." }]
    detail-type = [{ suffix = "Failed" }]
  })
}

resource "aws_cloudwatch_event_target" "errori_sqs" {
  rule           = aws_cloudwatch_event_rule.tutti_errori.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_sqs_queue.errori_queue.arn
}

Schema Registry a Schema Discovery

EventBridge jeden obsahuje Registr schémat vestavěný: může objevovat automaticky vzory z událostí, které procházejí autobusem (zjišťovací schéma) a zachovat definice schéma pro validaci a generování kódu.

Hlavní výhodou je generování kódu: počínaje objeveným schématem, EventBridge automaticky generuje třídy Java, TypeScript nebo Python, které odpovídají datové části události.

# Abilitare schema discovery su un event bus (AWS CLI)
aws schemas create-discoverer \
  --source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --description "Auto-discovery per mioapp-production"

# Elencare gli schemi scoperti
aws schemas list-schemas \
  --registry-name discovered-schemas

# Scaricare il codice generato per Java
aws schemas get-code-binding-source \
  --registry-name discovered-schemas \
  --schema-name "com.mioapp.ordini@OrdineEffettuato" \
  --language "java8" \
  --schema-version "1" \
  --output text > OrdineEffettuatoEvent.java

Archiv událostí a přehrávání

Jednou z nejvýkonnějších funkcí EventBridge jeArchiv událostí: automaticky archivuje všechny události, které procházejí autobusem, po nastavitelnou dobu, umožňující přehrání minulých událostí (užitečné pro přestavby projekce, ladění produkčních problémů nebo testování nových spotřebitelů).

# Creare un archivio per il bus degli ordini
aws events create-archive \
  --archive-name mioapp-ordini-archive \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --retention-days 90 \
  --event-pattern '{
    "source": ["com.mioapp.ordini"],
    "detail-type": ["OrdineEffettuato", "PagamentoConfermato"]
  }'

# Replay degli eventi archiviati (utile per rebuild di read model)
aws events start-replay \
  --replay-name rebuild-read-model-20260320 \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:archive/mioapp-ordini-archive \
  --event-start-time "2026-01-01T00:00:00Z" \
  --event-end-time "2026-03-20T23:59:59Z" \
  --destination '{
    "Arn": "arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production",
    "FilterArns": [
      "arn:aws:events:eu-west-1:123456789012:rule/mioapp-production/ordini-vip-handler"
    ]
  }'

# Monitorare il replay
aws events describe-replay \
  --replay-name rebuild-read-model-20260320

EventBridge Lambda spotřebitel

// Handler Lambda Java per eventi EventBridge
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrdiniVipHandler implements RequestHandler<ScheduledEvent, String> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final VipNotificationService notificationService = new VipNotificationService();

    @Override
    public String handleRequest(ScheduledEvent event, Context context) {
        context.getLogger().log("Evento ricevuto: " + event.getDetailType());

        try {
            // Deserializza il detail field
            OrdineEffettuatoEvent ordine = objectMapper.convertValue(
                event.getDetail(),
                OrdineEffettuatoEvent.class
            );

            context.getLogger().log(String.format(
                "Ordine VIP: %s, cliente: %s, totale: %.2f",
                ordine.getOrdineId(),
                ordine.getClienteId(),
                ordine.getTotale()
            ));

            // Invia notifica personalizzata al cliente VIP
            notificationService.sendVipOrderConfirmation(
                ordine.getClienteId(),
                ordine.getOrdineId(),
                ordine.getTotale()
            );

            return "SUCCESS";

        } catch (Exception e) {
            context.getLogger().log("ERRORE: " + e.getMessage());
            // Rilancia per triggherare il retry di EventBridge
            throw new RuntimeException("Elaborazione fallita", e);
        }
    }
}

// TypeScript handler per Lambda Node.js
// export const handler = async (event: EventBridgeEvent<'OrdineEffettuato', OrdineDetail>) => {
//   const { ordineId, clienteId, totale } = event.detail;
//   await sendVipNotification(clienteId, ordineId, totale);
//   return { statusCode: 200 };
// };

Nejlepší postupy pro EventBridge

  • Vlastní sběrnice událostí pro každé prostředí: Používejte samostatné sběrnice pro produkci, inscenaci a vývoj. Neposílejte testovací události do produkční sběrnice.
  • Pro cíle vždy používejte DLQ: Nakonfigurujte frontu nedoručených zpráv SQS pro každý cíl aby nezmeškaly události v případě selhání spotřebitele.
  • Idempotence u spotřebitelů Lambda: EventBridge zaručuje doručení alespoň jednou. Lambda musí zvládnout duplicitní příjem stejné události.
  • Verze událostí: Vždy přidejte pole schemaVersion v detailu. EventBridge nemá vestavěný mechanismus verzování: zpracujte to v užitečné zátěži.
  • Archiv událostí pro každou výrobu autobusu: vždy nakonfigurujte archiv s uchováváním alespoň 30 dní. Replay může zachránit den v případě chyb u spotřebitelů.

Další kroky v sérii

  • Článek 7 – SQS vs SNS vs EventBridge: průvodce rozhodováním pro výběr správná služba zasílání zpráv AWS pro každý konkrétní případ použití.
  • Článek 8 – Fronta nedoručených dopisů a odolnost: Nakonfigurujte správně DLQ pro EventBridge, SQS a Lambda pro zpracování neúspěšných zpráv bez ztráty dat.

Propojení s ostatními sériemi

  • Vzor ságy (článek 5): EventBridge je ideální sběrnice zpráv pro Choreography Saga na AWS: Každá služba publikuje události pro EventBridge a ostatní služby reagují prostřednictvím nakonfigurovaných směrovacích pravidel.
  • Apache Kafka (řada 38): pro on-premise nebo hybridní systémy, Kafka a EventBridge mohou koexistovat: Kafka pro vysoce výkonné interní zasílání zpráv, EventBridge pro integraci s externími službami AWS a SaaS.