Twee berichtparadigma's in Redis

Redis biedt twee radicaal verschillende berichtmechanismen. Pub/Sub het is een realtime uitzendsysteem: berichten worden bij iedereen afgeleverd aangesloten abonnees op dat moment en daarna vergeten. Als een abonnee dat niet doet verbonden is, gaat het bericht voor altijd verloren.

Redis-streams (geïntroduceerd in Redis 5.0) is een logboek dat alleen kan worden toegevoegd persistent geïnspireerd door het Apache Kafka-stroomconcept. Berichten blijven bewaard zolang ze niet expliciet worden verwijderd, kunnen ze opnieuw worden gelezen keer en ondersteun consumentengroepen voor gedistribueerde verwerking met bevestigingen.

Wat je gaat leren

  • Pub/Sub: ABONNEREN, PUBLICEREN, PSUBSCRIBE met globbing-patroon
  • Wanneer Pub/Sub versus Streams gebruiken: de belangrijkste verschillen
  • Streams: XADD, XREAD, XRANGE voor alleen-toevoegen-logboeken
  • Consumentengroepen: XGROUP CREATE, XREADGROUP, XACK
  • Minimaal één keer bezorgen en beheren van openstaande berichten
  • Patroon: gebeurtenissourcing en auditlogboek met Streams

Pub/Sub: realtime uitzending

Pub/Sub is ideaal voor real-time meldingen waarbij er enkele verloren gaan bericht is acceptabel: live prijsupdates, pushmeldingen, chatroom, gedistribueerde cache-invalidatie.

# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"

# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3

# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:*    # Tutti gli utenti
PSUBSCRIBE events:order:*          # Tutti gli ordini

# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1  -- numero di subscriber che hanno ricevuto il messaggio

PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1

# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0  -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# --- SUBSCRIBER ---
def message_handler(message):
    if message['type'] == 'message':
        channel = message['channel']
        data = json.loads(message['data'])
        print(f"[{channel}] Received: {data}")

def start_subscriber(user_id: int):
    """Subscriber in background thread."""
    sub = r.pubsub()
    sub.subscribe(**{
        f'notifications:user:{user_id}': message_handler,
        'notifications:broadcast': message_handler,
    })
    # Pattern subscribe
    sub.psubscribe(**{'events:*': message_handler})

    # Blocca in ascolto (in un thread separato)
    thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
    return thread, sub

# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
    """Pubblica una notifica a un utente specifico."""
    payload = json.dumps({'type': notification_type, **data})
    subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
    print(f"Delivered to {subscribers_count} subscribers")
    return subscribers_count

# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)

# Pubblica messaggi
import time
time.sleep(0.1)  # Aspetta che subscriber sia pronto

publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})

# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()

Redis-streams: persistent en alleen toevoegend logboek

Streams hanteert een heel ander model: elk bericht heeft een uniek ID (tijdstempel-reeks), wordt bewaard in het logboek en kan opnieuw worden gelezen op elk moment. Het is de juiste keuze voor gebeurtenissourcing, auditlogboeken, taakwachtrijen en elk scenario waarin berichtverlies niet acceptabel is.

# Redis Streams: XADD e XREAD

# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0"  <-- ID generato automaticamente

XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"

# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97

# XLEN: numero di messaggi nello stream
XLEN orders:stream    # 3

# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
#    2) 1) "user_id"
#       2) "1001"
#       3) "product_id"
#       4) "555"
#       5) "quantity"
#       6) "2"
#       7) "total"
#       8) "89.99"
# ...

# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10

# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5

# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0   # Da ID 0, leggi al massimo 10

# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi

Consumentengroepen: gedistribueerde verwerking met bevestiging

Consumentengroepen zijn de krachtigste functie van Redis Streams. Zij staan toe meerdere consumenten om berichten uit dezelfde stroom gedistribueerd te verwerken, met de garantie dat elk bericht door precies één consument in de groep wordt verwerkt, en met automatische herbezorging van onbevestigde berichten.

# Consumer Groups: setup e utilizzo

# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"

# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0

# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
#    2) 1) 1) "1710000001234-0"
#          2) 1) "user_id"
#             2) "1001"
#             ...

# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1  -- 1 messaggio acknowledged

# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count

# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi

# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0

# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000    # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000  # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000  # 30 secondi prima di ri-assegnare

class StreamConsumer:
    def __init__(self, consumer_name: str, handler: Callable):
        self.consumer_name = consumer_name
        self.handler = handler
        self._ensure_group()

    def _ensure_group(self):
        """Crea il consumer group se non esiste."""
        try:
            r.xgroup_create(STREAM_KEY, GROUP_NAME, id=', mkstream=True)
            print(f"Created consumer group: {GROUP_NAME}")
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise
            # Gruppo gia' esiste: ok

    def process_messages(self, batch_size: int = 10) -> int:
        """Processa un batch di messaggi. Ritorna il numero processato."""
        processed = 0

        # Prima: controlla e riassegna messaggi pending di altri consumer (crashati)
        self._reclaim_pending()

        # Leggi nuovi messaggi non assegnati
        results = r.xreadgroup(
            groupname=GROUP_NAME,
            consumername=self.consumer_name,
            streams={STREAM_KEY: '>'},
            count=batch_size,
            block=1000,  # Blocca 1 secondo se non ci sono messaggi
        )

        if not results:
            return 0

        for stream_name, messages in results:
            for message_id, fields in messages:
                try:
                    self.handler(message_id, fields)
                    # Acknowledge dopo processing riuscito
                    r.xack(STREAM_KEY, GROUP_NAME, message_id)
                    processed += 1
                except Exception as e:
                    print(f"Error processing {message_id}: {e}")
                    # Non acknowledge: il messaggio rimane pending
                    # Verra riassegnato dopo PENDING_TIMEOUT_MS

        return processed

    def _reclaim_pending(self):
        """Riassegna a se' stesso messaggi idle di altri consumer."""
        pending = r.xpending_range(
            STREAM_KEY, GROUP_NAME,
            min='-', max='+', count=10,
        )
        for p in pending:
            if p['time_since_delivered'] > PENDING_TIMEOUT_MS:
                if p['name'] != self.consumer_name:
                    # Riassegna il messaggio a questo consumer
                    r.xclaim(
                        STREAM_KEY, GROUP_NAME, self.consumer_name,
                        PENDING_TIMEOUT_MS, [p['message_id']],
                    )
                    print(f"Reclaimed {p['message_id']} from {p['name']}")

# --- Producer ---
class OrderEventProducer:
    def publish_order(self, order: dict) -> str:
        """Pubblica un evento ordine nello stream."""
        message_id = r.xadd(STREAM_KEY, order)
        return message_id

# --- Utilizzo ---
def handle_order(message_id: str, fields: dict):
    print(f"Processing order {message_id}: {fields}")
    # Logica di business (es. email conferma, aggiornamento inventario)
    time.sleep(0.01)  # Simula lavoro

producer = OrderEventProducer()
consumer = StreamConsumer('worker-1', handle_order)

# Pubblica 5 ordini
for i in range(5):
    msg_id = producer.publish_order({
        'user_id': str(1000 + i),
        'total': str(50.0 + i * 10),
        'status': 'pending',
    })
    print(f"Published: {msg_id}")

# Processa messaggi in loop
while True:
    count = consumer.process_messages(batch_size=5)
    if count == 0:
        break  # Nessun messaggio rimasto

Vergelijking: wanneer moet u Pub/Sub versus streams gebruiken?

Pub/Sub: Kies Wanneer

  • Realtime meldingen waarbij sommige gemiste berichten acceptabel zijn
  • Gedistribueerde cache-invalidatie (als een knooppunt niet ontvangt, maakt het volgende verzoek zichzelf ongeldig)
  • Chat of live aanwezigheid (als je offline bent, maakt het je niet uit wat er is gebeurd)
  • Realtime financiële prijs-/tickerupdates
  • U hebt een ultralage latentie nodig en kunt verlies verdragen

Streams: kies Wanneer

  • Elk bericht moet minstens één keer verwerkt worden (bestellingen, betalingen)
  • Je hebt herhaling nodig (herlezen van gebeurtenissen uit het verleden voor foutopsporing of opnieuw opbouwen)
  • Gedistribueerde verwerking met meerdere werknemers parallel (consumentengroepen)
  • Auditlogboek en gebeurtenisbronnen (gebeurtenisgeschiedenis mag niet verloren gaan)
  • Consumenten die offline kunnen gaan en verloren berichten moeten herstellen
# Riepilogo comandi chiave

# PUB/SUB
SUBSCRIBE channel             # Iscriviti a un canale
PSUBSCRIBE pattern:*          # Iscriviti con pattern glob
PUBLISH channel message       # Pubblica (fire-and-forget)
UNSUBSCRIBE channel           # Disiscrivi

# STREAMS - Producer
XADD stream * field value     # Aggiunge messaggio con ID auto
XLEN stream                   # Lunghezza stream
XTRIM stream MAXLEN ~ 10000   # Tronca stream

# STREAMS - Consumer semplice
XREAD COUNT 10 STREAMS stream 0         # Leggi da inizio
XREAD BLOCK 5000 STREAMS stream $       # Blocca aspettando nuovi

# STREAMS - Consumer Groups
XGROUP CREATE stream group $ MKSTREAM    # Crea gruppo
XREADGROUP GROUP group consumer STREAMS stream >  # Leggi non assegnati
XACK stream group message-id             # Acknowledge
XPENDING stream group - + 10            # Lista pending
XCLAIM stream group consumer 30000 id   # Riassegna dopo timeout

Conclusies

De keuze tussen Pub/Sub en Streams komt neer op één vraag: kun je het je veroorloven berichten missen? Als dat zo is, biedt Pub/Sub latentie zo laag mogelijk. Zo niet, dan biedt Streams with Consumer Groups garanties van robuuste levering met de operationele eenvoud van Redis. Volgende artikel onderzoekt Lua Scripting voor complexe atomaire operaties.

Aankomende artikelen in de Redis-serie

  • Artikel 3: Lua-scripting - Atomic Operations, EVAL en Redis Functions API
  • Artikel 4: Tariefbeperking: tokenbucket, schuifvenster, vaste teller
  • Artikel 5: Sessiebeheer en cachepatronen: cache opzij, doorschrijven