Clustergrootte: hoeveel makelaars en welke grootte

Het dimensioneren van een Kafka-cluster begint met twee fundamentele vragen: wat is de doorvoer beoogde schrijfsnelheid (MB/s) en wat is de vereiste retentie (hoe lang moeten de gegevens beschikbaar blijven)? Het antwoord op deze twee vragen bepaalt het aantal makelaars, de schijfgrootte en het benodigde netwerk.

Formule makelaarsnummer

Iedere Kafka-makelaar kan ca 100-200 MB/sec van de doorvoer schrijven naar standaardhardware (NVMe SSD-schijf, 10 Gbps-netwerk). Deze waarde hangt sterk af vanuit het werkbelastingprofiel (berichtgrootte, aantal partities, acks-instelling).

# Formula per il sizing del cluster
#
# Throughput totale in scrittura: T_write = messaggi_al_secondo * dimensione_media_messaggio
# Throughput totale con replica:  T_total = T_write * replication_factor
# Numero di broker:               N_broker = ceil(T_total / throughput_per_broker)
#
# Esempio pratico:
# - 100.000 messaggi/s * 1 KB/messaggio = 100 MB/s throughput in scrittura
# - Replication factor 3:                 100 * 3 = 300 MB/s throughput totale
# - Throughput per broker: 150 MB/s       300 / 150 = 2 broker
# - Aggiungi 30% di margine:              ceil(2 * 1.3) = 3 broker
#
# Per la memoria RAM:
# - Kafka usa la page cache del SO per le letture recenti (hot data)
# - Regola empirica: 64-128 GB RAM per broker (o almeno 50% del dataset "caldo")
# - JVM heap: 6-8 GB (non di piu: il resto serve alla page cache)
#
# Per il disco:
# Disco_per_broker = (T_write * retention_giorni * 86400 * replication_factor) / N_broker

# Esempio: 100 MB/s, 7 giorni di retention, replication factor 3, 3 broker
# = (100 * 7 * 86400 * 3) / 3 = 181.440.000 MB ~= 173 TB totale / 3 broker = ~58 TB per broker
# Arrotonda a 80 TB con margine

Aanbevolen hardware voor Kafka Broker in productie

  • CPU: 16-32 cores (Kafka is niet CPU-gebonden, maar compressie gebruikt het)
  • RAM: 64-128 GB (paginacache is essentieel voor prestaties)
  • Disco: NVMe SSD RAID 10 of meer afzonderlijke schijven (afzonderlijk gemonteerd voor parallelle I/O)
  • Netto: Minimaal 10 Gbps, 25 Gbps voor clusters met hoge doorvoer
  • OS: Linux met ext4 of XFS, bestandssysteem geoptimaliseerd voor sequentiële schrijfbewerkingen
  • JVM: Java 21 (LTS), G1GC, 6-8 GB heap, -XX:MaxGCPauseMillis=20

Optimale Broker-configuratie: server.properties

Standaard Kafka-configuraties zijn vaak niet optimaal voor productie. Hier zijn de eigenschappen het belangrijkste om in te passen server.properties:

# server.properties - configurazione production-ready per Kafka 4.0

# ===== Rete e I/O =====
# Thread per le richieste di rete
num.network.threads=8
# Thread per le operazioni di I/O su disco
num.io.threads=16
# Buffer per richieste/risposte di rete
socket.send.buffer.bytes=1048576      # 1 MB
socket.receive.buffer.bytes=1048576   # 1 MB
socket.request.max.bytes=104857600    # 100 MB

# ===== Log e Disco =====
# Separare i log su piu dischi per I/O parallelo
log.dirs=/disk1/kafka-data,/disk2/kafka-data,/disk3/kafka-data

# Numero di thread per il log recovery all'avvio
num.recovery.threads.per.data.dir=4

# Flush su disco: NON abbassare questi valori, usa acks=all invece
log.flush.interval.messages=10000000
log.flush.interval.ms=1000

# ===== Retention Default =====
# Retention temporale (7 giorni)
log.retention.hours=168
# Retention per dimensione (-1 = illimitata per default)
log.retention.bytes=-1
# Dimensione massima del segmento log (1 GB)
log.segment.bytes=1073741824
# Intervallo di controllo per la retention
log.retention.check.interval.ms=300000

# ===== Replica =====
# Numero minimo di ISR per accettare scritture (con acks=all)
min.insync.replicas=2
# Default replication factor per topic auto-creati
default.replication.factor=3
# Replication factor per topic interni
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# ===== Performance Replica =====
# Dimensione buffer per la replica
replica.fetch.max.bytes=10485760      # 10 MB
replica.socket.receive.buffer.bytes=10485760
# Timeout per il leader election
leader.imbalance.check.interval.seconds=300

# ===== Consumer Groups =====
# Timeout sessione consumer
group.initial.rebalance.delay.ms=3000
# ===== Auto Create Topics =====
# DISABILITARE in produzione per evitare topic creati per errore
auto.create.topics.enable=false

Aantal partities: hoeveel er te maken

De keuze van het aantal partities is een van de meest kritische en minst omkeerbare in Kafka: zij kunnen toename de partities (met kafka-topics.sh --alter) Maar niet afnemen. Het vergroten van partities heeft invloed op het sorteren en opnieuw in evenwicht brengen.

# Regola pratica per il numero di partizioni:
# max(throughput_MB_s / 10, consumer_max_paralleli, producer_max_paralleli)
#
# Esempio:
# - Throughput target: 50 MB/s  --> almeno 5 partizioni per throughput
# - Consumer parallelismo max: 12 istanze --> almeno 12 partizioni
# - Scelta: 12 partizioni
#
# Linee guida pratiche:
# - Topic ad alto volume, molti consumer: 12, 24, 48 partizioni
# - Topic a basso volume, pochi consumer: 3, 6 partizioni
# - Topic interni (audit, DLQ): 3 partizioni sono spesso sufficienti
# - Non creare mai piu di 4000-6000 partizioni per broker (overhead memoria)

# Creare un topic con sizing ottimale
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic pagamenti-confermati \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000 \
  --config compression.type=snappy

# Verificare la distribuzione delle partizioni tra i broker
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --topic pagamenti-confermati

# Se la distribuzione e sbilanciata (troppi leader sullo stesso broker):
kafka-leader-election.sh \
  --bootstrap-server kafka1:9092 \
  --election-type preferred \
  --all-topic-partitions

Bewaarbeleid: tijd versus omvang versus compactie

Het bewaarbeleid bepaalt hoe lang documenten beschikbaar blijven in Kafka. De verkeerde keuze leidt tot twee tegengestelde problemen: geen schijf meer (te lange retentie) of herhaling onmogelijk voor langzame consumenten (retentie te kort).

# Configurazioni di retention per diversi use case

# Use case 1: Event streaming real-time (clickstream, metriche)
# Retention breve, alta velocita, i dati vengono aggregati immediatamente
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic click-events \
  --partitions 24 \
  --replication-factor 3 \
  --config retention.ms=3600000 \        # 1 ora
  --config retention.bytes=5368709120 \  # 5 GB per partizione
  --config compression.type=lz4          # compressione veloce

# Use case 2: Integrazione tra servizi (domain events)
# Retention media, consumer devono poter recuperare eventi recenti
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \  # 7 giorni (default)
  --config compression.type=snappy

# Use case 3: Audit log, compliance
# Retention lunga, dati critici per regolamentazione
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic audit-trail \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=94608000000 \  # 3 anni
  --config compression.type=gzip       # compressione massima per dati storici

# Use case 4: Change Data Capture (CDC), topic di stato
# Log compaction: mantiene solo ultimo valore per chiave
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic clienti-profilo \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000  # tombstone retention 24h

Rollende herstart: upgrade het cluster zonder downtime

Er worden Kafka-clusterupdates (softwareversie, brokerconfiguratie, JVM) uitgevoerd met een rollende herstart: U herstart één makelaar tegelijk, wachtend op de ISR volledig opnieuw opbouwen voordat u doorgaat met de volgende makelaar.

# Procedura di rolling restart sicura

# 1. Verifica che il cluster sia sano prima di iniziare
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --under-replicated-partitions
# Output atteso: nessuna partizione elencata

# 2. Per ogni broker (uno alla volta):
# a) Marca il broker come "not preferred" per evitare leader election inutili
kafka-leader-election.sh --bootstrap-server kafka1:9092 \
  --election-type unclean --all-topic-partitions

# b) Riavvia il broker
systemctl restart kafka

# c) ASPETTA che il broker si riunga al cluster e l'ISR si ricostruisca
# Monitora: tutte le under-replicated partitions devono tornare a 0
watch -n 5 "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --under-replicated-partitions"

# d) Solo quando l'ISR e completo, passa al broker successivo

# 3. Dopo il rolling restart, ribilancia i leader
kafka-leader-election.sh \
  --bootstrap-server kafka1:9092 \
  --election-type preferred \
  --all-topic-partitions

# Verifica finale
kafka-broker-api-versions.sh --bootstrap-server kafka1:9092

MirrorMaker 2: georeplicatie en noodherstel

SpiegelMaker 2 (MM2) is de Kafka-component voor het repliceren van gegevens tussen verschillende clusters. Het is gebouwd op Kafka Connect en ondersteunt bidirectionele replicatie en offsetsynchronisatie en automatische failover. Het wordt gebruikt voor:

  • Noodherstel (DR): Repliceert van het primaire cluster naar een stand-by in een ander datacenter
  • Geo-distributie: gegevens gerepliceerd over meerdere regio's voor lage latentie voor lokale consumenten
  • Migratie: Gecontroleerde migratie tussen clusters zonder downtime
# mm2.properties - Configurazione MirrorMaker 2
# Replica da cluster "primary" a "secondary" (datacenter DR)

# I due cluster
clusters=primary,secondary

# Connessione ai cluster
primary.bootstrap.servers=kafka-primary-1:9092,kafka-primary-2:9092,kafka-primary-3:9092
secondary.bootstrap.servers=kafka-secondary-1:9092,kafka-secondary-2:9092,kafka-secondary-3:9092

# Abilita la replica primary -> secondary
primary->secondary.enabled=true
# Non abilitare secondary->primary per evitare loop (solo se non e' bidirezionale)
secondary->primary.enabled=false

# Topic da replicare (regex): tutti i topic produzione escludendo interni
primary->secondary.topics=^(?!(__|\.)).*$

# Sincronizza anche le configurazioni dei topic
primary->secondary.sync.topic.configs.enabled=true
primary->secondary.sync.topic.acls.enabled=true

# Sincronizza gli offset dei consumer group
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.sync.group.offsets.interval.seconds=60

# Prefisso per i topic replicati (su secondary avrai "primary.ordini-effettuati")
replication.factor=3

# Performance
tasks.max=4
producer.override.acks=all
producer.override.compression.type=snappy

# Avvio di MirrorMaker 2:
# connect-mirror-maker.sh mm2.properties

Failover met MirrorMaker 2: consumentencompensatievertaling

Een van de krachtigste functies van MM2 is de gecompenseerde vertaling: de verschuivingen op het secundaire cluster verschillen van die op het primaire cluster (de kopie heeft dit mogelijk niet). alle records als er replicatievertraging is). MM2 houdt een toewijzingstabel bij in het onderwerp mm2-offset-syncs.primary.internal om de offsets te vertalen.

# RemoteClusterUtils: tradurre gli offset per il failover
# Da usare durante un failover emergency per far ripartire i consumer
# dal punto corretto sul cluster secondary

from confluent_kafka.admin import AdminClient
import json

# Script di failover: trova l'offset tradotto per ogni consumer group
def translate_offsets_for_failover(primary_group_id, secondary_bootstrap):
    """
    Usa i checkpoint di MM2 per trovare l'offset equivalente
    su secondary per un consumer group originalmente su primary.
    """
    admin = AdminClient({'bootstrap.servers': secondary_bootstrap})

    # MM2 scrive i checkpoint nel topic dedicato
    # Topic: mm2-checkpoints.primary.internal
    # Leggi i checkpoint per il gruppo specifico

    # In alternativa, usa la MM2 RemoteClusterUtils Java API:
    # Map<TopicPartition, OffsetAndMetadata> translated =
    #   RemoteClusterUtils.translateOffsets(
    #     adminClient, "primary", groupId, Duration.ofMinutes(1));

    print(f"Failover completato: consumer group '{primary_group_id}' "
          f"ora punta al cluster secondary con offset tradotti")

# Dopo il failover, cambia il bootstrap.servers dei consumer
# da primary a secondary e ripartono dal punto corretto

Onderwerpen voor productie configureren: checklist

# Script di creazione topic production-ready con tutti i parametri

create_production_topic() {
    TOPIC=$1
    PARTITIONS=$2
    RETENTION_DAYS=$3

    RETENTION_MS=$((RETENTION_DAYS * 24 * 3600 * 1000))

    kafka-topics.sh --create \
        --bootstrap-server kafka1:9092 \
        --topic $TOPIC \
        --partitions $PARTITIONS \
        --replication-factor 3 \
        --config min.insync.replicas=2 \
        --config retention.ms=$RETENTION_MS \
        --config compression.type=snappy \
        --config max.message.bytes=10485760 \  # 10 MB max message
        --if-not-exists

    echo "Topic $TOPIC creato: $PARTITIONS partizioni, RF=3, MIR=2, retention=${RETENTION_DAYS}d"
}

# Crea i topic principali
create_production_topic ordini-effettuati 12 7
create_production_topic pagamenti-confermati 12 30
create_production_topic audit-trail 3 1095        # 3 anni
create_production_topic ordini-effettuati.DLT 3 30  # DLQ

# Verifica di tutti i topic
kafka-topics.sh --list --bootstrap-server kafka1:9092

# Verifica configurazione di un topic specifico
kafka-configs.sh --bootstrap-server kafka1:9092 \
    --entity-type topics \
    --entity-name ordini-effettuati \
    --describe

Kafka op Kubernetes met Strimzi Operator

Voor implementatie op Kubernetes: Strimzi is de officiële referentie-operator (onderdeel van de CNCF-sandbox). Beheert het maken, bijwerken en configureren van Kafka-clusters via Kubernetes Custom Resource Definitions (CRD).

# kafka-cluster.yaml - Strimzi KafkaNodePool + Kafka CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  namespace: kafka
  labels:
    strimzi.io/cluster: production-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: persistent-claim
    size: 2Ti       # 2 TB per broker
    class: fast-ssd
  resources:
    requests:
      memory: 32Gi
      cpu: "4"
    limits:
      memory: 64Gi
      cpu: "8"
  jvmOptions:
    -Xms: 6144m
    -Xmx: 6144m
    gcLoggingEnabled: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: production-cluster
  namespace: kafka
spec:
  kafka:
    version: 4.0.0
    metadataVersion: "4.0"
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      auto.create.topics.enable: "false"
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.retention.hours: 168
      compression.type: snappy
  zookeeper:
    replicas: 0  # KRaft: niente ZooKeeper in Kafka 4.0

Prestatieafstemming: de configuraties die alles veranderen

Veelvoorkomende antipatronen in de productie

  • acks=0 voor snelheid: nulduurzaamheidsgaranties, berichten gaan stilletjes verloren in geval van falen van de makelaar
  • replicatie.factor=1: geen redundantie, het verlies van een makelaar vernietigt gegevens
  • auto.create.topics.enable=waar: onderwerpen gemaakt door een typfout, onmogelijk te controleren
  • JVM-heap > 8 GB: veroorzaakt lange pauzes in de GC die de prestaties van de makelaar verslechteren
  • Te weinig partities: parallellisme knelpunt onmogelijk op te lossen zonder downtime
# Configurazioni producer per massimo throughput (batch processing)
props.put("batch.size", 65536);          # 64 KB per batch
props.put("linger.ms", 20);              # Aspetta 20ms per riempire il batch
props.put("compression.type", "snappy"); # Snappy: buon bilanciamento velocita/ratio
props.put("buffer.memory", 67108864);    # 64 MB buffer totale

# Configurazioni consumer per massimo throughput
props.put("fetch.min.bytes", 65536);     # 64 KB fetch minimo
props.put("fetch.max.wait.ms", 500);     # Aspetta max 500ms se non ci sono dati
props.put("max.partition.fetch.bytes", 10485760);  # 10 MB per fetch per partizione
props.put("max.poll.records", 500);      # 500 record per poll()

# Per sistemi a bassa latenza (trading, real-time alerts):
props.put("linger.ms", 0);              # Invia immediatamente
props.put("batch.size", 1);             # Nessun batching
props.put("acks", "1");                 # Solo leader ack (senza aspettare ISR)

Samenvatting: Checklist voor Kafka in productie

  • KRaft-cluster met minimaal 3 makelaars (5+ voor hoge beschikbaarheid)
  • replication.factor=3, min.insync.replicas=2 over alle cruciale onderwerpen
  • auto.create.topics.enable=false: Onderwerpen expliciet beheren
  • JVM-heap 6-8 GB, G1GC geconfigureerd, rest van RAM voor paginacache
  • Meld u aan op een speciale schijf (NVMe SSD), los van het besturingssysteem
  • Monitoring: JMX Exporteur + Prometheus + Grafana (zie artikel 9)
  • Waarschuwing voor: offline partities, te weinig gerepliceerde partities, consumentenvertraging
  • DLQ geconfigureerd voor elke consumentengroep (zie artikel 10)
  • MirrorMaker 2 voor geo-replicatie en DR op secundair datacenter
  • Rollende herstartprocedure gedocumenteerd en regelmatig getest
  • Capaciteitsplanning wordt elk kwartaal herzien op basis van groeitrends

Einde van de serie: volgende stappen

Je hebt de volledige Apache Kafka-serie voltooid. Hier leest u hoe u meer te weten kunt komen over de gerelateerde onderwerpen:

  • Gebeurtenisgestuurde architectuur (serie 39): Kafka toepassen als ruggengraat voor EDA-patronen, Saga, CQRS en Outbox in complexe microservicesystemen.
  • PostgreSQL AI en Debezium CDC: Gebruik Kafka als pijplijn om wijzigingen door te geven van database tot downstream-services in realtime.

Link met andere series

  • Gebeurtenisgestuurde architectuur – Saga Pattern en CQRS: Kafka als berichtenmakelaar om de Saga Pattern- en CQRS-projecties in microservicesystemen te implementeren.
  • Monitoring met Prometheus en Grafana (artikel 9): De clusterstatistieken die u heeft die in deze handleiding zijn geconfigureerd, zijn de bestanden die via JMX Exporter worden weergegeven en op Grafana worden weergegeven.