Kafka în producție: dimensionare, reținere, replicare și recuperare în caz de dezastru
Aducerea lui Kafka în producție nu înseamnă doar lansarea a trei brokeri. Înseamnă mărimea corectă cluster-ul pe baza debitului așteptat, configurați reținerea și factorul de replicare pentru garanții de durabilitate solicitări, planificați întreținerea fără timpi de nefuncționare și pregătiți un plan de recuperare în caz de dezastru cu MirrorMaker 2. Acest ghid operațional colectează cele mai critice decizii pentru un cluster Kafka de întreprindere.
Dimensiunea clusterului: câți brokeri și ce dimensiune
Dimensionarea unui cluster Kafka începe cu două întrebări fundamentale: Care este debitul rata de scriere țintă (MB/s) și care este păstrarea necesară (cât timp trebuie să rămână disponibile datele)? Răspunsul la aceste două întrebări determină numărul de brokeri, dimensiunea discului și rețeaua necesară.
Formula numărului de broker
Fiecare broker Kafka poate gestiona în mod durabil cca 100-200 MB/s de debit scrieți pe hardware-ul de bază (disc SSD NVMe, rețea de 10 Gbps). Aceasta valoare depinde foarte mult din profilul de sarcină de lucru (dimensiunea mesajului, numărul de partiții, setarea acks).
# Formula per il sizing del cluster
#
# Throughput totale in scrittura: T_write = messaggi_al_secondo * dimensione_media_messaggio
# Throughput totale con replica: T_total = T_write * replication_factor
# Numero di broker: N_broker = ceil(T_total / throughput_per_broker)
#
# Esempio pratico:
# - 100.000 messaggi/s * 1 KB/messaggio = 100 MB/s throughput in scrittura
# - Replication factor 3: 100 * 3 = 300 MB/s throughput totale
# - Throughput per broker: 150 MB/s 300 / 150 = 2 broker
# - Aggiungi 30% di margine: ceil(2 * 1.3) = 3 broker
#
# Per la memoria RAM:
# - Kafka usa la page cache del SO per le letture recenti (hot data)
# - Regola empirica: 64-128 GB RAM per broker (o almeno 50% del dataset "caldo")
# - JVM heap: 6-8 GB (non di piu: il resto serve alla page cache)
#
# Per il disco:
# Disco_per_broker = (T_write * retention_giorni * 86400 * replication_factor) / N_broker
# Esempio: 100 MB/s, 7 giorni di retention, replication factor 3, 3 broker
# = (100 * 7 * 86400 * 3) / 3 = 181.440.000 MB ~= 173 TB totale / 3 broker = ~58 TB per broker
# Arrotonda a 80 TB con margine
Hardware recomandat pentru Kafka Broker în producție
- CPU: 16-32 de nuclee (Kafka nu este legat de CPU, dar compresia îl folosește)
- RAM: 64-128 GB (cache-ul paginii este esențial pentru performanță)
- Discotecă: NVMe SSD RAID 10 sau mai multe discuri separate (montate separat pentru I/O paralele)
- Net: 10 Gbps minim, 25 Gbps pentru clustere cu debit mare
- OS: Linux cu ext4 sau XFS, sistem de fișiere optimizat pentru anexare scrieri secvențiale
- JVM: Java 21 (LTS), G1GC, heap 6-8 GB, -XX:MaxGCPauseMillis=20
Configurație optimă pentru broker: server.properties
Configurațiile implicite Kafka sunt adesea sub-optimale pentru producție. Iată proprietățile
cel mai important pentru personalizare server.properties:
# server.properties - configurazione production-ready per Kafka 4.0
# ===== Rete e I/O =====
# Thread per le richieste di rete
num.network.threads=8
# Thread per le operazioni di I/O su disco
num.io.threads=16
# Buffer per richieste/risposte di rete
socket.send.buffer.bytes=1048576 # 1 MB
socket.receive.buffer.bytes=1048576 # 1 MB
socket.request.max.bytes=104857600 # 100 MB
# ===== Log e Disco =====
# Separare i log su piu dischi per I/O parallelo
log.dirs=/disk1/kafka-data,/disk2/kafka-data,/disk3/kafka-data
# Numero di thread per il log recovery all'avvio
num.recovery.threads.per.data.dir=4
# Flush su disco: NON abbassare questi valori, usa acks=all invece
log.flush.interval.messages=10000000
log.flush.interval.ms=1000
# ===== Retention Default =====
# Retention temporale (7 giorni)
log.retention.hours=168
# Retention per dimensione (-1 = illimitata per default)
log.retention.bytes=-1
# Dimensione massima del segmento log (1 GB)
log.segment.bytes=1073741824
# Intervallo di controllo per la retention
log.retention.check.interval.ms=300000
# ===== Replica =====
# Numero minimo di ISR per accettare scritture (con acks=all)
min.insync.replicas=2
# Default replication factor per topic auto-creati
default.replication.factor=3
# Replication factor per topic interni
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ===== Performance Replica =====
# Dimensione buffer per la replica
replica.fetch.max.bytes=10485760 # 10 MB
replica.socket.receive.buffer.bytes=10485760
# Timeout per il leader election
leader.imbalance.check.interval.seconds=300
# ===== Consumer Groups =====
# Timeout sessione consumer
group.initial.rebalance.delay.ms=3000
# ===== Auto Create Topics =====
# DISABILITARE in produzione per evitare topic creati per errore
auto.create.topics.enable=false
Număr de partiții: Câte de creat
Alegerea numărului de partiții este una dintre cele mai critice și mai puțin reversibile din Kafka:
pot crește pereții despărțitori (cu kafka-topics.sh --alter) dar
nu scade. Creșterea partițiilor are impact asupra sortării și reechilibrării.
# Regola pratica per il numero di partizioni:
# max(throughput_MB_s / 10, consumer_max_paralleli, producer_max_paralleli)
#
# Esempio:
# - Throughput target: 50 MB/s --> almeno 5 partizioni per throughput
# - Consumer parallelismo max: 12 istanze --> almeno 12 partizioni
# - Scelta: 12 partizioni
#
# Linee guida pratiche:
# - Topic ad alto volume, molti consumer: 12, 24, 48 partizioni
# - Topic a basso volume, pochi consumer: 3, 6 partizioni
# - Topic interni (audit, DLQ): 3 partizioni sono spesso sufficienti
# - Non creare mai piu di 4000-6000 partizioni per broker (overhead memoria)
# Creare un topic con sizing ottimale
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic pagamenti-confermati \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000 \
--config compression.type=snappy
# Verificare la distribuzione delle partizioni tra i broker
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--topic pagamenti-confermati
# Se la distribuzione e sbilanciata (troppi leader sullo stesso broker):
kafka-leader-election.sh \
--bootstrap-server kafka1:9092 \
--election-type preferred \
--all-topic-partitions
Politica de păstrare: timp vs dimensiune vs compactare
Politica de păstrare determină cât timp rămân disponibile înregistrările în Kafka. Alegerea greșită duce la două probleme opuse: fără disc (reținere prea lungă) sau reluare imposibilă pentru consumatorii lenți (retenție prea scurtă).
# Configurazioni di retention per diversi use case
# Use case 1: Event streaming real-time (clickstream, metriche)
# Retention breve, alta velocita, i dati vengono aggregati immediatamente
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic click-events \
--partitions 24 \
--replication-factor 3 \
--config retention.ms=3600000 \ # 1 ora
--config retention.bytes=5368709120 \ # 5 GB per partizione
--config compression.type=lz4 # compressione veloce
# Use case 2: Integrazione tra servizi (domain events)
# Retention media, consumer devono poter recuperare eventi recenti
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7 giorni (default)
--config compression.type=snappy
# Use case 3: Audit log, compliance
# Retention lunga, dati critici per regolamentazione
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic audit-trail \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=94608000000 \ # 3 anni
--config compression.type=gzip # compressione massima per dati storici
# Use case 4: Change Data Capture (CDC), topic di stato
# Log compaction: mantiene solo ultimo valore per chiave
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic clienti-profilo \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config delete.retention.ms=86400000 # tombstone retention 24h
Repornire continuă: actualizați clusterul fără timp de nefuncționare
Sunt efectuate actualizări de cluster Kafka (versiunea software, configurația brokerului, JVM). cu a repornire la rulare: Reporniți câte un broker la un moment dat, așteptând ISR-ul reconstruiți complet înainte de a continua cu următorul broker.
# Procedura di rolling restart sicura
# 1. Verifica che il cluster sia sano prima di iniziare
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--under-replicated-partitions
# Output atteso: nessuna partizione elencata
# 2. Per ogni broker (uno alla volta):
# a) Marca il broker come "not preferred" per evitare leader election inutili
kafka-leader-election.sh --bootstrap-server kafka1:9092 \
--election-type unclean --all-topic-partitions
# b) Riavvia il broker
systemctl restart kafka
# c) ASPETTA che il broker si riunga al cluster e l'ISR si ricostruisca
# Monitora: tutte le under-replicated partitions devono tornare a 0
watch -n 5 "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --under-replicated-partitions"
# d) Solo quando l'ISR e completo, passa al broker successivo
# 3. Dopo il rolling restart, ribilancia i leader
kafka-leader-election.sh \
--bootstrap-server kafka1:9092 \
--election-type preferred \
--all-topic-partitions
# Verifica finale
kafka-broker-api-versions.sh --bootstrap-server kafka1:9092
MirrorMaker 2: Geo-replicare și recuperare în caz de dezastru
MirrorMaker 2 (MM2) este componenta Kafka pentru replicarea datelor între clustere distincte. Este construit pe Kafka Connect și acceptă replicarea bidirecțională, sincronizarea offset și failover automat. Este folosit pentru:
- Recuperare în caz de dezastru (DR): se replică de la clusterul principal într-un standby într-un alt centru de date
- Geo-distribuție: Date replicate în mai multe regiuni pentru o latență scăzută pentru consumatorii locali
- Migrația: Migrare controlată între clustere fără timp de nefuncționare
# mm2.properties - Configurazione MirrorMaker 2
# Replica da cluster "primary" a "secondary" (datacenter DR)
# I due cluster
clusters=primary,secondary
# Connessione ai cluster
primary.bootstrap.servers=kafka-primary-1:9092,kafka-primary-2:9092,kafka-primary-3:9092
secondary.bootstrap.servers=kafka-secondary-1:9092,kafka-secondary-2:9092,kafka-secondary-3:9092
# Abilita la replica primary -> secondary
primary->secondary.enabled=true
# Non abilitare secondary->primary per evitare loop (solo se non e' bidirezionale)
secondary->primary.enabled=false
# Topic da replicare (regex): tutti i topic produzione escludendo interni
primary->secondary.topics=^(?!(__|\.)).*$
# Sincronizza anche le configurazioni dei topic
primary->secondary.sync.topic.configs.enabled=true
primary->secondary.sync.topic.acls.enabled=true
# Sincronizza gli offset dei consumer group
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.sync.group.offsets.interval.seconds=60
# Prefisso per i topic replicati (su secondary avrai "primary.ordini-effettuati")
replication.factor=3
# Performance
tasks.max=4
producer.override.acks=all
producer.override.compression.type=snappy
# Avvio di MirrorMaker 2:
# connect-mirror-maker.sh mm2.properties
Failover cu MirrorMaker 2: Traducere offset pentru consumatori
Una dintre cele mai puternice caracteristici ale MM2 este traducere offset:
decalajele de pe clusterul secundar sunt diferite de cele de pe cel primar (copia poate să nu aibă
toate înregistrările dacă există întârziere de replicare). MM2 menține un tabel de mapare în subiect
mm2-offset-syncs.primary.internal pentru a traduce offset-urile.
# RemoteClusterUtils: tradurre gli offset per il failover
# Da usare durante un failover emergency per far ripartire i consumer
# dal punto corretto sul cluster secondary
from confluent_kafka.admin import AdminClient
import json
# Script di failover: trova l'offset tradotto per ogni consumer group
def translate_offsets_for_failover(primary_group_id, secondary_bootstrap):
"""
Usa i checkpoint di MM2 per trovare l'offset equivalente
su secondary per un consumer group originalmente su primary.
"""
admin = AdminClient({'bootstrap.servers': secondary_bootstrap})
# MM2 scrive i checkpoint nel topic dedicato
# Topic: mm2-checkpoints.primary.internal
# Leggi i checkpoint per il gruppo specifico
# In alternativa, usa la MM2 RemoteClusterUtils Java API:
# Map<TopicPartition, OffsetAndMetadata> translated =
# RemoteClusterUtils.translateOffsets(
# adminClient, "primary", groupId, Duration.ofMinutes(1));
print(f"Failover completato: consumer group '{primary_group_id}' "
f"ora punta al cluster secondary con offset tradotti")
# Dopo il failover, cambia il bootstrap.servers dei consumer
# da primary a secondary e ripartono dal punto corretto
Configurarea subiectelor pentru producție: Lista de verificare
# Script di creazione topic production-ready con tutti i parametri
create_production_topic() {
TOPIC=$1
PARTITIONS=$2
RETENTION_DAYS=$3
RETENTION_MS=$((RETENTION_DAYS * 24 * 3600 * 1000))
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic $TOPIC \
--partitions $PARTITIONS \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=$RETENTION_MS \
--config compression.type=snappy \
--config max.message.bytes=10485760 \ # 10 MB max message
--if-not-exists
echo "Topic $TOPIC creato: $PARTITIONS partizioni, RF=3, MIR=2, retention=${RETENTION_DAYS}d"
}
# Crea i topic principali
create_production_topic ordini-effettuati 12 7
create_production_topic pagamenti-confermati 12 30
create_production_topic audit-trail 3 1095 # 3 anni
create_production_topic ordini-effettuati.DLT 3 30 # DLQ
# Verifica di tutti i topic
kafka-topics.sh --list --bootstrap-server kafka1:9092
# Verifica configurazione di un topic specifico
kafka-configs.sh --bootstrap-server kafka1:9092 \
--entity-type topics \
--entity-name ordini-effettuati \
--describe
Kafka pe Kubernetes cu operatorul Strimzi
Pentru implementarea pe Kubernetes, Strimzi este operatorul de referință oficial (parte a cutiei de nisip CNCF). Gestionează crearea, actualizarea și configurarea clusterelor Kafka prin Kubernetes Custom Resource Definitions (CRD).
# kafka-cluster.yaml - Strimzi KafkaNodePool + Kafka CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker
namespace: kafka
labels:
strimzi.io/cluster: production-cluster
spec:
replicas: 3
roles:
- broker
storage:
type: persistent-claim
size: 2Ti # 2 TB per broker
class: fast-ssd
resources:
requests:
memory: 32Gi
cpu: "4"
limits:
memory: 64Gi
cpu: "8"
jvmOptions:
-Xms: 6144m
-Xmx: 6144m
gcLoggingEnabled: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: production-cluster
namespace: kafka
spec:
kafka:
version: 4.0.0
metadataVersion: "4.0"
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
auto.create.topics.enable: "false"
default.replication.factor: 3
min.insync.replicas: 2
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.retention.hours: 168
compression.type: snappy
zookeeper:
replicas: 0 # KRaft: niente ZooKeeper in Kafka 4.0
Ajustarea performanței: configurațiile care schimbă totul
Anti-modele comune în producție
- acks=0 pentru viteza: zero garanții de durabilitate, mesaje pierdute în tăcere în cazul eșecului brokerului
- replication.factor=1: fără redundanță, pierderea unui broker distruge datele
- auto.create.topics.enable=true: subiecte create de greșeală de scriere, imposibil de verificat
- Heap JVM > 8 GB: provoacă pauze lungi GC care degradează performanța brokerului
- Prea puține partiții: blocaj paralelism imposibil de rezolvat fără timp de nefuncționare
# Configurazioni producer per massimo throughput (batch processing)
props.put("batch.size", 65536); # 64 KB per batch
props.put("linger.ms", 20); # Aspetta 20ms per riempire il batch
props.put("compression.type", "snappy"); # Snappy: buon bilanciamento velocita/ratio
props.put("buffer.memory", 67108864); # 64 MB buffer totale
# Configurazioni consumer per massimo throughput
props.put("fetch.min.bytes", 65536); # 64 KB fetch minimo
props.put("fetch.max.wait.ms", 500); # Aspetta max 500ms se non ci sono dati
props.put("max.partition.fetch.bytes", 10485760); # 10 MB per fetch per partizione
props.put("max.poll.records", 500); # 500 record per poll()
# Per sistemi a bassa latenza (trading, real-time alerts):
props.put("linger.ms", 0); # Invia immediatamente
props.put("batch.size", 1); # Nessun batching
props.put("acks", "1"); # Solo leader ack (senza aspettare ISR)
Rezumat: Lista de verificare pentru Kafka în producție
- Cluster KRaft cu minim 3 brokeri (5+ pentru disponibilitate ridicată)
replication.factor=3,min.insync.replicas=2pe toate subiectele criticeauto.create.topics.enable=false: Gestionați subiectele în mod explicit- Heap JVM 6-8 GB, G1GC configurat, restul memoriei RAM pentru cache-ul paginii
- Conectați-vă pe un disc dedicat (NVMe SSD), separat de sistemul de operare
- Monitorizare: JMX Exporter + Prometheus + Grafana (vezi articolul 9)
- Alertă activată: partiții offline, partiții subreplicate, întârziere de consum
- DLQ configurat pentru fiecare grup de consumatori (a se vedea articolul 10)
- MirrorMaker 2 pentru replicare geografică și DR în centrul de date secundar
- Procedura de repornire la rulare documentată și testată în mod regulat
- Planificarea capacității este revizuită trimestrial pe baza tendințelor de creștere
Sfârșitul seriei: pașii următori
Ați finalizat seria completă Apache Kafka. Iată cum puteți afla mai multe despre subiectele conexe:
- Arhitectură bazată pe evenimente (Seria 39): aplicați Kafka ca coloană vertebrală pentru modelele EDA, Saga, CQRS și Outbox în sisteme complexe de microservicii.
- PostgreSQL AI și Debezium CDC: Folosiți Kafka ca o conductă pentru a propaga modificările de la baza de date la serviciile din aval în timp real.
Legătură cu alte serii
- Arhitectură Event-Driven – Saga Pattern și CQRS: Kafka ca broker de mesaje să implementeze modelul Saga și proiecțiile CQRS în sistemele de microservicii.
- Monitorizare cu Prometheus și Grafana (articolul 9): valorile cluster pe care le aveți configurate în acest ghid sunt cele expuse prin JMX Exporter și afișate pe Grafana.







