Două paradigme de mesagerie în Redis

Redis oferă două mecanisme de mesagerie radical diferite. Pub/Sub este un sistem de difuzare în timp real: mesajele sunt livrate tuturor abonații conectați în acel moment si apoi uitat. Dacă un abonat nu o face este conectat, mesajul se pierde pentru totdeauna.

Redis Streams (introdus în Redis 5.0) este un jurnal numai pentru adăugare persistent inspirat de conceptul de flux Apache Kafka. Mesajele sunt păstrate atâta timp cât nu sunt șterse explicit, pot fi citite din nou ori și sprijină grupurile de consumatori pentru procesarea distribuită cu mulțumiri.

Ce vei învăța

  • Pub/Sub: SUBSCRIBE, PUBLISH, PSUBSCRIBE cu model globbing
  • Când să folosiți Pub/Sub vs Streams: diferențele cheie
  • Fluxuri: XADD, XREAD, XRANGE pentru jurnalele de atașare
  • Grupuri de consumatori: XGROUP CREATE, XREADGROUP, XACK
  • Livrarea și gestionarea cel puțin o dată a mesajelor în așteptare
  • Model: aprovizionare cu evenimente și jurnal de audit cu Streams

Pub/Sub: Transmisie în timp real

Pub/Sub este ideal pentru notificări în timp real în cazul în care pierderea unora mesajul este acceptabil: actualizări live de preț, notificări push, camera de chat, invalidarea cache-ului distribuit.

# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"

# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3

# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:*    # Tutti gli utenti
PSUBSCRIBE events:order:*          # Tutti gli ordini

# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1  -- numero di subscriber che hanno ricevuto il messaggio

PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1

# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0  -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# --- SUBSCRIBER ---
def message_handler(message):
    if message['type'] == 'message':
        channel = message['channel']
        data = json.loads(message['data'])
        print(f"[{channel}] Received: {data}")

def start_subscriber(user_id: int):
    """Subscriber in background thread."""
    sub = r.pubsub()
    sub.subscribe(**{
        f'notifications:user:{user_id}': message_handler,
        'notifications:broadcast': message_handler,
    })
    # Pattern subscribe
    sub.psubscribe(**{'events:*': message_handler})

    # Blocca in ascolto (in un thread separato)
    thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
    return thread, sub

# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
    """Pubblica una notifica a un utente specifico."""
    payload = json.dumps({'type': notification_type, **data})
    subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
    print(f"Delivered to {subscribers_count} subscribers")
    return subscribers_count

# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)

# Pubblica messaggi
import time
time.sleep(0.1)  # Aspetta che subscriber sia pronto

publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})

# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()

Fluxuri Redis: jurnal persistent și numai pentru adăugare

Streams folosește un model complet diferit: fiecare mesaj are un ID unic (timestamp-secvență), este persistat în jurnal și poate fi recitit în orice moment. Este alegerea potrivită pentru aprovizionarea cu evenimente, jurnalele de audit, cozi de sarcini și orice scenariu în care pierderea mesajelor nu este acceptabilă.

# Redis Streams: XADD e XREAD

# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0"  <-- ID generato automaticamente

XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"

# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97

# XLEN: numero di messaggi nello stream
XLEN orders:stream    # 3

# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
#    2) 1) "user_id"
#       2) "1001"
#       3) "product_id"
#       4) "555"
#       5) "quantity"
#       6) "2"
#       7) "total"
#       8) "89.99"
# ...

# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10

# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5

# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0   # Da ID 0, leggi al massimo 10

# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi

Grupuri de consumatori: procesare distribuită cu confirmare

Grupurile de consumatori sunt cea mai puternică caracteristică a Redis Streams. Ei permit mai mulți consumatori pentru a procesa mesaje din același flux într-o manieră distribuită, cu garanția că fiecare mesaj este procesat de exact un consumator din grup, și cu re-livrarea automată a mesajelor neconfirmate.

# Consumer Groups: setup e utilizzo

# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"

# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0

# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
#    2) 1) 1) "1710000001234-0"
#          2) 1) "user_id"
#             2) "1001"
#             ...

# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1  -- 1 messaggio acknowledged

# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count

# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi

# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0

# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000    # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000  # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000  # 30 secondi prima di ri-assegnare

class StreamConsumer:
    def __init__(self, consumer_name: str, handler: Callable):
        self.consumer_name = consumer_name
        self.handler = handler
        self._ensure_group()

    def _ensure_group(self):
        """Crea il consumer group se non esiste."""
        try:
            r.xgroup_create(STREAM_KEY, GROUP_NAME, id=', mkstream=True)
            print(f"Created consumer group: {GROUP_NAME}")
        except redis.exceptions.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise
            # Gruppo gia' esiste: ok

    def process_messages(self, batch_size: int = 10) -> int:
        """Processa un batch di messaggi. Ritorna il numero processato."""
        processed = 0

        # Prima: controlla e riassegna messaggi pending di altri consumer (crashati)
        self._reclaim_pending()

        # Leggi nuovi messaggi non assegnati
        results = r.xreadgroup(
            groupname=GROUP_NAME,
            consumername=self.consumer_name,
            streams={STREAM_KEY: '>'},
            count=batch_size,
            block=1000,  # Blocca 1 secondo se non ci sono messaggi
        )

        if not results:
            return 0

        for stream_name, messages in results:
            for message_id, fields in messages:
                try:
                    self.handler(message_id, fields)
                    # Acknowledge dopo processing riuscito
                    r.xack(STREAM_KEY, GROUP_NAME, message_id)
                    processed += 1
                except Exception as e:
                    print(f"Error processing {message_id}: {e}")
                    # Non acknowledge: il messaggio rimane pending
                    # Verra riassegnato dopo PENDING_TIMEOUT_MS

        return processed

    def _reclaim_pending(self):
        """Riassegna a se' stesso messaggi idle di altri consumer."""
        pending = r.xpending_range(
            STREAM_KEY, GROUP_NAME,
            min='-', max='+', count=10,
        )
        for p in pending:
            if p['time_since_delivered'] > PENDING_TIMEOUT_MS:
                if p['name'] != self.consumer_name:
                    # Riassegna il messaggio a questo consumer
                    r.xclaim(
                        STREAM_KEY, GROUP_NAME, self.consumer_name,
                        PENDING_TIMEOUT_MS, [p['message_id']],
                    )
                    print(f"Reclaimed {p['message_id']} from {p['name']}")

# --- Producer ---
class OrderEventProducer:
    def publish_order(self, order: dict) -> str:
        """Pubblica un evento ordine nello stream."""
        message_id = r.xadd(STREAM_KEY, order)
        return message_id

# --- Utilizzo ---
def handle_order(message_id: str, fields: dict):
    print(f"Processing order {message_id}: {fields}")
    # Logica di business (es. email conferma, aggiornamento inventario)
    time.sleep(0.01)  # Simula lavoro

producer = OrderEventProducer()
consumer = StreamConsumer('worker-1', handle_order)

# Pubblica 5 ordini
for i in range(5):
    msg_id = producer.publish_order({
        'user_id': str(1000 + i),
        'total': str(50.0 + i * 10),
        'status': 'pending',
    })
    print(f"Published: {msg_id}")

# Processa messaggi in loop
while True:
    count = consumer.process_messages(batch_size=5)
    if count == 0:
        break  # Nessun messaggio rimasto

Comparație: când să folosiți Pub/Sub vs Streams

Pub/Sub: Alegeți Când

  • Notificări în timp real în care unele mesaje pierdute sunt acceptabile
  • Invalidarea cache-ului distribuit (dacă un nod nu primește, următoarea solicitare se invalidează)
  • Chat sau prezență live (dacă ești offline, nu-ți pasă ce s-a întâmplat)
  • Actualizări în timp real ale prețurilor financiare/tickerelor
  • Aveți nevoie de o latență ultra-scăzută și puteți tolera pierderea

Fluxuri: alegeți Când

  • Fiecare mesaj trebuie procesat cel puțin o dată (comenzi, plăți)
  • Aveți nevoie de reluare (recitirea evenimentelor trecute pentru depanare sau reconstrucție)
  • Procesare distribuită cu mai mulți lucrători în paralel (grupuri de consumatori)
  • Jurnal de audit și aprovizionare cu evenimente (istoricul evenimentelor nu trebuie pierdut)
  • Consumatorii care pot merge offline și au nevoie să recupereze mesajele pierdute
# Riepilogo comandi chiave

# PUB/SUB
SUBSCRIBE channel             # Iscriviti a un canale
PSUBSCRIBE pattern:*          # Iscriviti con pattern glob
PUBLISH channel message       # Pubblica (fire-and-forget)
UNSUBSCRIBE channel           # Disiscrivi

# STREAMS - Producer
XADD stream * field value     # Aggiunge messaggio con ID auto
XLEN stream                   # Lunghezza stream
XTRIM stream MAXLEN ~ 10000   # Tronca stream

# STREAMS - Consumer semplice
XREAD COUNT 10 STREAMS stream 0         # Leggi da inizio
XREAD BLOCK 5000 STREAMS stream $       # Blocca aspettando nuovi

# STREAMS - Consumer Groups
XGROUP CREATE stream group $ MKSTREAM    # Crea gruppo
XREADGROUP GROUP group consumer STREAMS stream >  # Leggi non assegnati
XACK stream group message-id             # Acknowledge
XPENDING stream group - + 10            # Lista pending
XCLAIM stream group consumer 30000 id   # Riassegna dopo timeout

Concluzii

Alegerea între Pub/Sub și Streams se rezumă la o întrebare: vă puteți permite dor de mesaje? Dacă da, Pub/Sub oferă latență cât mai jos posibil. Dacă nu, Streams with Consumer Groups oferă garanții de livrare robustă cu simplitatea operațională a Redis. În continuare articolul explorează Lua Scripting pentru operații atomice complexe.

Articole viitoare din seria Redis

  • Articolul 3: Lua Scripting — Atomic Operations, EVAL și Redis Functions API
  • Articolul 4: Limitarea ratei — găleată de jetoane, fereastră glisantă, contor fix
  • Articolul 5: Gestionarea sesiunii și modele de cache — Cache-Aside, Write-Through