Efsane Deseni: Koreografi ve Orkestrasyonla Dağıtılmış İşlemler
Mikro hizmetlerde böyle bir şey yoktur BEGIN TRANSACTION ... COMMIT bu hizmet sınırlarını aşar.
Bir operasyon Sipariş Hizmetini, Envanter Hizmetini ve Ödeme Hizmetini içerdiğinde,
Üçünün de başarılı olmasını veya hiçbirinin etkili olmamasını nasıl sağlayacağız? Destan Deseni cevap şu: her hizmet kendi yerel işlemini gerçekleştirir
ve başarısızlık durumunda, telafi edici işlemler daha önce yapılmış olanı geri almak için.
2PC Neden Mikro Hizmetlerde Çalışmıyor?
Il İki Aşamalı Taahhüt (2PC) dağıtılmış işlemler için klasik protokoldür: Koordinatör tüm katılımcılardan hazırlık yapmalarını ister (hazırlık aşaması), ardından taahhüdü sipariş edin. Bu, homojen veritabanları için iyi çalışır (örneğin, iki PostgreSQL örneği), ancak mikro hizmetlerde sorunludur:
- Kaplin: İşlem sırasında tüm hizmetlerin mevcut olması gerekir
- Performans: kaynaklar taahhüt edilene kadar kilitli kalır (dağıtılmış kilitler)
- Sınırlı destek: Birçok bulut hizmeti, REST API'si ve NoSQL veri tabanı 2PC'yi desteklemez
- Ölçeklenebilirlik: koordinatör tek bir başarısızlık noktası haline gelir
Saga Modeli, büyük bir dağıtılmış işlemi tek bir işlemle değiştirir yerel işlemlerin sırası, her biri bağımsız olarak tamamlanır, etkinlikler veya bir orkestratör aracılığıyla koordine edilir.
Örnek: E-ticaret Satın Alma Efsanesi
Bir satın alma için destan adımları:
- Sipariş Oluştur (Sipariş Servisi) → OrdineCreato
- Rezerv Envanteri (Envanter Hizmeti) → Ayrılmış Envanter
- Ödeme İşlemi (Ödeme Hizmeti) → ÖdemeOnaylandı
- Siparişi Onayla (Sipariş Hizmeti) → SiparişOnaylandı
Ödeme başarısız olursa telafi edici işlemler şunlardır:
- Envanter Sürümü (Envanter Hizmeti)
- Siparişi İptal Et (Sipariş Hizmeti)
Koreografi Efsanesi: Hizmet Özerkliği
içinde Koreografi Efsanesi, merkezi bir orkestratör yoktur. Her hizmet, bir önceki hizmetin ürettiği olayları dinler ve buna göre tepki verir, karşılığında yeni etkinlikler üretiyoruz. Koordinasyon mantığı dağıtılır hizmetler arasında: her biri yalnızca kendi adımını ve tepki verdiği olayları bilir.
Profesyonel: maksimum ayrıştırma, tek bir arıza noktası yok, uygulaması basit. Aykırı: hata ayıklaması zor (destan mantığı dağıtılmış), görünürlük yok Destanın durumunun merkezinde, kötü yönetilen olaylar döngüsü riski var.
// CHOREOGRAPHY SAGA con Spring Kafka
// ===== STEP 1: Servizio Ordini - crea l'ordine =====
@Service
public class OrdiniService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public String creaOrdine(CreaOrdineCommand cmd) {
String ordineId = UUID.randomUUID().toString();
// Salva l'ordine in stato PENDING
Ordine ordine = Ordine.builder()
.id(ordineId)
.clienteId(cmd.getClienteId())
.prodotti(cmd.getProdotti())
.stato(StatoOrdine.PENDING)
.build();
ordineRepository.save(ordine);
// Pubblica evento: il Servizio Inventario lo ascolterà
OrdineCreato event = new OrdineCreato(ordineId, cmd.getProdotti());
kafkaTemplate.send("ordini-events", ordineId, event);
return ordineId;
}
// Ascolta la conferma del pagamento per finalizzare l'ordine
@KafkaListener(topics = "pagamenti-events", groupId = "ordini-service")
public void onPagamento(ConsumerRecord<String, Object> record) {
if (record.value() instanceof PagamentoConfermato event) {
ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.CONFERMATO);
kafkaTemplate.send("ordini-events", event.getOrdineId(),
new OrdineConfermato(event.getOrdineId()));
}
if (record.value() instanceof PagamentoFallito event) {
// Compensating: annulla l'ordine
ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.ANNULLATO);
}
}
}
// ===== STEP 2: Servizio Inventario - riserva i prodotti =====
@Service
public class InventarioService {
@KafkaListener(topics = "ordini-events", groupId = "inventario-service")
public void onOrdinCreato(ConsumerRecord<String, Object> record) {
if (record.value() instanceof OrdineCreato event) {
try {
// Tenta la prenotazione dell'inventario
prenotaInventario(event.getOrdineId(), event.getProdotti());
// Successo: pubblica evento per il Servizio Pagamenti
kafkaTemplate.send("inventario-events", event.getOrdineId(),
new InventarioRiservato(event.getOrdineId(), event.getTotale()));
} catch (InventarioInsufficienterException e) {
// Fallimento: pubblica evento di fallimento
// Il Servizio Ordini annullerà l'ordine
kafkaTemplate.send("inventario-events", event.getOrdineId(),
new InventarioNonDisponibile(event.getOrdineId(), e.getMessage()));
}
}
// Compensating: rilascia l'inventario se il pagamento fallisce
if (record.value() instanceof PagamentoFallito event) {
rilasciaInventario(event.getOrdineId());
}
}
}
// ===== STEP 3: Servizio Pagamenti - processa il pagamento =====
@Service
public class PagamentiService {
@KafkaListener(topics = "inventario-events", groupId = "pagamenti-service")
public void onInventarioRiservato(ConsumerRecord<String, Object> record) {
if (record.value() instanceof InventarioRiservato event) {
try {
processaPagamento(event.getOrdineId(), event.getTotale());
kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
new PagamentoConfermato(event.getOrdineId()));
} catch (PagamentoRifiutatoException e) {
kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
new PagamentoFallito(event.getOrdineId(), e.getMessage()));
}
}
}
}
Orkestrasyon Efsanesi: Merkezi Kontrol
'deOrkestrasyon Efsanesiadı verilen merkezi bir bileşen Saga Orkestratörü (veya Süreç Yöneticisi) tüm sırayı koordine eder. Orkestracı tüm adımları biliyor, Hizmetlere Komut gönderin ve yanıt olaylarını bekleyin. Başarısızlık durumunda, orkestratör açıkça telafi komutları gönderir.
Profesyonel: merkezi ve görünür mantık, hata ayıklaması kolay, açık destan durumu, daha hassas hata işleme. Aykırı: ana bağlantı, orkestratör ilgili tüm hizmetleri bilir.
// ORCHESTRATION SAGA con Axon Framework (Java)
// Axon gestisce automaticamente la persistenza dello stato della saga
import org.axonframework.saga.*;
import org.axonframework.eventhandling.*;
@Saga
public class AcquistoSaga {
@Autowired
private transient CommandGateway commandGateway;
// Lo stato della saga persiste automaticamente tra gli step
private String ordineId;
private List<ProdottoOrdinato> prodotti;
private BigDecimal totale;
// STEP 1: Avvio della saga quando viene creato un ordine
@StartSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineCreato event) {
this.ordineId = event.getOrdineId();
this.prodotti = event.getProdotti();
this.totale = event.getTotale();
// Invia comando al servizio inventario
commandGateway.send(new RiservaInventarioCommand(
ordineId,
prodotti
));
}
// STEP 2: Inventario riservato con successo - procedi con il pagamento
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioRiservato event) {
commandGateway.send(new ProcessaPagamentoCommand(
ordineId,
totale,
"CARTA_CREDITO"
));
}
// STEP 3: Pagamento confermato - finalizza l'ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(PagamentoConfermato event) {
commandGateway.send(new ConfermaOrdineCommand(ordineId));
}
// Fine saga (successo)
@EndSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineConfermato event) {
System.out.println("Saga completata con successo per ordine: " + ordineId);
}
// ===== COMPENSATING TRANSACTIONS =====
// Pagamento fallito: rilascia inventario, poi annulla ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(PagamentoFallito event) {
// Compensating step 1: rilascia inventario
commandGateway.send(new RilasciaInventarioCommand(ordineId, prodotti));
}
// Inventario rilasciato - annulla l'ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioRilasciato event) {
// Compensating step 2: annulla l'ordine
commandGateway.send(new AnnullaOrdineCommand(ordineId, "Pagamento fallito"));
}
// Inventario non disponibile - annulla direttamente
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioNonDisponibile event) {
commandGateway.send(new AnnullaOrdineCommand(ordineId, "Inventario insufficiente"));
}
// Fine saga per percorso di fallimento
@EndSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineAnnullato event) {
System.out.println("Saga fallita/annullata per ordine: " + ordineId + ". Motivo: " + event.getMotivo());
}
}
AWS Step Functions ile Saga (Sunucusuz)
AWS'deki sunucusuz mimariler için, Adım Fonksiyonları yönetilen hizmettir Orchestration Saga'yı hayata geçirmek. Her adım bir Lambda işlevi veya bir AWS eylemidir. ve Step Functions, durumu, yeniden denemeyi ve telafileri tanım gereği otomatik olarak yönetir Amazon Eyalet Dilinde (ASL).
{
"Comment": "Saga acquisto e-commerce con compensating transactions",
"StartAt": "CreaOrdine",
"States": {
"CreaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:crea-ordine",
"Next": "RiservaInventario",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "FallimentoSaga"
}
]
},
"RiservaInventario": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:riserva-inventario",
"Next": "ProcessaPagamento",
"Catch": [
{
"ErrorEquals": ["InventarioInsufficienterException"],
"Next": "CompensaAnnullaOrdine"
}
]
},
"ProcessaPagamento": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:processa-pagamento",
"Retry": [
{
"ErrorEquals": ["TransientPaymentError"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Next": "ConfermaOrdine",
"Catch": [
{
"ErrorEquals": ["PagamentoRifiutatoException"],
"Next": "CompensaRilasciaInventario"
}
]
},
"ConfermaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:conferma-ordine",
"End": true
},
"CompensaRilasciaInventario": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:rilascia-inventario",
"Next": "CompensaAnnullaOrdine"
},
"CompensaAnnullaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:annulla-ordine",
"Next": "FallimentoSaga"
},
"FallimentoSaga": {
"Type": "Fail",
"Error": "SagaFailed",
"Cause": "La transazione distribuita e fallita, compensazioni eseguite"
}
}
}
Tazminat İşlemlerinde İdemsizlik
Telafi edici işlemler yapılmalı güçsüz: birkaç kez çağrılırsa (ağ veya orkestratörün yeniden denemesi nedeniyle), sonuç aynı olmalıdır. En yaygın teknik bir kullanmaktır. idempotency anahtarı: orderId + işlem türü, yinelenen işlemleri önleyen benzersiz bir anahtar oluşturur.
// RilasciaInventarioHandler.java - Compensating transaction idempotente
@CommandHandler
public void handle(RilasciaInventarioCommand cmd) {
// Controllo idempotenza: questa compensazione e gia stata eseguita?
String idempotencyKey = cmd.getOrdineId() + ":RILASCIA_INVENTARIO";
if (compensazioniEseguite.contains(idempotencyKey)) {
System.out.println("Compensazione gia eseguita per " + idempotencyKey + ", skip");
return;
}
// Esegui la compensazione
for (ProdottoOrdinato prodotto : cmd.getProdotti()) {
inventarioRepository.incrementaDisponibilita(
prodotto.getProductId(),
prodotto.getQuantita()
);
}
// Segna la compensazione come eseguita
compensazioniEseguite.add(idempotencyKey);
// Pubblica evento per continuare la catena di compensazione
eventBus.publish(new InventarioRilasciato(cmd.getOrdineId()));
}
Saga Durum Takibi: Eyaletin Görünürlüğü
Orchestration Saga'ya getirilen eleştirilerden biri de durumu takip etmenin zorluğudur. Yüksek hacimli sistemlerde. Açık bir destan izleme tablosu bu sorunu çözer:
-- Tabella di tracking per le saga in corso
CREATE TABLE saga_state (
saga_id VARCHAR(36) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
ordine_id VARCHAR(36) NOT NULL,
current_step VARCHAR(100) NOT NULL,
stato VARCHAR(50) NOT NULL, -- IN_CORSO, COMPLETATA, FALLITA, COMPENSATA
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
error_message TEXT
);
-- Query utili per monitoring
-- Saga in corso da piu di 5 minuti (potenzialmente bloccate)
SELECT * FROM saga_state
WHERE stato = 'IN_CORSO'
AND started_at < NOW() - INTERVAL '5 minutes';
-- Distribuzione per step corrente
SELECT current_step, COUNT(*) as count
FROM saga_state WHERE stato = 'IN_CORSO'
GROUP BY current_step;
Karşılaştırma: Koreografi ve Orkestrasyon
| bekliyorum | Koreografi | Orkestrasyon |
|---|---|---|
| Kaplin | Düşük (olaylar) | Orta (orkestratör hizmetleri bilir) |
| Durum görünürlüğü | Sert (dağıtılmış) | Yüksek (merkezi) |
| Hata ayıklama | Sert (dağıtılmış mantık) | Kolay (açık durum) |
| Hizmetlerin karmaşıklığı | Yüksek (her servis tazminatları bilir) | Düşük (basit hizmetler, orkestratördeki mantık) |
| Ölçeklenebilirlik | Yüksek (tek puan yok) | Orta (Ölçeğe göre Orkestratör) |
| İdeal kullanım durumu | Birkaç adım, otonom hizmetler, bağımsız ekipler | Birçok adım, karmaşık mantık, gerekli görünürlük |
Serideki Sonraki Adımlar
- Madde 6 – AWS EventBridge: Koreografi Saga, bir mesaj veriyolunu kullanarak olay alışverişi. EventBridge, akıllı yönlendirme için ideal AWS yönetilen veriyoludur Lambda ve SQS arasındaki olayların.
- Madde 10 – Giden Kutusu Modeli: Saga olaylarının atomik gönderimini sağlayın yerel veritabanı güncellemesiyle birlikte.
Diğer Serilerle Bağlantı
- Olay Kaynak Kullanımı + CQRS (Madde 4): Destan'ın her adımı olaylar üretir toplu durumu yeniden inşa etmek için bir hakikat kaynağı olarak kullanılabilir. Axon Framework, Saga + Event Sourcing kombinasyonunu yerel olarak destekler.
- Apaçi Kafka (Seri 38): Kafka ideal mesaj otobüsü Koreografi Efsanesi, dayanıklılık ve sipariş garantisiyle üretimde anahtar bölümü başına.







