Tüketicilerde Kimlik Eksikliği: Tekilleştirme ve Kimlik Eksikliği Anahtarı
Olay odaklı sisteminizi SQS, Kafka veya EventBridge ile kurdunuz. Mesajlar akıyorlar, tüketiciler olayları işliyor, geliştirme aşamasında her şey mükemmel çalışıyor. Daha sonra üretime gidiyorsunuz ve bazı sipariş onay e-postalarının iki kez gönderildiğini görüyorsunuz. kez veya daha kötüsü, belirli ödemelerin iki kez tahsil edilmesi. Sorun değil kodunuzdaki bir hata: bu, modern dağıtılmış sistemlerin temel bir özelliğidir.
Tüm önemli mesaj aracıları (SQS, Kafka, RabbitMQ, EventBridge) garanti verir teslimat en az bir kez: Bir mesaj en az bir kez iletilir ancak birden çok kez teslim edilebilir. Bu, otomatik yeniden denemelerde olur. Tüketici gruplarının yeniden dengelenmesi, görünürlük zaman aşımı, tüketici çöküşü işleme. Çözüm komisyoncuda değil, tüketicide. Tüketici olmalı güçsüz.
Ne Öğreneceksiniz
- Çünkü en az bir kerelik sistemler kaçınılmaz olarak yinelenen mesajlar üretir
- Idempotency Key: veri tekilleştirmenin temel modeli
- INSERT ON CONFLICT ile veritabanı düzeyinde yetersizlik
- Yüksek performans için TTL ile Redis tabanlı veri tekilleştirme
- Gelen Kutusu Modeli: tam olarak bir kez anlambilim için yapılandırılmış çözüm
- Doğal idempotency: doğal olarak idempotent operasyonların nasıl tasarlanacağı
- Tüketici bağımsızlığını doğrulamak için stratejileri test etme
Yinelenen Mesajlar Neden Kaçınılmazdır?
Tüketici bağımsızlığının neden gerekli olduğunu anlamak için ne zaman olduğunu anlamamız gerekir. bir mesajın birden fazla kez iletilmesi. Üretimdeki ana durumlar:
Durum 1: Tüketicinin İşleme Sonrası, ACK Öncesi Çökmesi
Tüketici mesajı başarıyla işler (DB'ye yazar, harici API'yi çağırır) ancak ACK'yi aracıya göndermeden önce çöküyor. Komisyoncu mesajın dikkate alınmadığını düşünüyor teslim edilir ve görünürlük zaman aşımından sonra geri gönderilir. Yeni bir tüketici (veya aynı yeniden başlatmanın ardından) mesajı alır ve yeniden işler.
Durum 2: İşleme Zaman Aşımı
SQS'in bir özelliği var görünürlük zaman aşımı (varsayılan 30 saniye). Tüketici çalışıyorsa SQS, zaman aşımını uzatmadan mesajı işlemek için 30 saniyeden daha uzun bir süreye ihtiyaç duyar. mesajın diğer tüketiciler tarafından görülebilmesi. Mesaj iki kez işlenir iki farklı tüketici tarafından paralel olarak.
Durum 3: Kafka Tüketici Grubunun Yeniden Dengelenmesi
Kafka tüketici grubunun yeniden dengelenmesi sırasında (tüketici eklemek/çıkarmak için, yuvarlanan dağıtımı), bazı bölümler yeniden atanır. Gelen tüketici ise kaldırılan ofset henüz tamamlanmadı, bu gruptaki mesajlar geliyor bölüme atanan yeni tüketici tarafından yeniden işlenir.
// Simulazione: perche i duplicati sono inevitabili
// Questo codice mostra IL PROBLEMA, non la soluzione
async function processPayment(message: SQSMessage): Promise<void> {
const { paymentId, amount, customerId } = JSON.parse(message.Body);
// Step 1: chiama l'API di pagamento esterna
await paymentGateway.charge(customerId, amount);
// ^^^ SUCCESSO: il pagamento e stato addebitato
// -- CRASH QUI --
// Il processo muore per OOM, segfault, deploy, ecc.
// Il pagamento e gia stato addebitato MA non abbiamo
// ancora eliminato il messaggio dalla coda SQS.
// SQS considera il messaggio non processato e lo
// rimanda dopo il visibility timeout.
await sqs.deleteMessage({
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle,
});
// ^^^ Mai eseguito se crashiamo sopra
}
// Risultato: il cliente viene addebitato due volte.
// Nessun bug nel codice. E' la natura del sistema at-least-once.
Desen Idempotency Anahtarı
En yaygın çözüm bir kullanmaktır. idempotency anahtarı: bir tanımlayıcı Tüketicinin daha önce işlem yapıp yapmadığını tespit etmesine olanak tanıyan her işlem için benzersiz bu mesaj. Tüketici işlemden önce veritabanını kontrol eder: eğer anahtar zaten var, sessizliği atla; yoksa anahtarı işleyin ve kaydedin.
// Pattern base: Idempotency Key con PostgreSQL
// Schema tabella per il tracking
CREATE TABLE processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
consumer_name VARCHAR(100) NOT NULL,
-- TTL gestito da un job di cleanup o da una policy di partizione
expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + INTERVAL '7 days'
);
CREATE INDEX idx_processed_messages_expires
ON processed_messages(expires_at);
-- Job di cleanup: esegui ogni ora
DELETE FROM processed_messages WHERE expires_at < NOW();
// TypeScript: consumer idempotente con PostgreSQL
interface ProcessedMessageRecord {
messageId: string;
processedAt: Date;
consumerName: string;
}
class IdempotentConsumer {
constructor(
private readonly db: Pool,
private readonly consumerName: string
) {}
async processMessage<T>(
messageId: string,
payload: T,
handler: (payload: T) => Promise<void>
): Promise<{ processed: boolean; skipped: boolean }> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Tenta di inserire il messageId (fail se gia esiste)
const result = await client.query<ProcessedMessageRecord>(`
INSERT INTO processed_messages (message_id, consumer_name, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '7 days')
ON CONFLICT (message_id) DO NOTHING
RETURNING message_id
`, [messageId, this.consumerName]);
if (result.rowCount === 0) {
// Gia processato: skip idempotente
await client.query('ROLLBACK');
console.log(`[${this.consumerName}] Skipping duplicate: ${messageId}`);
return { processed: false, skipped: true };
}
// Prima volta: esegui l'handler nella stessa transazione
await handler(payload);
await client.query('COMMIT');
return { processed: true, skipped: false };
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
}
// Utilizzo nel consumer SQS
const consumer = new IdempotentConsumer(db, 'payment-processor');
async function handlePaymentEvent(message: SQSMessage): Promise<void> {
const payload = JSON.parse(message.Body);
const messageId = message.MessageId; // ID univoco SQS
const { processed, skipped } = await consumer.processMessage(
messageId,
payload,
async (data) => {
await paymentGateway.charge(data.customerId, data.amount);
await db.query(
'UPDATE orders SET payment_status = $1 WHERE id = $2',
['paid', data.orderId]
);
}
);
if (skipped) {
// Log ma non errore: comportamento atteso
metrics.increment('consumer.duplicate_skipped');
}
}
Redis Tabanlı Veri Tekilleştirme: Yüksek Performans
PostgreSQL kontrolü, her mesaj için bir veritabanı sorgusu sunar. için yüksek verimli sistemler (saniyede binlerce mesaj), bu durum bir darboğaz. TTL ile Redis ve çözüm: O(1) işlemi, milisaniyenin altında gecikme, Otomatik son kullanma tarihi için yerel TTL.
// Redis-based deduplication per alto throughput
import { Redis } from 'ioredis';
class RedisIdempotencyStore {
constructor(
private readonly redis: Redis,
private readonly ttlSeconds: number = 86400 // 24 ore default
) {}
// Ritorna true se e la PRIMA VOLTA che vediamo questa key
// Ritorna false se e un duplicato
async setIfAbsent(key: string): Promise<boolean> {
// SET key value NX EX ttl
// NX = solo se non esiste
// EX = TTL in secondi
const result = await this.redis.set(
`dedup:${key}`,
'1',
'EX',
this.ttlSeconds,
'NX'
);
return result === 'OK'; // 'OK' = primo inserimento, null = gia esisteva
}
async isProcessed(key: string): Promise<boolean> {
const exists = await this.redis.exists(`dedup:${key}`);
return exists === 1;
}
// Per operazioni atomiche: check + set in Lua script
async checkAndSet(key: string): Promise<boolean> {
const luaScript = `
local exists = redis.call('EXISTS', KEYS[1])
if exists == 0 then
redis.call('SET', KEYS[1], '1', 'EX', ARGV[1])
return 1
end
return 0
`;
const result = await this.redis.eval(
luaScript,
1,
`dedup:${key}`,
this.ttlSeconds.toString()
);
return result === 1;
}
}
// Consumer con Redis deduplication
class HighThroughputConsumer {
constructor(
private readonly dedup: RedisIdempotencyStore,
private readonly db: Pool
) {}
async handleKafkaMessage(
topic: string,
partition: number,
offset: string,
payload: OrderPayload
): Promise<void> {
// Componi una key univoca: topic + partition + offset
const messageKey = `${topic}-${partition}-${offset}`;
const isFirst = await this.dedup.setIfAbsent(messageKey);
if (!isFirst) {
// Duplicato: skip
return;
}
// Prima elaborazione: procedi
await this.processOrder(payload);
}
private async processOrder(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
}
// ATTENZIONE: Redis ha durabilita limitata.
// Se Redis perde dati (AOF/RDB non aggiornati), i duplicati
// potrebbero passare dopo un crash Redis.
// Per operazioni critiche (pagamenti), usa sempre PostgreSQL.
Gelen Kutusu Modeli: Tam Olarak Bir Kez Anlambilimi
Desen Gelen kutusu ve bağımsızlığın en sağlam versiyonu: mesaj ilk önce gelen kutusu tablosuna yazılır (bir DB işlemi içinde), sonra denedim. İşleme sırasında çökme durumunda bile tam olarak bir kez garanti edilir.
-- Schema Inbox Pattern
CREATE TABLE inbox_messages (
id UUID PRIMARY KEY,
source VARCHAR(100) NOT NULL, -- nome del producer/queue
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ, -- NULL = non ancora processato
error TEXT, -- NULL = successo o non processato
retry_count INTEGER NOT NULL DEFAULT 0
);
-- Index per il worker che processa i messaggi pendenti
CREATE INDEX idx_inbox_pending
ON inbox_messages(received_at)
WHERE processed_at IS NULL AND retry_count < 3;
// TypeScript: Inbox Pattern completo
class InboxProcessor {
constructor(private readonly db: Pool) {}
// Fase 1: scrivi nella inbox (idempotente grazie al PK)
async receiveMessage(message: IncomingMessage): Promise<void> {
await this.db.query(`
INSERT INTO inbox_messages (id, source, event_type, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO NOTHING
`, [
message.id,
message.source,
message.eventType,
JSON.stringify(message.payload)
]);
// Se il messaggio arriva due volte, ON CONFLICT DO NOTHING
// lo scarta silenziosamente
}
// Fase 2: worker che processa i messaggi dalla inbox
async processPendingMessages(): Promise<void> {
// Prendi un messaggio con SELECT FOR UPDATE SKIP LOCKED
// Evita che piu worker prendano lo stesso messaggio
const { rows } = await this.db.query(`
SELECT id, event_type, payload
FROM inbox_messages
WHERE processed_at IS NULL
AND retry_count < 3
ORDER BY received_at
LIMIT 1
FOR UPDATE SKIP LOCKED
`);
if (rows.length === 0) return;
const message = rows[0];
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Esegui l'handler specifico per il tipo di evento
await this.dispatch(message.event_type, message.payload);
// Marca come processato nella stessa transazione
await client.query(`
UPDATE inbox_messages
SET processed_at = NOW(), error = NULL
WHERE id = $1
`, [message.id]);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
// Incrementa retry count e salva l'errore
await this.db.query(`
UPDATE inbox_messages
SET retry_count = retry_count + 1,
error = $1
WHERE id = $2
`, [(error as Error).message, message.id]);
} finally {
client.release();
}
}
private async dispatch(
eventType: string,
payload: unknown
): Promise<void> {
switch (eventType) {
case 'OrderPlaced':
await this.handleOrderPlaced(payload as OrderPayload);
break;
case 'PaymentReceived':
await this.handlePaymentReceived(payload as PaymentPayload);
break;
default:
throw new Error(`Unknown event type: ${eventType}`);
}
}
private async handleOrderPlaced(payload: OrderPayload): Promise<void> {
await this.db.query(
'UPDATE inventory SET reserved = reserved + $1 WHERE product_id = $2',
[payload.quantity, payload.productId]
);
}
private async handlePaymentReceived(payload: PaymentPayload): Promise<void> {
await this.db.query(
'UPDATE orders SET status = $1 WHERE id = $2',
['confirmed', payload.orderId]
);
}
}
Doğal Bağımsızlık: Doğa için Bağımsız Operasyonlar Tasarlamak
Bağımsızlığa karşı en şık çözüm, operasyonları öyle tasarlamaktır ki doğal olarak idempotenttir: bunları birden çok kez yürütmek aynı sonucu verir tek infazdan. Bu, açık izleme ihtiyacını ortadan kaldırır.
// Operazioni naturalmente idempotenti vs non idempotenti
// NON IDEMPOTENTE: aggiornamento relativo
// Se eseguita due volte, l'inventory diventa -2 invece di -1
async function decrementInventory(productId: string, qty: number): Promise<void> {
await db.query(
'UPDATE inventory SET quantity = quantity - $1 WHERE product_id = $2',
[qty, productId]
);
}
// IDEMPOTENTE: aggiornamento assoluto con versioning
// Usa il numero dell'ordine come "target state"
async function setInventoryForOrder(
productId: string,
orderId: string,
newQuantity: number
): Promise<void> {
await db.query(`
INSERT INTO inventory_reservations (order_id, product_id, quantity)
VALUES ($1, $2, $3)
ON CONFLICT (order_id, product_id)
DO UPDATE SET quantity = EXCLUDED.quantity
`, [orderId, productId, newQuantity]);
}
// NON IDEMPOTENTE: INSERT senza conflict handling
async function createPaymentRecord(payment: Payment): Promise<void> {
await db.query(
'INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)',
[payment.id, payment.orderId, payment.amount]
);
// Fallisce con unique constraint la seconda volta
}
// IDEMPOTENTE: UPSERT con ON CONFLICT DO NOTHING
async function upsertPaymentRecord(payment: Payment): Promise<void> {
await db.query(`
INSERT INTO payments (id, order_id, amount, status)
VALUES ($1, $2, $3, 'pending')
ON CONFLICT (id) DO NOTHING
`, [payment.id, payment.orderId, payment.amount]);
}
// IDEMPOTENTE: update a stato finale (state machine idempotente)
// Transitare da 'confirmed' a 'confirmed' non cambia nulla
async function markOrderAsShipped(orderId: string): Promise<void> {
await db.query(`
UPDATE orders
SET status = 'shipped', shipped_at = COALESCE(shipped_at, NOW())
WHERE id = $1
AND status IN ('confirmed', 'processing')
`, [orderId]);
// Se lo stato e gia 'shipped', la WHERE non matcha: no-op
}
SQS Düzeyinde Tekilleştirme
SQS FIFO Kuyruğu, aracılığıyla yerel tekilleştirme sunar Mesaj Tekilleştirme Kimliği. Tekilleştirme aralığında gönderilen aynı tekilleştirme kimliğine sahip iletiler (5 dakika) yalnızca bir kez teslim edilir. İdempotans ihtiyacını ortadan kaldırmaz tüketici tarafı, ancak kopyaları önemli ölçüde azaltır.
// AWS SDK v3: invio su SQS FIFO con MessageDeduplicationId
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
const sqs = new SQSClient({ region: 'eu-west-1' });
async function publishOrderEvent(
orderId: string,
eventType: string,
payload: unknown
): Promise<void> {
// MessageDeduplicationId: hash del contenuto o ID evento univoco
// Stesso ID = stesso messaggio entro 5 minuti = consegnato una sola volta
const deduplicationId = `${eventType}-${orderId}-${Date.now()}`;
await sqs.send(new SendMessageCommand({
QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456/orders.fifo',
MessageBody: JSON.stringify(payload),
MessageGroupId: orderId, // Ordine FIFO per stesso ordine
MessageDeduplicationId: deduplicationId,
MessageAttributes: {
EventType: {
DataType: 'String',
StringValue: eventType,
},
},
}));
}
// SQS Standard Queue: nessuna deduplication nativa
// Puoi usare l'Attribute MessageId come idempotency key nel consumer
async function handleSqsStandardMessage(msg: SQSMessage): Promise<void> {
// msg.MessageId e univoco per invio, ma se SQS rimanda il messaggio
// il MessageId rimane lo stesso. Usalo come idempotency key.
const idempotencyKey = msg.MessageId;
await consumer.processMessage(idempotencyKey, JSON.parse(msg.Body), handler);
}
İdempotans Testi
İdempotent bir tüketici açıkça test edilmelidir: test etmek yeterli değildir normal durum. Aynı mesaj geldiğinde ne olacağını test etmeniz gerekiyor birkaç paralel tüketiciyle iki, on kez.
// Test suite per consumer idempotente
describe('IdempotentPaymentConsumer', () => {
let consumer: IdempotentConsumer;
let db: Pool;
beforeEach(async () => {
db = await createTestDatabase();
consumer = new IdempotentConsumer(db, 'payment-test');
await db.query('DELETE FROM processed_messages');
});
it('should process message exactly once on first delivery', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(true);
expect(result.skipped).toBe(false);
expect(callCount).toBe(1);
});
it('should skip duplicate message silently', async () => {
const messageId = 'msg-001';
const payload = { orderId: 'ord-001', amount: 100 };
let callCount = 0;
const handler = async () => { callCount++; };
// Prima consegna
await consumer.processMessage(messageId, payload, handler);
// Seconda consegna (duplicato)
const result = await consumer.processMessage(messageId, payload, handler);
expect(result.processed).toBe(false);
expect(result.skipped).toBe(true);
expect(callCount).toBe(1); // Handler chiamato solo una volta
});
it('should handle concurrent duplicate messages correctly', async () => {
const messageId = 'msg-concurrent';
const payload = { orderId: 'ord-002', amount: 200 };
let callCount = 0;
const handler = async () => {
callCount++;
// Simula elaborazione lenta per forzare concorrenza
await new Promise((resolve) => setTimeout(resolve, 100));
};
// Simula 5 consumer che ricevono lo stesso messaggio in parallelo
const results = await Promise.allSettled([
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
consumer.processMessage(messageId, payload, handler),
]);
const processed = results.filter(
(r) => r.status === 'fulfilled' && r.value.processed
).length;
// Solo uno deve essere processato, gli altri skippati
expect(processed).toBe(1);
expect(callCount).toBe(1);
});
});
Anti-Desen: Yalnızca Bellekte Idempotence
İşlenen mesajları izlemek için bellek içi Kümeyi veya Haritayı kullanmayın. İşlem yeniden başlatılırsa hafıza ve tüm mesajlar kaybolur daha önce işlenmiş olan tekrar işlenecektir. Kimliksizlik deposu kalıcı olmalıdır (PostgreSQL, AOF ile Redis, DynamoDB).
Sonuçlar ve Sonraki Adımlar
Tüketici bağımsızlığı bir sistemde isteğe bağlı bir optimizasyon değildir olay odaklı üretim sınıfı: temel bir gerekliliktir. Arasındaki seçim PostgreSQL tabanlı veri tekilleştirme, Redis ve Gelen Kutusu Modeli seviyeye bağlıdır gerekli dayanıklılık ve sistem verimi. Kritik operasyonlar için (ödemeler, geri alınamaz durum güncellemeleri), Gelen Kutusu Modeli şunları sunar: tam olarak bir kerelik anlambilimin maksimum garantisi.







