2PC Neden Mikro Hizmetlerde Çalışmıyor?

Il İki Aşamalı Taahhüt (2PC) dağıtılmış işlemler için klasik protokoldür: Koordinatör tüm katılımcılardan hazırlık yapmalarını ister (hazırlık aşaması), ardından taahhüdü sipariş edin. Bu, homojen veritabanları için iyi çalışır (örneğin, iki PostgreSQL örneği), ancak mikro hizmetlerde sorunludur:

  • Kaplin: İşlem sırasında tüm hizmetlerin mevcut olması gerekir
  • Performans: kaynaklar taahhüt edilene kadar kilitli kalır (dağıtılmış kilitler)
  • Sınırlı destek: Birçok bulut hizmeti, REST API'si ve NoSQL veri tabanı 2PC'yi desteklemez
  • Ölçeklenebilirlik: koordinatör tek bir başarısızlık noktası haline gelir

Saga Modeli, büyük bir dağıtılmış işlemi tek bir işlemle değiştirir yerel işlemlerin sırası, her biri bağımsız olarak tamamlanır, etkinlikler veya bir orkestratör aracılığıyla koordine edilir.

Örnek: E-ticaret Satın Alma Efsanesi

Bir satın alma için destan adımları:

  1. Sipariş Oluştur (Sipariş Servisi) → OrdineCreato
  2. Rezerv Envanteri (Envanter Hizmeti) → Ayrılmış Envanter
  3. Ödeme İşlemi (Ödeme Hizmeti) → ÖdemeOnaylandı
  4. Siparişi Onayla (Sipariş Hizmeti) → SiparişOnaylandı

Ödeme başarısız olursa telafi edici işlemler şunlardır:

  1. Envanter Sürümü (Envanter Hizmeti)
  2. Siparişi İptal Et (Sipariş Hizmeti)

Koreografi Efsanesi: Hizmet Özerkliği

içinde Koreografi Efsanesi, merkezi bir orkestratör yoktur. Her hizmet, bir önceki hizmetin ürettiği olayları dinler ve buna göre tepki verir, karşılığında yeni etkinlikler üretiyoruz. Koordinasyon mantığı dağıtılır hizmetler arasında: her biri yalnızca kendi adımını ve tepki verdiği olayları bilir.

Profesyonel: maksimum ayrıştırma, tek bir arıza noktası yok, uygulaması basit. Aykırı: hata ayıklaması zor (destan mantığı dağıtılmış), görünürlük yok Destanın durumunun merkezinde, kötü yönetilen olaylar döngüsü riski var.

// CHOREOGRAPHY SAGA con Spring Kafka

// ===== STEP 1: Servizio Ordini - crea l'ordine =====
@Service
public class OrdiniService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public String creaOrdine(CreaOrdineCommand cmd) {
        String ordineId = UUID.randomUUID().toString();

        // Salva l'ordine in stato PENDING
        Ordine ordine = Ordine.builder()
            .id(ordineId)
            .clienteId(cmd.getClienteId())
            .prodotti(cmd.getProdotti())
            .stato(StatoOrdine.PENDING)
            .build();
        ordineRepository.save(ordine);

        // Pubblica evento: il Servizio Inventario lo ascolterà
        OrdineCreato event = new OrdineCreato(ordineId, cmd.getProdotti());
        kafkaTemplate.send("ordini-events", ordineId, event);

        return ordineId;
    }

    // Ascolta la conferma del pagamento per finalizzare l'ordine
    @KafkaListener(topics = "pagamenti-events", groupId = "ordini-service")
    public void onPagamento(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof PagamentoConfermato event) {
            ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.CONFERMATO);
            kafkaTemplate.send("ordini-events", event.getOrdineId(),
                new OrdineConfermato(event.getOrdineId()));
        }
        if (record.value() instanceof PagamentoFallito event) {
            // Compensating: annulla l'ordine
            ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.ANNULLATO);
        }
    }
}

// ===== STEP 2: Servizio Inventario - riserva i prodotti =====
@Service
public class InventarioService {

    @KafkaListener(topics = "ordini-events", groupId = "inventario-service")
    public void onOrdinCreato(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof OrdineCreato event) {
            try {
                // Tenta la prenotazione dell'inventario
                prenotaInventario(event.getOrdineId(), event.getProdotti());

                // Successo: pubblica evento per il Servizio Pagamenti
                kafkaTemplate.send("inventario-events", event.getOrdineId(),
                    new InventarioRiservato(event.getOrdineId(), event.getTotale()));

            } catch (InventarioInsufficienterException e) {
                // Fallimento: pubblica evento di fallimento
                // Il Servizio Ordini annullerà l'ordine
                kafkaTemplate.send("inventario-events", event.getOrdineId(),
                    new InventarioNonDisponibile(event.getOrdineId(), e.getMessage()));
            }
        }

        // Compensating: rilascia l'inventario se il pagamento fallisce
        if (record.value() instanceof PagamentoFallito event) {
            rilasciaInventario(event.getOrdineId());
        }
    }
}

// ===== STEP 3: Servizio Pagamenti - processa il pagamento =====
@Service
public class PagamentiService {

    @KafkaListener(topics = "inventario-events", groupId = "pagamenti-service")
    public void onInventarioRiservato(ConsumerRecord<String, Object> record) {
        if (record.value() instanceof InventarioRiservato event) {
            try {
                processaPagamento(event.getOrdineId(), event.getTotale());

                kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
                    new PagamentoConfermato(event.getOrdineId()));

            } catch (PagamentoRifiutatoException e) {
                kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
                    new PagamentoFallito(event.getOrdineId(), e.getMessage()));
            }
        }
    }
}

Orkestrasyon Efsanesi: Merkezi Kontrol

'deOrkestrasyon Efsanesiadı verilen merkezi bir bileşen Saga Orkestratörü (veya Süreç Yöneticisi) tüm sırayı koordine eder. Orkestracı tüm adımları biliyor, Hizmetlere Komut gönderin ve yanıt olaylarını bekleyin. Başarısızlık durumunda, orkestratör açıkça telafi komutları gönderir.

Profesyonel: merkezi ve görünür mantık, hata ayıklaması kolay, açık destan durumu, daha hassas hata işleme. Aykırı: ana bağlantı, orkestratör ilgili tüm hizmetleri bilir.

// ORCHESTRATION SAGA con Axon Framework (Java)
// Axon gestisce automaticamente la persistenza dello stato della saga

import org.axonframework.saga.*;
import org.axonframework.eventhandling.*;

@Saga
public class AcquistoSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    // Lo stato della saga persiste automaticamente tra gli step
    private String ordineId;
    private List<ProdottoOrdinato> prodotti;
    private BigDecimal totale;

    // STEP 1: Avvio della saga quando viene creato un ordine
    @StartSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineCreato event) {
        this.ordineId = event.getOrdineId();
        this.prodotti = event.getProdotti();
        this.totale = event.getTotale();

        // Invia comando al servizio inventario
        commandGateway.send(new RiservaInventarioCommand(
            ordineId,
            prodotti
        ));
    }

    // STEP 2: Inventario riservato con successo - procedi con il pagamento
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioRiservato event) {
        commandGateway.send(new ProcessaPagamentoCommand(
            ordineId,
            totale,
            "CARTA_CREDITO"
        ));
    }

    // STEP 3: Pagamento confermato - finalizza l'ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(PagamentoConfermato event) {
        commandGateway.send(new ConfermaOrdineCommand(ordineId));
    }

    // Fine saga (successo)
    @EndSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineConfermato event) {
        System.out.println("Saga completata con successo per ordine: " + ordineId);
    }

    // ===== COMPENSATING TRANSACTIONS =====

    // Pagamento fallito: rilascia inventario, poi annulla ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(PagamentoFallito event) {
        // Compensating step 1: rilascia inventario
        commandGateway.send(new RilasciaInventarioCommand(ordineId, prodotti));
    }

    // Inventario rilasciato - annulla l'ordine
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioRilasciato event) {
        // Compensating step 2: annulla l'ordine
        commandGateway.send(new AnnullaOrdineCommand(ordineId, "Pagamento fallito"));
    }

    // Inventario non disponibile - annulla direttamente
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(InventarioNonDisponibile event) {
        commandGateway.send(new AnnullaOrdineCommand(ordineId, "Inventario insufficiente"));
    }

    // Fine saga per percorso di fallimento
    @EndSaga
    @SagaEventHandler(associationProperty = "ordineId")
    public void on(OrdineAnnullato event) {
        System.out.println("Saga fallita/annullata per ordine: " + ordineId + ". Motivo: " + event.getMotivo());
    }
}

AWS Step Functions ile Saga (Sunucusuz)

AWS'deki sunucusuz mimariler için, Adım Fonksiyonları yönetilen hizmettir Orchestration Saga'yı hayata geçirmek. Her adım bir Lambda işlevi veya bir AWS eylemidir. ve Step Functions, durumu, yeniden denemeyi ve telafileri tanım gereği otomatik olarak yönetir Amazon Eyalet Dilinde (ASL).

{
  "Comment": "Saga acquisto e-commerce con compensating transactions",
  "StartAt": "CreaOrdine",
  "States": {
    "CreaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:crea-ordine",
      "Next": "RiservaInventario",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "FallimentoSaga"
        }
      ]
    },
    "RiservaInventario": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:riserva-inventario",
      "Next": "ProcessaPagamento",
      "Catch": [
        {
          "ErrorEquals": ["InventarioInsufficienterException"],
          "Next": "CompensaAnnullaOrdine"
        }
      ]
    },
    "ProcessaPagamento": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:processa-pagamento",
      "Retry": [
        {
          "ErrorEquals": ["TransientPaymentError"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Next": "ConfermaOrdine",
      "Catch": [
        {
          "ErrorEquals": ["PagamentoRifiutatoException"],
          "Next": "CompensaRilasciaInventario"
        }
      ]
    },
    "ConfermaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:conferma-ordine",
      "End": true
    },

    "CompensaRilasciaInventario": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:rilascia-inventario",
      "Next": "CompensaAnnullaOrdine"
    },
    "CompensaAnnullaOrdine": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456:function:annulla-ordine",
      "Next": "FallimentoSaga"
    },
    "FallimentoSaga": {
      "Type": "Fail",
      "Error": "SagaFailed",
      "Cause": "La transazione distribuita e fallita, compensazioni eseguite"
    }
  }
}

Tazminat İşlemlerinde İdemsizlik

Telafi edici işlemler yapılmalı güçsüz: birkaç kez çağrılırsa (ağ veya orkestratörün yeniden denemesi nedeniyle), sonuç aynı olmalıdır. En yaygın teknik bir kullanmaktır. idempotency anahtarı: orderId + işlem türü, yinelenen işlemleri önleyen benzersiz bir anahtar oluşturur.

// RilasciaInventarioHandler.java - Compensating transaction idempotente
@CommandHandler
public void handle(RilasciaInventarioCommand cmd) {
    // Controllo idempotenza: questa compensazione e gia stata eseguita?
    String idempotencyKey = cmd.getOrdineId() + ":RILASCIA_INVENTARIO";

    if (compensazioniEseguite.contains(idempotencyKey)) {
        System.out.println("Compensazione gia eseguita per " + idempotencyKey + ", skip");
        return;
    }

    // Esegui la compensazione
    for (ProdottoOrdinato prodotto : cmd.getProdotti()) {
        inventarioRepository.incrementaDisponibilita(
            prodotto.getProductId(),
            prodotto.getQuantita()
        );
    }

    // Segna la compensazione come eseguita
    compensazioniEseguite.add(idempotencyKey);

    // Pubblica evento per continuare la catena di compensazione
    eventBus.publish(new InventarioRilasciato(cmd.getOrdineId()));
}

Saga Durum Takibi: Eyaletin Görünürlüğü

Orchestration Saga'ya getirilen eleştirilerden biri de durumu takip etmenin zorluğudur. Yüksek hacimli sistemlerde. Açık bir destan izleme tablosu bu sorunu çözer:

-- Tabella di tracking per le saga in corso
CREATE TABLE saga_state (
    saga_id       VARCHAR(36) PRIMARY KEY,
    saga_type     VARCHAR(100) NOT NULL,
    ordine_id     VARCHAR(36) NOT NULL,
    current_step  VARCHAR(100) NOT NULL,
    stato         VARCHAR(50) NOT NULL,  -- IN_CORSO, COMPLETATA, FALLITA, COMPENSATA
    started_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at  TIMESTAMPTZ,
    error_message TEXT
);

-- Query utili per monitoring
-- Saga in corso da piu di 5 minuti (potenzialmente bloccate)
SELECT * FROM saga_state
WHERE stato = 'IN_CORSO'
  AND started_at < NOW() - INTERVAL '5 minutes';

-- Distribuzione per step corrente
SELECT current_step, COUNT(*) as count
FROM saga_state WHERE stato = 'IN_CORSO'
GROUP BY current_step;

Karşılaştırma: Koreografi ve Orkestrasyon

bekliyorum Koreografi Orkestrasyon
Kaplin Düşük (olaylar) Orta (orkestratör hizmetleri bilir)
Durum görünürlüğü Sert (dağıtılmış) Yüksek (merkezi)
Hata ayıklama Sert (dağıtılmış mantık) Kolay (açık durum)
Hizmetlerin karmaşıklığı Yüksek (her servis tazminatları bilir) Düşük (basit hizmetler, orkestratördeki mantık)
Ölçeklenebilirlik Yüksek (tek puan yok) Orta (Ölçeğe göre Orkestratör)
İdeal kullanım durumu Birkaç adım, otonom hizmetler, bağımsız ekipler Birçok adım, karmaşık mantık, gerekli görünürlük

Serideki Sonraki Adımlar

  • Madde 6 – AWS EventBridge: Koreografi Saga, bir mesaj veriyolunu kullanarak olay alışverişi. EventBridge, akıllı yönlendirme için ideal AWS yönetilen veriyoludur Lambda ve SQS arasındaki olayların.
  • Madde 10 – Giden Kutusu Modeli: Saga olaylarının atomik gönderimini sağlayın yerel veritabanı güncellemesiyle birlikte.

Diğer Serilerle Bağlantı

  • Olay Kaynak Kullanımı + CQRS (Madde 4): Destan'ın her adımı olaylar üretir toplu durumu yeniden inşa etmek için bir hakikat kaynağı olarak kullanılabilir. Axon Framework, Saga + Event Sourcing kombinasyonunu yerel olarak destekler.
  • Apaçi Kafka (Seri 38): Kafka ideal mesaj otobüsü Koreografi Efsanesi, dayanıklılık ve sipariş garantisiyle üretimde anahtar bölümü başına.