Pub/Sub ve Redis Yayınları: Temel Farklılıklar, Kullanım Örnekleri ve Tüketici Grupları
Redis Pub/Sub, at ve unut; Streams kalıcı bir günlüktür tüketici grupları — farkı anlamak mimariyi değiştirir. Yapılandır Dağıtılmış işleme için Tüketici Grupları ve hiçbir mesajın kaybolmamasını sağlar.
Redis'te İki Mesajlaşma Paradigması
Redis tamamen farklı iki mesajlaşma mekanizması sunar. Pub/Sub gerçek zamanlı bir yayın sistemidir: mesajlar herkese iletilir bağlı aboneler o anda ve sonra unutuldu. Bir abone bunu yapmazsa bağlandığında mesaj sonsuza kadar kaybolur.
Redis Akışları (Redis 5.0'da tanıtılmıştır) salt eklemeli bir günlüktür Apache Kafka akış konseptinden ilham alan kalıcı. Mesajlar korunuyor açıkça silinmediği sürece tekrar okunabilir kez ve teşekkürlerle dağıtılmış işleme için Tüketici Gruplarını destekleyin.
Ne Öğreneceksiniz
- Pub/Sub: Küresel desenle ABONE OLUN, YAYINLAYIN, PSABONE OLUN
- Pub/Sub ve Streams ne zaman kullanılmalı: temel farklar
- Akışlar: Yalnızca ekleme günlükleri için XADD, XREAD, XRANGE
- Tüketici Grupları: XGROUP CREATE, XREADGROUP, XACK
- Bekleyen mesajların en az bir kez teslim edilmesi ve yönetimi
- Desen: Streams ile olay kaynağı bulma ve denetim günlüğü
Pub/Sub: Gerçek Zamanlı Yayın
Pub/Sub, bazı bilgilerin kaybolduğu gerçek zamanlı bildirimler için idealdir. mesaj kabul edilebilir: canlı fiyat güncellemeleri, anlık bildirimler, sohbet odası, dağıtılmış önbellek geçersiz kılma.
# Terminal 1: Subscriber
SUBSCRIBE notifications:user:1001
# Attende messaggi sul canale...
# Quando arriva un messaggio:
# 1) "message"
# 2) "notifications:user:1001"
# 3) "{"type":"payment","amount":100}"
# Puoi iscriverti a piu canali
SUBSCRIBE channel:1 channel:2 channel:3
# PSUBSCRIBE: subscribe con pattern glob
PSUBSCRIBE notifications:user:* # Tutti gli utenti
PSUBSCRIBE events:order:* # Tutti gli ordini
# Terminal 2: Publisher
PUBLISH notifications:user:1001 '{"type":"payment","amount":100}'
# (integer) 1 -- numero di subscriber che hanno ricevuto il messaggio
PUBLISH notifications:user:1001 '{"type":"login","ip":"192.168.1.1"}'
# (integer) 1
# Se nessun subscriber e connesso:
PUBLISH notifications:user:9999 '{"type":"alert"}'
# (integer) 0 -- messaggio PERSO, nessuno lo ricevera mai
# Python: Pub/Sub con redis-py
import redis
import json
import threading
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# --- SUBSCRIBER ---
def message_handler(message):
if message['type'] == 'message':
channel = message['channel']
data = json.loads(message['data'])
print(f"[{channel}] Received: {data}")
def start_subscriber(user_id: int):
"""Subscriber in background thread."""
sub = r.pubsub()
sub.subscribe(**{
f'notifications:user:{user_id}': message_handler,
'notifications:broadcast': message_handler,
})
# Pattern subscribe
sub.psubscribe(**{'events:*': message_handler})
# Blocca in ascolto (in un thread separato)
thread = sub.run_in_thread(sleep_time=0.01, daemon=True)
return thread, sub
# --- PUBLISHER ---
def publish_notification(user_id: int, notification_type: str, data: dict):
"""Pubblica una notifica a un utente specifico."""
payload = json.dumps({'type': notification_type, **data})
subscribers_count = r.publish(f'notifications:user:{user_id}', payload)
print(f"Delivered to {subscribers_count} subscribers")
return subscribers_count
# Avvia subscriber
thread, pubsub = start_subscriber(user_id=1001)
# Pubblica messaggi
import time
time.sleep(0.1) # Aspetta che subscriber sia pronto
publish_notification(1001, 'payment', {'amount': 100, 'currency': 'EUR'})
publish_notification(1001, 'login', {'ip': '192.168.1.1', 'city': 'Milano'})
# IMPORTANTE: Pub/Sub non e' persistente
# Se il subscriber si disconnette, perde tutti i messaggi inviati nel frattempo
time.sleep(0.5)
pubsub.unsubscribe()
thread.stop()
Redis Akışları: Kalıcı ve Yalnızca Ekleme Günlüğü
Streams tamamen farklı bir model kullanıyor: her mesajın benzersiz bir kimliği var (zaman damgası dizisi), günlükte kalıcıdır ve yeniden okunabilir herhangi bir zamanda. Olay kaynağı bulma, denetim günlükleri, görev kuyrukları ve mesaj kaybının kabul edilemeyeceği herhangi bir senaryo.
# Redis Streams: XADD e XREAD
# XADD: aggiunge un messaggio allo stream
# ID auto-generato: {milliseconds}-{sequence}
XADD orders:stream * user_id 1001 product_id 555 quantity 2 total 89.99
# "1710000001234-0" <-- ID generato automaticamente
XADD orders:stream * user_id 1002 product_id 123 quantity 1 total 29.99
# "1710000002345-0"
# ID esplicito (raramente necessario)
XADD orders:stream 1710000003000-0 user_id 1003 product_id 789 quantity 3 total 149.97
# XLEN: numero di messaggi nello stream
XLEN orders:stream # 3
# XRANGE: leggi messaggi da un range di ID (o -)
XRANGE orders:stream - +
# 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# 3) "product_id"
# 4) "555"
# 5) "quantity"
# 6) "2"
# 7) "total"
# 8) "89.99"
# ...
# XRANGE con COUNT per paginazione
XRANGE orders:stream - + COUNT 10
# XREVRANGE: ordine inverso (dal piu recente)
XREVRANGE orders:stream + - COUNT 5
# XREAD: leggi messaggi dall'ultimo ID letto (blocking possible)
XREAD COUNT 10 STREAMS orders:stream 0 # Da ID 0, leggi al massimo 10
# Leggi solo messaggi nuovi ($ = solo futuri)
XREAD COUNT 10 BLOCK 5000 STREAMS orders:stream $
# Blocca per max 5 secondi aspettando nuovi messaggi
Tüketici Grupları: Bildirimli Dağıtılmış İşleme
Tüketici Grupları Redis Streams'in en güçlü özelliğidir. İzin veriyorlar birden fazla tüketicinin aynı akıştan gelen mesajları dağıtılmış bir şekilde işlemesi, Her mesajın gruptaki tam olarak bir tüketici tarafından işleneceği garantisiyle, ve onaylanmamış mesajların otomatik olarak yeniden teslimi ile.
# Consumer Groups: setup e utilizzo
# 1. Crea il Consumer Group
# $ = inizia dai messaggi futuri, 0 = include messaggi esistenti
XGROUP CREATE orders:stream order-processors $ MKSTREAM
# "OK"
# Crea gruppo che inizia dall'inizio dello stream
XGROUP CREATE orders:stream order-processors-v2 0
# 2. Consumer legge messaggi dal gruppo
# > = leggi il prossimo messaggio non assegnato a nessun consumer
XREADGROUP GROUP order-processors worker-1 COUNT 10 STREAMS orders:stream >
# 1) 1) "orders:stream"
# 2) 1) 1) "1710000001234-0"
# 2) 1) "user_id"
# 2) "1001"
# ...
# 3. Acknowledge (conferma processing completato)
XACK orders:stream order-processors 1710000001234-0
# (integer) 1 -- 1 messaggio acknowledged
# 4. XPENDING: vedi messaggi pendenti (non ancora acked)
XPENDING orders:stream order-processors - + 10
# Mostra: ID, consumer assegnato, tempo dall'ultimo delivery, delivery count
# 5. XCLAIM: riassegna messaggi idle a un altro consumer (dopo timeout)
XCLAIM orders:stream order-processors worker-2 60000 1710000001234-0
# Riassegna il messaggio se e' idle da piu di 60 secondi
# 6. XDEL: elimina messaggi specifici
XDEL orders:stream 1710000001234-0
# 7. XTRIM: mantieni solo gli ultimi N messaggi
XTRIM orders:stream MAXLEN 10000 # Mantieni ultimi 10.000 messaggi
XTRIM orders:stream MAXLEN ~ 10000 # Approssimativo (piu efficiente)
# Python: Consumer Group con at-least-once delivery
import redis
import json
import time
from typing import Callable
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM_KEY = 'orders:stream'
GROUP_NAME = 'order-processors'
PENDING_TIMEOUT_MS = 30000 # 30 secondi prima di ri-assegnare
class StreamConsumer:
def __init__(self, consumer_name: str, handler: Callable):
self.consumer_name = consumer_name
self.handler = handler
self._ensure_group()
def _ensure_group(self):
"""Crea il consumer group se non esiste."""
try:
r.xgroup_create(STREAM_KEY, GROUP_NAME, id='






