Giden Kutusu Modeli: CDC ile Atomik Olay Yayıncılığı
Atomik olarak iki şeyi yapması gereken bir hizmetiniz var: Durumu veritabanına kaydetme ve Kafka veya SQS'de bir etkinlik yayınlayın. Sorun şu şekilde bilinir: çift yazma - önemsiz gibi görünse de temel bir tuzağı gizliyor: doğrudan bir yol yok hem ilişkisel bir veritabanı hem de bir mesaj içeren bir işlemi yürütmek için komisyoncular. Veritabanına kaydedip Kafka'da yayınlamanız başarısız olursa durum tutarsızdır. Önce yayınlayıp ardından DB'ye kaydetme işlemi başarısız olursa, var olmayan bir durumu anlatırlar.
L'Giden Kutusu Deseni bu sorunu zarif bir şekilde çözüyor: bunun yerine doğrudan mesaj aracısına yazın, olayı içerideki bir giden kutusu tablosuna yazın durumu kaydeden aynı veritabanı işleminin. Ayrı bir süreç – röle — giden kutusu tablosundan okur ve komisyoncuya yayınlar. Atomiklik dağıtılmış bir koordinatör tarafından değil, veritabanı tarafından garanti edilir.
Ne Öğreneceksiniz
- İkili yazma sorunu ve neden saf yeniden denemeyle çözülemiyor
- Giden Kutusu Deseni: mimari ve ana bileşenler
- PostgreSQL'de giden kutusu şemasının uygulanması
- Anket Yayıncısı: Başlamak için en kolay geçiş
- Debezium CDC: Sıfır gecikmeli geçiş için Veri Yakalamayı Değiştirin
- Docker'da Kafka Connect ile Debezium'u Kurun
- Takaslar ve alternatifler: İşlem Günlüğü Kuyruklama ve Yoklama ne zaman kullanılmalı?
İkili Yazma Sorunu
Şunları yapması gereken bir sipariş hizmetini düşünelim: 1) siparişi DB'ye kaydetme, 2) yayınlama
bir olay OrderPlaced Kafka'yla ilgili. Her saf yaklaşımın bir penceresi vardır
tutarsızlık:
// Approccio 1: DB prima, poi Kafka
// Problema: se Kafka e giu, l'evento non viene mai pubblicato
async function placeOrderV1(dto: PlaceOrderDto): Promise<Order> {
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
// Se questa linea fallisce: l'ordine e nel DB ma nessuno lo sa
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
return order.rows[0];
}
// Approccio 2: Kafka prima, poi DB
// Problema: se il DB fallisce, pubblichi un evento per un ordine che non esiste
async function placeOrderV2(dto: PlaceOrderDto): Promise<Order> {
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
// Se questa linea fallisce: l'evento e pubblicato ma l'ordine non esiste nel DB
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
return order.rows[0];
}
// Non esiste un "giusto ordine": il problema e strutturale.
// I due sistemi (DB + broker) non partecipano alla stessa transazione ACID.
Giden Kutusu Deseninin Mimarisi
Desen bir tabloyu tanıtır outbox_events aynı veritabanında
hizmetin. Uygulama aynı işlemde giden kutusu tablosuna yazar
etki alanı verilerini kaydeder. Ayrı bir röle, giden kutusu tablosundan okur ve yayınlar
aracıda, olayları yayınlanmış olarak işaretler.
-- Schema outbox: nella stessa istanza PostgreSQL del servizio
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL, -- es. 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- es. ID ordine
event_type VARCHAR(100) NOT NULL, -- es. 'OrderPlaced'
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = non ancora pubblicato
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Index per il relay che legge gli eventi non pubblicati
CREATE INDEX idx_outbox_unpublished
ON outbox_events(created_at)
WHERE published_at IS NULL AND retry_count < 5;
-- Index per la pulizia dei vecchi record
CREATE INDEX idx_outbox_cleanup
ON outbox_events(published_at)
WHERE published_at IS NOT NULL;
// TypeScript: scrivi nella outbox nella stessa transazione
class OrderRepository {
constructor(private readonly db: Pool) {}
async saveWithEvents(
order: Order,
events: DomainEvent[]
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// 1. Salva lo stato del dominio
await client.query(`
INSERT INTO orders (id, customer_id, total_amount, status)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status
`, [order.id, order.customerId, order.totalAmount, order.status]);
// 2. Salva gli eventi nella tabella outbox
// STESSA TRANSAZIONE: atomico con il salvataggio dell'ordine
for (const event of events) {
await client.query(`
INSERT INTO outbox_events
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, [
event.eventId,
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
]);
}
await client.query('COMMIT');
// A questo punto: o sia l'ordine che gli eventi sono nel DB,
// o nessuno dei due. Zero possibilita di stato inconsistente.
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Domain service
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
const order = Order.create(dto); // genera anche gli eventi nel dominio
const events = order.getUncommittedEvents();
// Singola operazione atomica: ordine + eventi outbox
await this.repo.saveWithEvents(order, events);
order.clearEvents();
return order;
}
}
Anket Yayıncısı: Basit Aktarma
Il Anket Yayıncısı ve ayrı bir süreç (veya bir cron işi) yayınlanmamış olayları bulmak için giden kutusu tablosunu düzenli olarak sorgulayın ve orada komisyoncuya gönderin. Uygulaması basit, oylamada gecikmeye neden oluyor (genellikle 100ms-5s).
// Polling Publisher: relay che legge outbox e pubblica su Kafka
class OutboxPollingPublisher {
private isRunning = false;
private readonly POLL_INTERVAL_MS = 500;
private readonly BATCH_SIZE = 100;
constructor(
private readonly db: Pool,
private readonly kafka: KafkaProducer
) {}
async start(): Promise<void> {
this.isRunning = true;
console.log('OutboxPollingPublisher started');
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('OutboxPublisher error:', error);
// Continua dopo errore: non vogliamo che il publisher si fermi
}
await this.sleep(this.POLL_INTERVAL_MS);
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// SELECT FOR UPDATE SKIP LOCKED: evita conflitti con publisher multipli
const { rows } = await client.query(`
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [this.BATCH_SIZE]);
if (rows.length === 0) {
await client.query('ROLLBACK');
return;
}
// Pubblica su Kafka in batch
const messages = rows.map((row) => ({
key: row.aggregate_id,
value: JSON.stringify({
eventId: row.id,
aggregateType: row.aggregate_type,
aggregateId: row.aggregate_id,
eventType: row.event_type,
payload: row.payload,
}),
headers: {
'event-type': row.event_type,
'aggregate-type': row.aggregate_type,
},
}));
await this.kafka.sendBatch({
topicMessages: [{
topic: `${rows[0].aggregate_type.toLowerCase()}-events`,
messages,
}],
});
// Marca come pubblicati (nella stessa transazione)
const ids = rows.map((r) => r.id);
await client.query(`
UPDATE outbox_events
SET published_at = NOW()
WHERE id = ANY($1)
`, [ids]);
await client.query('COMMIT');
console.log(`Published ${rows.length} events from outbox`);
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count per gli eventi falliti
const ids = []; // recuperali dal context
await this.db.query(`
UPDATE outbox_events
SET retry_count = retry_count + 1,
last_error = $1
WHERE id = ANY($2)
`, [(error as Error).message, ids]);
throw error;
} finally {
client.release();
}
}
stop(): void {
this.isRunning = false;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Debezium CDC: Sıfır Gecikme için Veri Yakalamayı Değiştirin
Anket Yayımcısı gecikmeyi (ne sıklıkla anket yapılmalı?) ve veritabanına yüklemeyi başlatır (sürekli sorgular). Debezyum — Kafka Connect ekosisteminin bir parçası — okur doğrudan işlem günlüğü veritabanının (PostgreSQL'de WAL) ve yayınlanmasının Kafka'da gerçek zamanlı olarak değişiklikler yapılır ve gecikme süresi genellikle 100 ms'nin altındadır.
# docker-compose.yml: setup Debezium con PostgreSQL e Kafka
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: orders_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
# Abilita il WAL logico necessario per Debezium
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
kafka-connect:
image: debezium/connect:2.6
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
# Registra il Debezium PostgreSQL Connector via REST API
# POST http://localhost:8083/connectors
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders_db",
"database.server.name": "orders",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}.events"
}
}
# Questo crea automaticamente i topic:
# outbox.Order.events
# outbox.Payment.events
# outbox.Inventory.events
# basandosi sul campo aggregate_type nella tabella outbox
Giden Kutusu Tablosunun Temizlenmesi
Giden kutusu tablosu zamanla büyür. Bir temizleme stratejisi uygulayın Mevcut operasyonları etkilemeden boyutu kontrol altında tutun.
// Job di cleanup: esegui ogni ora con pg_cron o un cron esterno
-- Cancella eventi pubblicati piu vecchi di 7 giorni
-- Usa DELETE con LIMIT per evitare lock estesi
DO $
DECLARE
deleted_count INTEGER;
BEGIN
LOOP
DELETE FROM outbox_events
WHERE id IN (
SELECT id FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days'
LIMIT 1000
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count < 1000;
-- Pausa tra batch per non sovraccaricare il DB
PERFORM pg_sleep(0.1);
END LOOP;
END $;
Anket Yayıncısı vs CDC: Hangisi Ne Zaman Kullanılmalı
- Anket Yayıncıları: basit kurulum, ek bileşen yok, 100 ms-5s gecikme. Yeni başlayanlar ve düşük ila orta hacimli sistemler için idealdir.
- Debezium CDC'si: 100 ms'nin altındaki gecikme süresi, veritabanında sıfır yoklama ek yükü, Kafka Connect ve WAL yapılandırması gerektirir. Yüksek adetli üretim için idealdir.
- Hafif alternatif: Kafka olmadan gerçek zamanlı bildirimler isteyen yalnızca PostgreSQL sistemleri için pg_notify + LISTEN.
Sonuçlar ve Sonraki Adımlar
Giden Kutusu Modeli, olaya dayalı mimarinin en sinsi sorunlarından birini çözer: olayların atomik yayını. Anahtar, ACID işlemlerinden yararlanmaktır Dağıtılmış bir sistem yerine koordinatör olarak veritabanı. Debezium CDC ile geçiş, minimum gecikme süresiyle ve uygulama sorguları üzerinde sıfır etkiyle neredeyse şeffaf hale gelir.







