Arhitectura EventBridge: autobuz, regulă, țintă

EventBridge este organizat în trei concepte fundamentale:

  • Autobuz de evenimente: canalul care primește evenimentele. Fiecare cont AWS are un magistrala de evenimente implicită (primește evenimente de la servicii AWS precum EC2, S3, RDS), mai multe autobuze personalizate (autobuze de evenimente personalizate) pentru evenimente de aplicare, de ex Autobuz de evenimente partener pentru integrări SaaS (Zendesk, Datadog, Salesforce).
  • Regula evenimentului: o regulă cu a modelul evenimentului (filtru JSON) care determină ce evenimente să direcționeze către ce ținte. O singură regulă poate avea până la 5 ținte.
  • Ţintă: Destinația evenimentului. Suportă Lambda, SQS, SNS, funcții Step, API Gateway, Kinesis, EventBridge Bus din alte conturi și peste 20 de alte destinații.

Structura unui eveniment EventBridge

{
  "version": "0",
  "id": "12345678-1234-1234-1234-123456789012",
  "source": "com.mioapp.ordini",
  "account": "123456789012",
  "time": "2026-03-20T10:30:00Z",
  "region": "eu-west-1",
  "detail-type": "OrdineEffettuato",
  "detail": {
    "ordineId": "ord-abc123",
    "clienteId": "cli-xyz789",
    "totale": 149.99,
    "stato": "IN_ATTESA_PAGAMENTO",
    "categoria": "ELETTRONICA"
  }
}

Câmpul detail conține încărcătura utilă a aplicației. source e detail-type identifica tipul de eveniment.

Publicați evenimente pe EventBridge

// EventBridgePublisher.java - Pubblica eventi custom su EventBridge
import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.*;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;

public class EventBridgePublisher {

    private final EventBridgeClient eventBridgeClient;
    private final ObjectMapper objectMapper;
    private static final String EVENT_BUS_NAME = "mioapp-production";

    public EventBridgePublisher() {
        this.eventBridgeClient = EventBridgeClient.builder()
            .region(Region.EU_WEST_1)
            .build();
        this.objectMapper = new ObjectMapper();
    }

    public void publishOrdineEffettuato(OrdineEffettuatoEvent event) throws Exception {
        String detailJson = objectMapper.writeValueAsString(event);

        PutEventsRequestEntry entry = PutEventsRequestEntry.builder()
            .source("com.mioapp.ordini")
            .detailType("OrdineEffettuato")
            .detail(detailJson)
            .eventBusName(EVENT_BUS_NAME)
            // TraceHeader per X-Ray distributed tracing (opzionale)
            .traceHeader("Root=1-63441c4a-abcdef")
            .build();

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entry)
                .build()
        );

        if (response.failedEntryCount() > 0) {
            response.entries().forEach(e -> {
                if (e.errorCode() != null) {
                    throw new EventPublishException(
                        "Errore pubblicazione evento: " + e.errorCode() + " - " + e.errorMessage()
                    );
                }
            });
        }
    }

    // Batch publish (max 10 eventi per chiamata, 256 KB totale)
    public void publishBatch(List<DomainEvent> events) throws Exception {
        List<PutEventsRequestEntry> entries = new ArrayList<>();

        for (DomainEvent event : events) {
            entries.add(PutEventsRequestEntry.builder()
                .source("com.mioapp." + event.getAggregateType())
                .detailType(event.getClass().getSimpleName())
                .detail(objectMapper.writeValueAsString(event))
                .eventBusName(EVENT_BUS_NAME)
                .build());
        }

        PutEventsResponse response = eventBridgeClient.putEvents(
            PutEventsRequest.builder()
                .entries(entries)
                .build()
        );

        System.out.printf("Pubblicati %d/%d eventi. Falliti: %d%n",
            entries.size() - response.failedEntryCount(),
            entries.size(),
            response.failedEntryCount());
    }
}

Tipare de evenimente: rutare bazată pe conținut

Inima EventBridge este rutare bazată pe conținut: filtru reguli evenimente bazate pe conținutul încărcăturii utile JSON, nu doar pe tipul de eveniment. Modelele acceptă potrivirea pe prefixe, sufixe, valori specifice, intervale numerice și condiții negative.

# Esempi di Event Pattern per il routing

# Pattern 1: Tutti gli ordini di importo alto (sopra 500 EUR)
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "totale": [{ "numeric": [">", 500] }]
  }
}

# Pattern 2: Ordini con prodotti della categoria ELETTRONICA o GAMING
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "categoria": ["ELETTRONICA", "GAMING"]
  }
}

# Pattern 3: Ordini di clienti premium con pagamento completato
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato", "PagamentoConfermato"],
  "detail": {
    "tipoCliente": ["PREMIUM", "VIP"],
    "stato": [{ "prefix": "PAGA" }]
  }
}

# Pattern 4: Qualsiasi evento di errore da tutti i servizi dell'app
{
  "source": [{ "prefix": "com.mioapp." }],
  "detail-type": [{ "suffix": "Failed" }]
}

# Pattern 5: Esclusione (not): tutti gli ordini ECCETTO i test
{
  "source": ["com.mioapp.ordini"],
  "detail-type": ["OrdineEffettuato"],
  "detail": {
    "ambiente": [{ "anything-but": ["test", "staging"] }]
  }
}

Creați o regulă cu CloudFormation / Terraform

# Terraform: event bus, rule e target Lambda

# Custom Event Bus
resource "aws_cloudwatch_event_bus" "mioapp" {
  name = "mioapp-production"
}

# Rule: ordini ad alto valore verso Lambda di VIP handling
resource "aws_cloudwatch_event_rule" "ordini_vip" {
  name           = "ordini-vip-handler"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = ["com.mioapp.ordini"]
    detail-type = ["OrdineEffettuato"]
    detail = {
      totale     = [{ numeric = [">", 500] }]
      tipoCliente = ["PREMIUM", "VIP"]
    }
  })

  description = "Instrada ordini VIP alto valore alla Lambda dedicata"
}

# Target: Lambda VIP handler
resource "aws_cloudwatch_event_target" "ordini_vip_lambda" {
  rule           = aws_cloudwatch_event_rule.ordini_vip.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_lambda_function.vip_handler.arn

  # Retry policy per il target
  retry_policy {
    maximum_event_age_in_seconds = 3600  # 1 ora
    maximum_retry_attempts       = 3
  }

  # Dead letter queue per gli eventi che non vengono consegnati
  dead_letter_config {
    arn = aws_sqs_queue.eventbridge_dlq.arn
  }
}

# Rule: tutti gli errori verso SQS per analisi
resource "aws_cloudwatch_event_rule" "tutti_errori" {
  name           = "tutti-errori-sqs"
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name

  event_pattern = jsonencode({
    source      = [{ prefix = "com.mioapp." }]
    detail-type = [{ suffix = "Failed" }]
  })
}

resource "aws_cloudwatch_event_target" "errori_sqs" {
  rule           = aws_cloudwatch_event_rule.tutti_errori.name
  event_bus_name = aws_cloudwatch_event_bus.mioapp.name
  arn            = aws_sqs_queue.errori_queue.arn
}

Schema Registry și Schema Discovery

EventBridge include unul Registrul Schemei încorporat: poate descoperi automat tiparele de la evenimentele care trec în autobuz (schema de descoperire) și persistă definițiile a schemei pentru validare și generare de cod.

Principalul avantaj este generarea codului: pornind de la schema descoperita, EventBridge generează automat clase Java, TypeScript sau Python care corespund încărcăturii utile pentru eveniment.

# Abilitare schema discovery su un event bus (AWS CLI)
aws schemas create-discoverer \
  --source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --description "Auto-discovery per mioapp-production"

# Elencare gli schemi scoperti
aws schemas list-schemas \
  --registry-name discovered-schemas

# Scaricare il codice generato per Java
aws schemas get-code-binding-source \
  --registry-name discovered-schemas \
  --schema-name "com.mioapp.ordini@OrdineEffettuato" \
  --language "java8" \
  --schema-version "1" \
  --output text > OrdineEffettuatoEvent.java

Arhiva evenimentelor și reluare

Una dintre cele mai puternice caracteristici ale EventBridge esteArhiva evenimentelor: arhivează automat toate evenimentele care tranzitează pe autobuz pentru o perioadă configurabilă, permițând reluare a evenimentelor trecute (utile pentru reconstrucții de proiecție, depanarea problemelor de producție sau testarea de noi consumatori).

# Creare un archivio per il bus degli ordini
aws events create-archive \
  --archive-name mioapp-ordini-archive \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production \
  --retention-days 90 \
  --event-pattern '{
    "source": ["com.mioapp.ordini"],
    "detail-type": ["OrdineEffettuato", "PagamentoConfermato"]
  }'

# Replay degli eventi archiviati (utile per rebuild di read model)
aws events start-replay \
  --replay-name rebuild-read-model-20260320 \
  --event-source-arn arn:aws:events:eu-west-1:123456789012:archive/mioapp-ordini-archive \
  --event-start-time "2026-01-01T00:00:00Z" \
  --event-end-time "2026-03-20T23:59:59Z" \
  --destination '{
    "Arn": "arn:aws:events:eu-west-1:123456789012:event-bus/mioapp-production",
    "FilterArns": [
      "arn:aws:events:eu-west-1:123456789012:rule/mioapp-production/ordini-vip-handler"
    ]
  }'

# Monitorare il replay
aws events describe-replay \
  --replay-name rebuild-read-model-20260320

Consumator Lambda EventBridge

// Handler Lambda Java per eventi EventBridge
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrdiniVipHandler implements RequestHandler<ScheduledEvent, String> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final VipNotificationService notificationService = new VipNotificationService();

    @Override
    public String handleRequest(ScheduledEvent event, Context context) {
        context.getLogger().log("Evento ricevuto: " + event.getDetailType());

        try {
            // Deserializza il detail field
            OrdineEffettuatoEvent ordine = objectMapper.convertValue(
                event.getDetail(),
                OrdineEffettuatoEvent.class
            );

            context.getLogger().log(String.format(
                "Ordine VIP: %s, cliente: %s, totale: %.2f",
                ordine.getOrdineId(),
                ordine.getClienteId(),
                ordine.getTotale()
            ));

            // Invia notifica personalizzata al cliente VIP
            notificationService.sendVipOrderConfirmation(
                ordine.getClienteId(),
                ordine.getOrdineId(),
                ordine.getTotale()
            );

            return "SUCCESS";

        } catch (Exception e) {
            context.getLogger().log("ERRORE: " + e.getMessage());
            // Rilancia per triggherare il retry di EventBridge
            throw new RuntimeException("Elaborazione fallita", e);
        }
    }
}

// TypeScript handler per Lambda Node.js
// export const handler = async (event: EventBridgeEvent<'OrdineEffettuato', OrdineDetail>) => {
//   const { ordineId, clienteId, totale } = event.detail;
//   await sendVipNotification(clienteId, ordineId, totale);
//   return { statusCode: 200 };
// };

Cele mai bune practici pentru EventBridge

  • Autobuz de evenimente personalizat pentru fiecare mediu: Folosiți autobuze separate pentru producție, organizare și dezvoltare. Nu trimiteți evenimente de testare către magistrala de producție.
  • Utilizați întotdeauna DLQ pentru ținte: Configurați o coadă SQS cu scrisori moarte pentru fiecare țintă pentru a nu rata evenimente în cazul defectării consumatorului.
  • Idempotenta la consumatorii Lambda: EventBridge garantează cel puțin o livrare. Lambda trebuie să gestioneze recepția duplicată a aceluiași eveniment.
  • Versiune pentru evenimente: adăugați întotdeauna un câmp schemaVersion în detaliu. EventBridge nu are un mecanism de versiuni încorporat: gestionați acest lucru în sarcina utilă.
  • Arhiva de evenimente pentru fiecare producție de autobuz: configurați întotdeauna o arhivă cu reținere cel putin 30 de zile. Replay poate salva ziua în cazul unor erori la consumatori.

Următorii pași din serie

  • Articolul 7 – SQS vs SNS vs EventBridge: ghid de decizie pentru alegere serviciul de mesagerie AWS potrivit pentru fiecare caz de utilizare specific.
  • Articolul 8 – Coada de scrisori și rezistență: Configurați corect DLQ pentru EventBridge, SQS și Lambda pentru a gestiona mesajele eșuate fără pierderi de date.

Legătură cu alte serii

  • Saga Pattern (Articolul 5): EventBridge este magistrala ideală pentru mesaje pentru Choreography Saga pe AWS: Fiecare serviciu publică evenimente pentru EventBridge și pentru ceilalți serviciile reacţionează prin regulile de rutare configurate.
  • Apache Kafka (Seria 38): pentru sisteme on-premise sau hibride, Kafka și EventBridge pot coexista: Kafka pentru mesagerie internă de mare debit, EventBridge pentru integrarea cu servicii externe AWS și SaaS.