Fluxuri Pub/Sub vs Redis: diferențe fundamentale, cazuri de utilizare și grupuri de consumatori
Redis Pub/Sub este foc și uita; Streams este un jurnal persistent cu grupuri de consumatori — înțelegerea diferenței schimbă arhitectura. Configurați Grupuri de consumatori pentru procesarea distribuită și asigurați-vă că niciun mesaj nu este pierdut.
Două paradigme de mesagerie în Redis
Redis oferă două mecanisme de mesagerie radical diferite. Pub/Sub este un sistem de difuzare în timp real: mesajele sunt livrate tuturor abonații conectați în acel moment si apoi uitat. Dacă un abonat nu o face este conectat, mesajul se pierde pentru totdeauna.
Redis Streams (introdus în Redis 5.0) este un jurnal numai pentru adăugare persistent inspirat de conceptul de flux Apache Kafka. Mesajele sunt păstrate atâta timp cât nu sunt șterse explicit, pot fi citite din nou ori și sprijină grupurile de consumatori pentru procesarea distribuită cu mulțumiri.
Ce vei învăța
- Pub/Sub: SUBSCRIBE, PUBLISH, PSUBSCRIBE cu model globbing
- Când să folosiți Pub/Sub vs Streams: diferențele cheie
- Fluxuri: XADD, XREAD, XRANGE pentru jurnalele de atașare
- Grupuri de consumatori: XGROUP CREATE, XREADGROUP, XACK
- Livrarea și gestionarea cel puțin o dată a mesajelor în așteptare
- Model: aprovizionare cu evenimente și jurnal de audit cu Streams
Pub/Sub: Transmisie în timp real
Pub/Sub este ideal pentru notificări în timp real în cazul în care pierderea unora mesajul este acceptabil: actualizări live de preț, notificări push, camera de chat, invalidarea cache-ului distribuit.
# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"
# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3
# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:* # Tutti gli utenti
PSUBSCRIBE events:order:* # Tutti gli ordini
# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1 -- numero di subscriber che hanno ricevuto il messaggio
PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1
# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0 -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# --- SUBSCRIBER ---
def message_handler(message):
if message['type'] == 'message':
channel = message['channel']
data = json.loads(message['data'])
print(f"[{channel}] Received: {data}")
def start_subscriber(user_id: int):
"""Subscriber in background thread."""
sub = r.pubsub()
sub.subscribe(**{
f'notifications:user:{user_id}': message_handler,
'notifications:broadcast': message_handler,
})
# Pattern subscribe
sub.psubscribe(**{'events:*': message_handler})
# Blocca in ascolto (in un thread separato)
thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
return thread, sub
# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
"""Pubblica una notifica a un utente specifico."""
payload = json.dumps({'type': notification_type, **data})
subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
print(f"Delivered to {subscribers_count} subscribers")
return subscribers_count
# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)
# Pubblica messaggi
import time
time.sleep(0.1) # Aspetta che subscriber sia pronto
publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})
# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()
Fluxuri Redis: jurnal persistent și numai pentru adăugare
Streams folosește un model complet diferit: fiecare mesaj are un ID unic (timestamp-secvență), este persistat în jurnal și poate fi recitit în orice moment. Este alegerea potrivită pentru aprovizionarea cu evenimente, jurnalele de audit, cozi de sarcini și orice scenariu în care pierderea mesajelor nu este acceptabilă.
# Redis Streams: XADD e XREAD
# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0" <-- ID generato automaticamente
XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"
# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97
# XLEN: numero di messaggi nello stream
XLEN orders:stream # 3
# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# 3) "product_id"
# 4) "555"
# 5) "quantity"
# 6) "2"
# 7) "total"
# 8) "89.99"
# ...
# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10
# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5
# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0 # Da ID 0, leggi al massimo 10
# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi
Grupuri de consumatori: procesare distribuită cu confirmare
Grupurile de consumatori sunt cea mai puternică caracteristică a Redis Streams. Ei permit mai mulți consumatori pentru a procesa mesaje din același flux într-o manieră distribuită, cu garanția că fiecare mesaj este procesat de exact un consumator din grup, și cu re-livrarea automată a mesajelor neconfirmate.
# Consumer Groups: setup e utilizzo
# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"
# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0
# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
# 2) 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# ...
# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1 -- 1 messaggio acknowledged
# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count
# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi
# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0
# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000 # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000 # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000 # 30 secondi prima di ri-assegnare
class StreamConsumer:
def __init__(self, consumer_name: str, handler: Callable):
self.consumer_name = consumer_name
self.handler = handler
self._ensure_group()
def _ensure_group(self):
"""Crea il consumer group se non esiste."""
try:
r.xgroup_create(STREAM_KEY, GROUP_NAME, id='






