Model de expediere: Atomic Event Publishing cu CDC
Aveți un serviciu care trebuie să facă două lucruri atomic: Salvați starea în baza de date și publică un eveniment către Kafka sau SQS. Problema - cunoscută ca scriere duală — pare banal, dar ascunde o capcană fundamentală: nu există cale directă pentru a executa o tranzacție care implică atât o bază de date relațională, cât și un mesaj brokeri. Dacă salvați în DB și apoi publicați în Kafka eșuează, starea este inconsecventă. Dacă publicați mai întâi și apoi salvați în DB eșuează, publicați evenimente care ele descriu o stare care nu există.
L'Model pentru căsuța de ieșire rezolvă această problemă elegant: în loc de scrieți direct brokerului de mesaje, scrieți evenimentul într-un tabel din interior a aceleiași tranzacții de bază de date care salvează statul. Un proces separat - cel releu — citește din tabelul de expediere și publică către broker. Atomicitatea este garantat de baza de date, nu de un coordonator distribuit.
Ce vei învăța
- Problema de scriere duală și de ce nu poate fi rezolvată cu reîncercare naivă
- Outbox Pattern: arhitectură și componente principale
- Implementarea schemei de ieșire în PostgreSQL
- Polling Publisher: cel mai simplu releu de început
- Debezium CDC: Modificați captura de date pentru releu cu latență zero
- Configurați Debezium cu Kafka Connect pe Docker
- Compensații și alternative: când să utilizați Tranzacția Log Tailing vs Polling
Problema de scriere duală
Să luăm în considerare un serviciu de comandă care trebuie: 1) să salveze comanda în DB, 2) să o publice
un eveniment OrderPlaced despre Kafka. Orice abordare naivă are o fereastră
de inconsecvență:
// Approccio 1: DB prima, poi Kafka
// Problema: se Kafka e giu, l'evento non viene mai pubblicato
async function placeOrderV1(dto: PlaceOrderDto): Promise<Order> {
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
// Se questa linea fallisce: l'ordine e nel DB ma nessuno lo sa
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
return order.rows[0];
}
// Approccio 2: Kafka prima, poi DB
// Problema: se il DB fallisce, pubblichi un evento per un ordine che non esiste
async function placeOrderV2(dto: PlaceOrderDto): Promise<Order> {
await kafka.produce('order-events', { type: 'OrderPlaced', orderId: dto.id });
// Se questa linea fallisce: l'evento e pubblicato ma l'ordine non esiste nel DB
const order = await db.query(
'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3) RETURNING *',
[dto.id, dto.customerId, dto.total]
);
return order.rows[0];
}
// Non esiste un "giusto ordine": il problema e strutturale.
// I due sistemi (DB + broker) non partecipano alla stessa transazione ACID.
Arhitectura modelului căsuței de ieșire
Modelul introduce un tabel outbox_events în aceeași bază de date
a serviciului. Aplicația scrie în tabelul de expediere în aceeași tranzacție
care salvează datele domeniului. Un releu separat citește din tabelul de expediere și publică
pe broker, apoi marchează evenimentele ca fiind publicate.
-- Schema outbox: nella stessa istanza PostgreSQL del servizio
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL, -- es. 'Order'
aggregate_id VARCHAR(255) NOT NULL, -- es. ID ordine
event_type VARCHAR(100) NOT NULL, -- es. 'OrderPlaced'
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = non ancora pubblicato
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Index per il relay che legge gli eventi non pubblicati
CREATE INDEX idx_outbox_unpublished
ON outbox_events(created_at)
WHERE published_at IS NULL AND retry_count < 5;
-- Index per la pulizia dei vecchi record
CREATE INDEX idx_outbox_cleanup
ON outbox_events(published_at)
WHERE published_at IS NOT NULL;
// TypeScript: scrivi nella outbox nella stessa transazione
class OrderRepository {
constructor(private readonly db: Pool) {}
async saveWithEvents(
order: Order,
events: DomainEvent[]
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// 1. Salva lo stato del dominio
await client.query(`
INSERT INTO orders (id, customer_id, total_amount, status)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status
`, [order.id, order.customerId, order.totalAmount, order.status]);
// 2. Salva gli eventi nella tabella outbox
// STESSA TRANSAZIONE: atomico con il salvataggio dell'ordine
for (const event of events) {
await client.query(`
INSERT INTO outbox_events
(id, aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, [
event.eventId,
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
]);
}
await client.query('COMMIT');
// A questo punto: o sia l'ordine che gli eventi sono nel DB,
// o nessuno dei due. Zero possibilita di stato inconsistente.
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Domain service
class OrderService {
constructor(
private readonly repo: OrderRepository,
private readonly eventBus: EventBus
) {}
async placeOrder(dto: PlaceOrderDto): Promise<Order> {
const order = Order.create(dto); // genera anche gli eventi nel dominio
const events = order.getUncommittedEvents();
// Singola operazione atomica: ordine + eventi outbox
await this.repo.saveWithEvents(order, events);
order.clearEvents();
return order;
}
}
Editura de sondaje: The Simple Relay
Il Editor de sondaje și un proces separat (sau un job cron) care interogați periodic tabelul de expediere pentru a găsi evenimente nepublicate și acolo trimite la broker. Simplu de implementat, introduce latența de sondare (de obicei 100 ms-5s).
// Polling Publisher: relay che legge outbox e pubblica su Kafka
class OutboxPollingPublisher {
private isRunning = false;
private readonly POLL_INTERVAL_MS = 500;
private readonly BATCH_SIZE = 100;
constructor(
private readonly db: Pool,
private readonly kafka: KafkaProducer
) {}
async start(): Promise<void> {
this.isRunning = true;
console.log('OutboxPollingPublisher started');
while (this.isRunning) {
try {
await this.publishPendingEvents();
} catch (error) {
console.error('OutboxPublisher error:', error);
// Continua dopo errore: non vogliamo che il publisher si fermi
}
await this.sleep(this.POLL_INTERVAL_MS);
}
}
private async publishPendingEvents(): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// SELECT FOR UPDATE SKIP LOCKED: evita conflitti con publisher multipli
const { rows } = await client.query(`
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [this.BATCH_SIZE]);
if (rows.length === 0) {
await client.query('ROLLBACK');
return;
}
// Pubblica su Kafka in batch
const messages = rows.map((row) => ({
key: row.aggregate_id,
value: JSON.stringify({
eventId: row.id,
aggregateType: row.aggregate_type,
aggregateId: row.aggregate_id,
eventType: row.event_type,
payload: row.payload,
}),
headers: {
'event-type': row.event_type,
'aggregate-type': row.aggregate_type,
},
}));
await this.kafka.sendBatch({
topicMessages: [{
topic: `${rows[0].aggregate_type.toLowerCase()}-events`,
messages,
}],
});
// Marca come pubblicati (nella stessa transazione)
const ids = rows.map((r) => r.id);
await client.query(`
UPDATE outbox_events
SET published_at = NOW()
WHERE id = ANY($1)
`, [ids]);
await client.query('COMMIT');
console.log(`Published ${rows.length} events from outbox`);
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count per gli eventi falliti
const ids = []; // recuperali dal context
await this.db.query(`
UPDATE outbox_events
SET retry_count = retry_count + 1,
last_error = $1
WHERE id = ANY($2)
`, [(error as Error).message, ids]);
throw error;
} finally {
client.release();
}
}
stop(): void {
this.isRunning = false;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Debezium CDC: Modificați captura de date pentru zero-latență
Polling Publisher introduce latența (cât de des să sondaj?) și încărcarea în baza de date (interogări continue). Debezium — parte a ecosistemului Kafka Connect — citește direct jurnalul de tranzacții a bazei de date (WAL în PostgreSQL) și publicați modificări pe Kafka în timp real, cu o latență de obicei sub 100 ms.
# docker-compose.yml: setup Debezium con PostgreSQL e Kafka
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: orders_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
# Abilita il WAL logico necessario per Debezium
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=4"
- "-c"
- "max_wal_senders=4"
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
kafka-connect:
image: debezium/connect:2.6
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
# Registra il Debezium PostgreSQL Connector via REST API
# POST http://localhost:8083/connectors
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders_db",
"database.server.name": "orders",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "outbox.${routedByValue}.events"
}
}
# Questo crea automaticamente i topic:
# outbox.Order.events
# outbox.Payment.events
# outbox.Inventory.events
# basandosi sul campo aggregate_type nella tabella outbox
Curățarea tabelului Outbox
Tabelul de expediere crește în timp. Implementați o strategie de curățare pentru menține dimensiunea sub control fără a afecta operațiunile curente.
// Job di cleanup: esegui ogni ora con pg_cron o un cron esterno
-- Cancella eventi pubblicati piu vecchi di 7 giorni
-- Usa DELETE con LIMIT per evitare lock estesi
DO $
DECLARE
deleted_count INTEGER;
BEGIN
LOOP
DELETE FROM outbox_events
WHERE id IN (
SELECT id FROM outbox_events
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days'
LIMIT 1000
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count < 1000;
-- Pausa tra batch per non sovraccaricare il DB
PERFORM pg_sleep(0.1);
END LOOP;
END $;
Polling Publisher vs CDC: Când să folosiți care
- Editorii de sondaje: Configurare simplă, fără componente suplimentare, latență de 100 ms-5s. Ideal pentru începători și sisteme de volum mic până la mediu.
- Debezium CDC: latență sub 100 ms, overhead de sondare zero pe DB, necesită configurarea Kafka Connect și WAL. Ideal pentru producție de volum mare.
- Alternativă ușoară: pg_notify + LISTEN pentru sistemele numai PostgreSQL care doresc notificări în timp real fără Kafka.
Concluzii și pașii următori
Modelul Outbox rezolvă una dintre cele mai insidioase probleme ale arhitecturii bazate pe evenimente: publicarea atomică a evenimentelor. Cheia este să folosiți tranzacțiile cu ACID baza de date ca coordonator în locul unui sistem distribuit. Cu Debezium CDC, releul devine aproape transparent cu latență minimă și impact zero asupra interogărilor aplicației.







