De ce 2PC nu funcționează în microservicii

Il Angajare în două faze (2 PC) este protocolul clasic pentru tranzacțiile distribuite: un coordonator le cere tuturor participanților să se pregătească (faza de pregătire), apoi ordonați comiterea. Acest lucru funcționează bine pentru baze de date omogene (de exemplu, două instanțe PostgreSQL), dar este problematic în microservicii:

  • Cuplare: Toate serviciile trebuie să fie disponibile în timpul tranzacției
  • Performanţă: resursele rămân blocate până când sunt comise (blocări distribuite)
  • Suport limitat: Multe servicii cloud, API-uri REST și baze de date NoSQL nu acceptă 2PC
  • Scalabilitate: coordonatorul devine un singur punct de eșec

Modelul Saga înlocuiește o tranzacție mare distribuită cu una succesiunea tranzacțiilor locale, fiecare finalizat independent, coordonat prin evenimente sau un orchestrator.

Exemplu: E-commerce Purchasing Saga

Pașii Saga pentru o achiziție:

  1. Creați comanda (Serviciul Comenzi) → OrdineCreato
  2. Rezervă Inventar (Serviciul de inventariere) → Inventar rezervat
  3. Procesați plata (Serviciul de plată) → PaymentConfirmed
  4. Confirmați comanda (Serviciul de comandă) → Comanda confirmată

În cazul în care plata eșuează, tranzacțiile compensatoare sunt:

  1. Eliberați inventarul (Serviciul de inventariere)
  2. Anulează comanda (Serviciul de comandă)

Saga coregrafică: Autonomie de serviciu

În Saga Coregrafiei, nu există un orchestrator central. Fiecare serviciu ascultă evenimentele produse de serviciul anterior și reacționează în consecință, producând la rândul său noi evenimente. Logica de coordonare este distribuită între servicii: fiecare își cunoaște doar pasul și evenimentele la care reacționează.

Pro: decuplare maximă, fără un singur punct de defecțiune, simplu de implementat. Împotriva: greu de depanat (logica saga este distribuită), fără vizibilitate centrală pentru starea sagăi, riscul unor cicluri de evenimente prost gestionate.

// CHOREOGRAPHY SAGA con Spring Kafka

// ===== STEP 1: Servizio Ordini - crea l'ordine =====
@Service
public class OrdiniService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public String creaOrdine(CreaOrdineCommand cmd) {
        String ordineId = UUID.randomUUID().toString();

        // Salva l'ordine in stato PENDING
        Ordine ordine = Ordine.builder()
            .id(ordineId)
            .clienteId(cmd.getClienteId())
            .prodotti(cmd.getProdotti())
            .stato(StatoOrdine.PENDING)
            .build();
        ordineRepository.save(ordine);

        // Pubblica evento: il Servizio Inventario lo ascolterà
        OrdineCreato event = new OrdineCreato(ordineId, cmd.getProdotti());
        kafkaTemplate.send("ordini-events", ordineId, event);

        return ordineId;
    }

    // Ascolta la conferma del pagamento per finalizzare l'ordine
    @KafkaListener(topics = "pagamenti-events", groupId = "ordini-service")
    public void onPagamento(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof PagamentoConfermato event) {
            ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.CONFERMATO);
            kafkaTemplate.send("ordini-events", event.getOrdineId(),
                new OrdineConfermato(event.getOrdineId()));
        }
        if (record.value() instanceof PagamentoFallito event) {
            // Compensating: annulla l'ordine
            ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.ANNULLATO);
        }
    }
}

// ===== STEP 2: Servizio Inventario - riserva i prodotti =====
@Service
public class InventarioService {

    @KafkaListener(topics = "ordini-events", groupId = "inventario-service")
    public void onOrdinCreato(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof OrdineCreato event) {
            try {
                // Tenta la prenotazione dell'inventario
                prenotaInventario(event.getOrdineId(), event.getProdotti());

                // Successo: pubblica evento per il Servizio Pagamenti
                kafkaTemplate.send("inventario-events", event.getOrdineId(),
                    new InventarioRiservato(event.getOrdineId(), event.getTotale()));

            } catch (InventarioInsufficienterException e) {
                // Fallimento: pubblica evento di fallimento
                // Il Servizio Ordini annullerà l'ordine
                kafkaTemplate.send("inventario-events", event.getOrdineId(),
                    new InventarioNonDisponibile(event.getOrdineId(), e.getMessage()));
            }
        }

        // Compensating: rilascia l'inventario se il pagamento fallisce
        if (record.value() instanceof PagamentoFallito event) {
            rilasciaInventario(event.getOrdineId());
        }
    }
}

// ===== STEP 3: Servizio Pagamenti - processa il pagamento =====
@Service
public class PagamentiService {

    @KafkaListener(topics = "inventario-events", groupId = "pagamenti-service")
    public void onInventarioRiservato(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof InventarioRiservato event) {
            try {
                processaPagamento(event.getOrdineId(), event.getTotale());

                kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
                    new PagamentoConfermato(event.getOrdineId()));

            } catch (PagamentoRifiutatoException e) {
                kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
                    new PagamentoFallito(event.getOrdineId(), e.getMessage()));
            }
        }
    }
}

Orchestration Saga: Control centralizat

în'Saga orchestrației, o componentă centrală numită Saga Orchestrator (sau Process Manager) coordonează întreaga secvență. Orchestratorul cunoaște toți pașii, trimiteți comanda către servicii și așteptați evenimentele de răspuns ale acestora. În caz de eșec, orchestratorul trimite explicit comenzi de compensare.

Pro: logică centralizată și vizibilă, ușor de depanat, stare explicită de saga, tratarea mai precisă a erorilor. Împotriva: cuplaj major, orchestratorul cunoaște toate serviciile implicate.

// ORCHESTRATION SAGA con Axon Framework (Java)
// Axon gestisce automaticamente la persistenza dello stato della saga

import org.axonframework.saga.*;
import org.axonframework.eventhandling.*;

@Saga
public class AcquistoSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    // Lo stato della saga persiste automaticamente tra gli step
    private String ordineId;
    private List<ProdottoOrdinato> prodotti;
    private BigDecimal totale;

    // STEP 1: Avvio della saga quando viene creato un ordine
    @StartSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineCreato event) {
        this.ordineId = event.getOrdineId();
        this.prodotti = event.getProdotti();
        this.totale = event.getTotale();

        // Invia comando al servizio inventario
        commandGateway.send(new RiservaInventarioCommand(
            ordineId,
            prodotti
        ));
    }

    // STEP 2: Inventario riservato con successo - procedi con il pagamento
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioRiservato event) {
        commandGateway.send(new ProcessaPagamentoCommand(
            ordineId,
            totale,
            "CARTA_CREDITO"
        ));
    }

    // STEP 3: Pagamento confermato - finalizza l'ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(PagamentoConfermato event) {
        commandGateway.send(new ConfermaOrdineCommand(ordineId));
    }

    // Fine saga (successo)
    @EndSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineConfermato event) {
        System.out.println("Saga completata con successo per ordine: " + ordineId);
    }

    // ===== COMPENSATING TRANSACTIONS =====

    // Pagamento fallito: rilascia inventario, poi annulla ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(PagamentoFallito event) {
        // Compensating step 1: rilascia inventario
        commandGateway.send(new RilasciaInventarioCommand(ordineId, prodotti));
    }

    // Inventario rilasciato - annulla l'ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioRilasciato event) {
        // Compensating step 2: annulla l'ordine
        commandGateway.send(new AnnullaOrdineCommand(ordineId, "Pagamento fallito"));
    }

    // Inventario non disponibile - annulla direttamente
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioNonDisponibile event) {
        commandGateway.send(new AnnullaOrdineCommand(ordineId, "Inventario insufficiente"));
    }

    // Fine saga per percorso di fallimento
    @EndSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineAnnullato event) {
        System.out.println("Saga fallita/annullata per ordine: " + ordineId + ". Motivo: " + event.getMotivo());
    }
}

Saga cu AWS Step Functions (fără server)

Pentru arhitecturi fără server pe AWS, Funcții de pas este serviciul gestionat pentru a implementa Orchestration Saga. Fiecare pas este o funcție Lambda sau o acțiune AWS, iar Step Functions gestionează automat starea, reîncercarea și compensațiile prin definiție în limba Amazon Statelor (ASL).

{
  "Comment": "Saga acquisto e-commerce con compensating transactions",
  "StartAt": "CreaOrdine",
  "States": {
    "CreaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:crea-ordine",
      "Next": "RiservaInventario",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FallimentoSaga"
        }
      ]
    },
    "RiservaInventario": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:riserva-inventario",
      "Next": "ProcessaPagamento",
      "Catch": [
        {
          "ErrorEquals": ["InventarioInsufficienterException"],
          "Next": "CompensaAnnullaOrdine"
        }
      ]
    },
    "ProcessaPagamento": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:processa-pagamento",
      "Retry": [
        {
          "ErrorEquals": ["TransientPaymentError"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Next": "ConfermaOrdine",
      "Catch": [
        {
          "ErrorEquals": ["PagamentoRifiutatoException"],
          "Next": "CompensaRilasciaInventario"
        }
      ]
    },
    "ConfermaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:conferma-ordine",
      "End": true
    },

    "CompensaRilasciaInventario": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:rilascia-inventario",
      "Next": "CompensaAnnullaOrdine"
    },
    "CompensaAnnullaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:annulla-ordine",
      "Next": "FallimentoSaga"
    },
    "FallimentoSaga": {
      "Type": "Fail",
      "Error": "SagaFailed",
      "Cause": "La transazione distribuita e fallita, compensazioni eseguite"
    }
  }
}

Idempotenta in tranzactiile compensatorii

Tranzacțiile compensatoare trebuie să fie idempotent: dacă este invocat de mai multe ori (din cauza reîncercării rețelei sau a orchestratorului), rezultatul trebuie să fie același. Cea mai comună tehnică este utilizarea a cheie de idempotenta: orderId + tipul operației formează o cheie unică care împiedică operațiunile duplicate.

// RilasciaInventarioHandler.java - Compensating transaction idempotente
@CommandHandler
public void handle(RilasciaInventarioCommand cmd) {
    // Controllo idempotenza: questa compensazione e gia stata eseguita?
    String idempotencyKey = cmd.getOrdineId() + ":RILASCIA_INVENTARIO";

    if (compensazioniEseguite.contains(idempotencyKey)) {
        System.out.println("Compensazione gia eseguita per " + idempotencyKey + ", skip");
        return;
    }

    // Esegui la compensazione
    for (ProdottoOrdinato prodotto : cmd.getProdotti()) {
        inventarioRepository.incrementaDisponibilita(
            prodotto.getProductId(),
            prodotto.getQuantita()
        );
    }

    // Segna la compensazione come eseguita
    compensazioniEseguite.add(idempotencyKey);

    // Pubblica evento per continuare la catena di compensazione
    eventBus.publish(new InventarioRilasciato(cmd.getOrdineId()));
}

Urmărirea statului Saga: vizibilitate în stat

Una dintre criticile la adresa Orchestration Saga este dificultatea de a urmări statul în sisteme de volum mare. Un tabel explicit de urmărire a saga rezolvă această problemă:

-- Tabella di tracking per le saga in corso
CREATE TABLE saga_state (
    saga_id       VARCHAR(36) PRIMARY KEY,
    saga_type     VARCHAR(100) NOT NULL,
    ordine_id     VARCHAR(36) NOT NULL,
    current_step  VARCHAR(100) NOT NULL,
    stato         VARCHAR(50) NOT NULL,  -- IN_CORSO, COMPLETATA, FALLITA, COMPENSATA
    started_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at  TIMESTAMPTZ,
    error_message TEXT
);

-- Query utili per monitoring
-- Saga in corso da piu di 5 minuti (potenzialmente bloccate)
SELECT * FROM saga_state
WHERE stato = 'IN_CORSO'
  AND started_at < NOW() - INTERVAL '5 minutes';

-- Distribuzione per step corrente
SELECT current_step, COUNT(*) as count
FROM saga_state WHERE stato = 'IN_CORSO'
GROUP BY current_step;

Comparație: Coregrafie vs Orchestrație

astept Coregrafie Orchestrație
Cuplare Scăzut (evenimente) Mediu (orchestratorul cunoaște serviciile)
Vizibilitatea stării Hard (distribuit) Ridicat (centralizat)
Depanare Hard (logică distribuită) Ușor (stare explicită)
Complexitatea serviciilor Ridicat (fiecare serviciu cunoaște compensațiile) Scăzut (servicii simple, logică în orchestrator)
Scalabilitate Mare (fără puncte unice) Mediu (Orchestrator la scară)
Caz de utilizare ideal Câțiva pași, servicii autonome, echipe independente Mulți pași, logică complexă, vizibilitate necesară

Următorii pași din serie

  • Articolul 6 – AWS EventBridge: Choreography Saga folosește o magistrală de mesaje pentru evenimente de schimb. EventBridge este autobuzul gestionat AWS ideal pentru rutare inteligentă a evenimentelor dintre Lambda și SQS.
  • Articolul 10 – Model pentru căsuța de ieșire: Asigurați expedierea atomică a evenimentelor Saga împreună cu actualizarea bazei de date locală.

Legătură cu alte serii

  • Aprovizionare pentru evenimente + CQRS (articolul 4): fiecare pas al Saga produce evenimente care poate fi folosit ca sursă de adevăr pentru a reconstrui starea agregată. Axon Framework acceptă nativ combinația Saga + Event Sourcing.
  • Apache Kafka (Seria 38): Kafka este autobuzul de mesaje ideal pentru Saga Coregrafie în producție, cu garanții de durabilitate și comandă per partiție de cheie.