Redis'te İki Mesajlaşma Paradigması

Redis tamamen farklı iki mesajlaşma mekanizması sunar. Pub/Sub gerçek zamanlı bir yayın sistemidir: mesajlar herkese iletilir bağlı aboneler o anda ve sonra unutuldu. Bir abone bunu yapmazsa bağlandığında mesaj sonsuza kadar kaybolur.

Redis Akışları (Redis 5.0'da tanıtılmıştır) salt eklemeli bir günlüktür Apache Kafka akış konseptinden ilham alan kalıcı. Mesajlar korunuyor açıkça silinmediği sürece tekrar okunabilir kez ve teşekkürlerle dağıtılmış işleme için Tüketici Gruplarını destekleyin.

Ne Öğreneceksiniz

  • Pub/Sub: Küresel desenle ABONE OLUN, YAYINLAYIN, PSABONE OLUN
  • Pub/Sub ve Streams ne zaman kullanılmalı: temel farklar
  • Akışlar: Yalnızca ekleme günlükleri için XADD, XREAD, XRANGE
  • Tüketici Grupları: XGROUP CREATE, XREADGROUP, XACK
  • Bekleyen mesajların en az bir kez teslim edilmesi ve yönetimi
  • Desen: Streams ile olay kaynağı bulma ve denetim günlüğü

Pub/Sub: Gerçek Zamanlı Yayın

Pub/Sub, bazı bilgilerin kaybolduğu gerçek zamanlı bildirimler için idealdir. mesaj kabul edilebilir: canlı fiyat güncellemeleri, anlık bildirimler, sohbet odası, dağıtılmış önbellek geçersiz kılma.

# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"

# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3

# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:*    # Tutti gli utenti
PSUBSCRIBE events:order:*          # Tutti gli ordini

# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1  -- numero di subscriber che hanno ricevuto il messaggio

PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1

# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0  -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# --- SUBSCRIBER ---
def message_handler(message):
    if message['type'] == 'message':
        channel = message['channel']
        data = json.loads(message['data'])
        print(f"[{channel}] Received: {data}")

def start_subscriber(user_id: int):
    """Subscriber in background thread."""
    sub = r.pubsub()
    sub.subscribe(**{
        f'notifications:user:{user_id}': message_handler,
        'notifications:broadcast': message_handler,
    })
    # Pattern subscribe
    sub.psubscribe(**{'events:*': message_handler})

    # Blocca in ascolto (in un thread separato)
    thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
    return thread, sub

# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
    """Pubblica una notifica a un utente specifico."""
    payload = json.dumps({'type': notification_type, **data})
    subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
    print(f"Delivered to {subscribers_count} subscribers")
    return subscribers_count

# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)

# Pubblica messaggi
import time
time.sleep(0.1)  # Aspetta che subscriber sia pronto

publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})

# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()

Redis Akışları: Kalıcı ve Yalnızca Ekleme Günlüğü

Streams tamamen farklı bir model kullanıyor: her mesajın benzersiz bir kimliği var (zaman damgası dizisi), günlükte kalıcıdır ve yeniden okunabilir herhangi bir zamanda. Olay kaynağı bulma, denetim günlükleri, görev kuyrukları ve mesaj kaybının kabul edilemeyeceği herhangi bir senaryo.

# Redis Streams: XADD e XREAD

# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0"  <-- ID generato automaticamente

XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"

# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97

# XLEN: numero di messaggi nello stream
XLEN orders:stream    # 3

# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
#    2) 1) "user_id"
#       2) "1001"
#       3) "product_id"
#       4) "555"
#       5) "quantity"
#       6) "2"
#       7) "total"
#       8) "89.99"
# ...

# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10

# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5

# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0   # Da ID 0, leggi al massimo 10

# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi

Tüketici Grupları: Bildirimli Dağıtılmış İşleme

Tüketici Grupları Redis Streams'in en güçlü özelliğidir. İzin veriyorlar birden fazla tüketicinin aynı akıştan gelen mesajları dağıtılmış bir şekilde işlemesi, Her mesajın gruptaki tam olarak bir tüketici tarafından işleneceği garantisiyle, ve onaylanmamış mesajların otomatik olarak yeniden teslimi ile.

# Consumer Groups: setup e utilizzo

# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"

# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0

# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
#    2) 1) 1) "1710000001234-0"
#          2) 1) "user_id"
#             2) "1001"
#             ...

# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1  -- 1 messaggio acknowledged

# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count

# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi

# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0

# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000    # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000  # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000  # 30 secondi prima di ri-assegnare

class StreamConsumer:
    def __init__(self, consumer_name: str, handler: Callable):
        self.consumer_name = consumer_name
        self.handler = handler
        self._ensure_group()

    def _ensure_group(self):
        """Crea il consumer group se non esiste."""
        try:
            r.xgroup_create(STREAM_KEY, GROUP_NAME, id=', mkstream=True)
            print(f"Created consumer group: {GROUP_NAME}")
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise
            # Gruppo gia' esiste: ok

    def process_messages(self, batch_size: int = 10) -> int:
        """Processa un batch di messaggi. Ritorna il numero processato."""
        processed = 0

        # Prima: controlla e riassegna messaggi pending di altri consumer (crashati)
        self._reclaim_pending()

        # Leggi nuovi messaggi non assegnati
        results = r.xreadgroup(
            groupname=GROUP_NAME,
            consumername=self.consumer_name,
            streams={STREAM_KEY: '>'},
            count=batch_size,
            block=1000,  # Blocca 1 secondo se non ci sono messaggi
        )

        if not results:
            return 0

        for stream_name, messages in results:
            for message_id, fields in messages:
                try:
                    self.handler(message_id, fields)
                    # Acknowledge dopo processing riuscito
                    r.xack(STREAM_KEY, GROUP_NAME, message_id)
                    processed += 1
                except Exception as e:
                    print(f"Error processing {message_id}: {e}")
                    # Non acknowledge: il messaggio rimane pending
                    # Verra riassegnato dopo PENDING_TIMEOUT_MS

        return processed

    def _reclaim_pending(self):
        """Riassegna a se' stesso messaggi idle di altri consumer."""
        pending = r.xpending_range(
            STREAM_KEY, GROUP_NAME,
            min='-', max='+', count=10,
        )
        for p in pending:
            if p['time_since_delivered'] > PENDING_TIMEOUT_MS:
                if p['name'] != self.consumer_name:
                    # Riassegna il messaggio a questo consumer
                    r.xclaim(
                        STREAM_KEY, GROUP_NAME, self.consumer_name,
                        PENDING_TIMEOUT_MS, [p['message_id']],
                    )
                    print(f"Reclaimed {p['message_id']} from {p['name']}")

# --- Producer ---
class OrderEventProducer:
    def publish_order(self, order: dict) -> str:
        """Pubblica un evento ordine nello stream."""
        message_id = r.xadd(STREAM_KEY, order)
        return message_id

# --- Utilizzo ---
def handle_order(message_id: str, fields: dict):
    print(f"Processing order {message_id}: {fields}")
    # Logica di business (es. email conferma, aggiornamento inventario)
    time.sleep(0.01)  # Simula lavoro

producer = OrderEventProducer()
consumer = StreamConsumer('worker-1', handle_order)

# Pubblica 5 ordini
for i in range(5):
    msg_id = producer.publish_order({
        'user_id': str(1000 + i),
        'total': str(50.0 + i * 10),
        'status': 'pending',
    })
    print(f"Published: {msg_id}")

# Processa messaggi in loop
while True:
    count = consumer.process_messages(batch_size=5)
    if count == 0:
        break  # Nessun messaggio rimasto

Karşılaştırma: Pub/Sub ve Streams Ne Zaman Kullanılmalı?

Pub/Sub: Ne Zamanı Seçin

  • Bazı kaçırılan mesajların kabul edilebilir olduğu gerçek zamanlı bildirimler
  • Dağıtılmış önbellek geçersiz kılma (bir düğüm almazsa sonraki istek kendini geçersiz kılar)
  • Sohbet veya canlı varlık (çevrimdışıysanız ne olduğu umrunda değil)
  • Gerçek zamanlı finansal fiyat/kod güncellemeleri
  • Ultra düşük gecikme süresine ihtiyacınız var ve kaybı tolere edebilirsiniz

Akışlar: Ne Zamanı Seçin

  • Her mesaj en az bir kez işlenmelidir (siparişler, ödemeler)
  • Tekrar oynatmanız gerekiyor (hata ayıklamak veya yeniden oluşturmak için geçmiş olayları yeniden okumak)
  • Birden fazla çalışanın paralel olarak dağıtılmış işlemesi (tüketici grupları)
  • Denetim günlüğü ve olay kaynağı (olay geçmişi kaybolmamalıdır)
  • Çevrimdışı olabilen ve kaybolan mesajları kurtarması gereken tüketiciler
# Riepilogo comandi chiave

# PUB/SUB
SUBSCRIBE channel             # Iscriviti a un canale
PSUBSCRIBE pattern:*          # Iscriviti con pattern glob
PUBLISH channel message       # Pubblica (fire-and-forget)
UNSUBSCRIBE channel           # Disiscrivi

# STREAMS - Producer
XADD stream * field value     # Aggiunge messaggio con ID auto
XLEN stream                   # Lunghezza stream
XTRIM stream MAXLEN ~ 10000   # Tronca stream

# STREAMS - Consumer semplice
XREAD COUNT 10 STREAMS stream 0         # Leggi da inizio
XREAD BLOCK 5000 STREAMS stream $       # Blocca aspettando nuovi

# STREAMS - Consumer Groups
XGROUP CREATE stream group $ MKSTREAM    # Crea gruppo
XREADGROUP GROUP group consumer STREAMS stream >  # Leggi non assegnati
XACK stream group message-id             # Acknowledge
XPENDING stream group - + 10            # Lista pending
XCLAIM stream group consumer 30000 id   # Riassegna dopo timeout

Sonuçlar

Pub/Sub ve Streams arasındaki seçim tek bir soruya dayanıyor: Bunu yapmaya gücünüz yetiyor mu? mesajları özledin mi? Bu durumda Pub/Sub gecikme süresi sunar mümkün olduğu kadar düşük. Aksi takdirde Streams with Consumer Groups garantiler sunar Redis'in operasyonel basitliği ile sağlam teslimat. Sonraki makale karmaşık atomik işlemler için Lua Komut Dosyasını araştırıyor.

Redis Serisinde Gelecek Makaleler

  • Madde 3: Lua Scripting — Atomik İşlemler, EVAL ve Redis İşlevleri API'si
  • Madde 4: Hız Sınırlama - Jeton Kovası, Kayar Pencere, Sabit Sayaç
  • Madde 5: Oturum Yönetimi ve Önbellek Modelleri - Önbellek Kenarına Yazma