Üretimde Kafka: Boyutlandırma, Saklama, Çoğaltma ve Olağanüstü Durum Kurtarma
Kafka'yı üretime geçirmek sadece üç komisyoncuyu işe almaktan ibaret değil. Anlamı doğru boyut beklenen verime dayalı küme, dayanıklılık garantileri için saklama ve çoğaltma faktörünü yapılandırın MirrorMaker 2 ile isteklerinizi gerçekleştirin, kesinti olmadan bakımı planlayın ve bir felaket kurtarma planı hazırlayın. Bu operasyonel kılavuz, kurumsal bir Kafka kümesi için en kritik kararları toplar.
Küme Boyutu: Kaç Broker ve Hangi Boyutta?
Kafka kümesini boyutlandırmak iki temel soruyla başlar: Verim nedir? hedef yazma hızı (MB/s) ve gereken saklama süresi nedir (veriler ne kadar süreyle kullanılabilir durumda kalmalıdır)? Bu iki sorunun cevabı, gereken aracı sayısını, disk boyutunu ve ağı belirler.
Broker Numarası Formülü
Her Kafka komisyoncusu yaklaşık olarak sürdürülebilir bir şekilde yönetebilir 100-200 MB/sn verim emtia donanımına yazma (NVMe SSD disk, 10 Gbps ağ). Bu değer birçok şeye bağlıdır iş yükü profilinden (ileti boyutu, bölüm sayısı, acks ayarı).
# Formula per il sizing del cluster
#
# Throughput totale in scrittura: T_write = messaggi_al_secondo * dimensione_media_messaggio
# Throughput totale con replica: T_total = T_write * replication_factor
# Numero di broker: N_broker = ceil(T_total / throughput_per_broker)
#
# Esempio pratico:
# - 100.000 messaggi/s * 1 KB/messaggio = 100 MB/s throughput in scrittura
# - Replication factor 3: 100 * 3 = 300 MB/s throughput totale
# - Throughput per broker: 150 MB/s 300 / 150 = 2 broker
# - Aggiungi 30% di margine: ceil(2 * 1.3) = 3 broker
#
# Per la memoria RAM:
# - Kafka usa la page cache del SO per le letture recenti (hot data)
# - Regola empirica: 64-128 GB RAM per broker (o almeno 50% del dataset "caldo")
# - JVM heap: 6-8 GB (non di piu: il resto serve alla page cache)
#
# Per il disco:
# Disco_per_broker = (T_write * retention_giorni * 86400 * replication_factor) / N_broker
# Esempio: 100 MB/s, 7 giorni di retention, replication factor 3, 3 broker
# = (100 * 7 * 86400 * 3) / 3 = 181.440.000 MB ~= 173 TB totale / 3 broker = ~58 TB per broker
# Arrotonda a 80 TB con margine
Üretimde Kafka Broker için Önerilen Donanım
- İşlemci: 16-32 çekirdek (Kafka CPU'ya bağlı değildir ancak sıkıştırma onu kullanır)
- Veri deposu: 64-128 GB (sayfa önbelleği performans için gereklidir)
- Disko: NVMe SSD RAID 10 veya daha fazla ayrı disk (paralel G/Ç için ayrı olarak monte edilir)
- Açık: Minimum 10 Gbps, yüksek verimli kümeler için 25 Gbps
- OS: Ext4 veya XFS'li Linux, sıralı yazma işlemleri için optimize edilmiş dosya sistemi
- JVM: Java 21 (LTS), G1GC, 6-8 GB yığın, -XX:MaxGCPauseMillis=20
Optimum Aracı Yapılandırması: server.properties
Varsayılan Kafka konfigürasyonları genellikle üretim için idealin altındadır. İşte özellikler
özelleştirmek için en önemli şey server.properties:
# server.properties - configurazione production-ready per Kafka 4.0
# ===== Rete e I/O =====
# Thread per le richieste di rete
num.network.threads=8
# Thread per le operazioni di I/O su disco
num.io.threads=16
# Buffer per richieste/risposte di rete
socket.send.buffer.bytes=1048576 # 1 MB
socket.receive.buffer.bytes=1048576 # 1 MB
socket.request.max.bytes=104857600 # 100 MB
# ===== Log e Disco =====
# Separare i log su piu dischi per I/O parallelo
log.dirs=/disk1/kafka-data,/disk2/kafka-data,/disk3/kafka-data
# Numero di thread per il log recovery all'avvio
num.recovery.threads.per.data.dir=4
# Flush su disco: NON abbassare questi valori, usa acks=all invece
log.flush.interval.messages=10000000
log.flush.interval.ms=1000
# ===== Retention Default =====
# Retention temporale (7 giorni)
log.retention.hours=168
# Retention per dimensione (-1 = illimitata per default)
log.retention.bytes=-1
# Dimensione massima del segmento log (1 GB)
log.segment.bytes=1073741824
# Intervallo di controllo per la retention
log.retention.check.interval.ms=300000
# ===== Replica =====
# Numero minimo di ISR per accettare scritture (con acks=all)
min.insync.replicas=2
# Default replication factor per topic auto-creati
default.replication.factor=3
# Replication factor per topic interni
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ===== Performance Replica =====
# Dimensione buffer per la replica
replica.fetch.max.bytes=10485760 # 10 MB
replica.socket.receive.buffer.bytes=10485760
# Timeout per il leader election
leader.imbalance.check.interval.seconds=300
# ===== Consumer Groups =====
# Timeout sessione consumer
group.initial.rebalance.delay.ms=3000
# ===== Auto Create Topics =====
# DISABILITARE in produzione per evitare topic creati per errore
auto.create.topics.enable=false
Bölüm Sayısı: Kaç tane Oluşturulacak
Bölüm sayısının seçimi Kafka'da en kritik ve en az geri döndürülebilir olanlardan biridir:
yapabilirler arttırmak bölümler (ile kafka-topics.sh --alter) Ancak
azalma. Bölümlerin arttırılması sıralamayı ve yeniden dengelemeyi etkiler.
# Regola pratica per il numero di partizioni:
# max(throughput_MB_s / 10, consumer_max_paralleli, producer_max_paralleli)
#
# Esempio:
# - Throughput target: 50 MB/s --> almeno 5 partizioni per throughput
# - Consumer parallelismo max: 12 istanze --> almeno 12 partizioni
# - Scelta: 12 partizioni
#
# Linee guida pratiche:
# - Topic ad alto volume, molti consumer: 12, 24, 48 partizioni
# - Topic a basso volume, pochi consumer: 3, 6 partizioni
# - Topic interni (audit, DLQ): 3 partizioni sono spesso sufficienti
# - Non creare mai piu di 4000-6000 partizioni per broker (overhead memoria)
# Creare un topic con sizing ottimale
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic pagamenti-confermati \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000 \
--config compression.type=snappy
# Verificare la distribuzione delle partizioni tra i broker
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--topic pagamenti-confermati
# Se la distribuzione e sbilanciata (troppi leader sullo stesso broker):
kafka-leader-election.sh \
--bootstrap-server kafka1:9092 \
--election-type preferred \
--all-topic-partitions
Saklama Politikası: Zaman, Boyut ve Sıkıştırma
Saklama ilkesi, kayıtların Kafka'da ne kadar süreyle kullanılabilir kalacağını belirler. Yanlış seçim iki zıt soruna yol açar: yetersiz disk (çok uzun süreli saklama) veya Yavaş tüketiciler için tekrar oynatma imkansız (saklama süresi çok kısa).
# Configurazioni di retention per diversi use case
# Use case 1: Event streaming real-time (clickstream, metriche)
# Retention breve, alta velocita, i dati vengono aggregati immediatamente
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic click-events \
--partitions 24 \
--replication-factor 3 \
--config retention.ms=3600000 \ # 1 ora
--config retention.bytes=5368709120 \ # 5 GB per partizione
--config compression.type=lz4 # compressione veloce
# Use case 2: Integrazione tra servizi (domain events)
# Retention media, consumer devono poter recuperare eventi recenti
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7 giorni (default)
--config compression.type=snappy
# Use case 3: Audit log, compliance
# Retention lunga, dati critici per regolamentazione
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic audit-trail \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=94608000000 \ # 3 anni
--config compression.type=gzip # compressione massima per dati storici
# Use case 4: Change Data Capture (CDC), topic di stato
# Log compaction: mantiene solo ultimo valore per chiave
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic clienti-profilo \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config delete.retention.ms=86400000 # tombstone retention 24h
Devamlı Yeniden Başlatma: Kümeyi Kesinti Süresi Olmadan Yükseltme
Kafka kümesi güncellemeleri (yazılım sürümü, aracı yapılandırması, JVM) gerçekleştirilir bir ile yuvarlanan yeniden başlatma: ISR'yi bekleyerek her seferinde bir aracıyı yeniden başlatırsınız Bir sonraki komisyoncuya geçmeden önce tamamen yeniden inşa edin.
# Procedura di rolling restart sicura
# 1. Verifica che il cluster sia sano prima di iniziare
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--under-replicated-partitions
# Output atteso: nessuna partizione elencata
# 2. Per ogni broker (uno alla volta):
# a) Marca il broker come "not preferred" per evitare leader election inutili
kafka-leader-election.sh --bootstrap-server kafka1:9092 \
--election-type unclean --all-topic-partitions
# b) Riavvia il broker
systemctl restart kafka
# c) ASPETTA che il broker si riunga al cluster e l'ISR si ricostruisca
# Monitora: tutte le under-replicated partitions devono tornare a 0
watch -n 5 "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --under-replicated-partitions"
# d) Solo quando l'ISR e completo, passa al broker successivo
# 3. Dopo il rolling restart, ribilancia i leader
kafka-leader-election.sh \
--bootstrap-server kafka1:9092 \
--election-type preferred \
--all-topic-partitions
# Verifica finale
kafka-broker-api-versions.sh --bootstrap-server kafka1:9092
MirrorMaker 2: Coğrafi Çoğaltma ve Olağanüstü Durum Kurtarma
MirrorMaker 2 (MM2) farklı kümeler arasında verileri kopyalamaya yönelik Kafka bileşenidir. Kafka Connect üzerine kurulmuştur ve çift yönlü çoğaltmayı, dengeleme senkronizasyonunu destekler ve otomatik yük devretme. Şunlar için kullanılır:
- Olağanüstü Durum Kurtarma (DR): Birincil kümeden farklı bir veri merkezindeki yedek konuma kopyalanır
- Coğrafi dağıtım: Yerel tüketicilere düşük gecikme süresi sağlamak için veriler birden fazla bölgede çoğaltılır
- Göç: Kesinti süresi olmadan kümeler arasında kontrollü geçiş
# mm2.properties - Configurazione MirrorMaker 2
# Replica da cluster "primary" a "secondary" (datacenter DR)
# I due cluster
clusters=primary,secondary
# Connessione ai cluster
primary.bootstrap.servers=kafka-primary-1:9092,kafka-primary-2:9092,kafka-primary-3:9092
secondary.bootstrap.servers=kafka-secondary-1:9092,kafka-secondary-2:9092,kafka-secondary-3:9092
# Abilita la replica primary -> secondary
primary->secondary.enabled=true
# Non abilitare secondary->primary per evitare loop (solo se non e' bidirezionale)
secondary->primary.enabled=false
# Topic da replicare (regex): tutti i topic produzione escludendo interni
primary->secondary.topics=^(?!(__|\.)).*$
# Sincronizza anche le configurazioni dei topic
primary->secondary.sync.topic.configs.enabled=true
primary->secondary.sync.topic.acls.enabled=true
# Sincronizza gli offset dei consumer group
primary->secondary.sync.group.offsets.enabled=true
primary->secondary.sync.group.offsets.interval.seconds=60
# Prefisso per i topic replicati (su secondary avrai "primary.ordini-effettuati")
replication.factor=3
# Performance
tasks.max=4
producer.override.acks=all
producer.override.compression.type=snappy
# Avvio di MirrorMaker 2:
# connect-mirror-maker.sh mm2.properties
MirrorMaker 2 ile Yük Devretme: Tüketici Ofset Çevirisi
MM2'nin en güçlü özelliklerinden biri ofset çeviri:
ikincil kümedeki uzaklıklar birincildekilerden farklıdır (kopyada
çoğaltma gecikmesi varsa tüm kayıtlar). MM2, konuyla ilgili bir eşleme tablosu tutar
mm2-offset-syncs.primary.internal ofsetleri çevirmek için.
# RemoteClusterUtils: tradurre gli offset per il failover
# Da usare durante un failover emergency per far ripartire i consumer
# dal punto corretto sul cluster secondary
from confluent_kafka.admin import AdminClient
import json
# Script di failover: trova l'offset tradotto per ogni consumer group
def translate_offsets_for_failover(primary_group_id, secondary_bootstrap):
"""
Usa i checkpoint di MM2 per trovare l'offset equivalente
su secondary per un consumer group originalmente su primary.
"""
admin = AdminClient({'bootstrap.servers': secondary_bootstrap})
# MM2 scrive i checkpoint nel topic dedicato
# Topic: mm2-checkpoints.primary.internal
# Leggi i checkpoint per il gruppo specifico
# In alternativa, usa la MM2 RemoteClusterUtils Java API:
# Map<TopicPartition, OffsetAndMetadata> translated =
# RemoteClusterUtils.translateOffsets(
# adminClient, "primary", groupId, Duration.ofMinutes(1));
print(f"Failover completato: consumer group '{primary_group_id}' "
f"ora punta al cluster secondary con offset tradotti")
# Dopo il failover, cambia il bootstrap.servers dei consumer
# da primary a secondary e ripartono dal punto corretto
Üretim Konularını Yapılandırma: Kontrol Listesi
# Script di creazione topic production-ready con tutti i parametri
create_production_topic() {
TOPIC=$1
PARTITIONS=$2
RETENTION_DAYS=$3
RETENTION_MS=$((RETENTION_DAYS * 24 * 3600 * 1000))
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic $TOPIC \
--partitions $PARTITIONS \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=$RETENTION_MS \
--config compression.type=snappy \
--config max.message.bytes=10485760 \ # 10 MB max message
--if-not-exists
echo "Topic $TOPIC creato: $PARTITIONS partizioni, RF=3, MIR=2, retention=${RETENTION_DAYS}d"
}
# Crea i topic principali
create_production_topic ordini-effettuati 12 7
create_production_topic pagamenti-confermati 12 30
create_production_topic audit-trail 3 1095 # 3 anni
create_production_topic ordini-effettuati.DLT 3 30 # DLQ
# Verifica di tutti i topic
kafka-topics.sh --list --bootstrap-server kafka1:9092
# Verifica configurazione di un topic specifico
kafka-configs.sh --bootstrap-server kafka1:9092 \
--entity-type topics \
--entity-name ordini-effettuati \
--describe
Strimzi Operatörü ile Kubernetes üzerinde Kafka
Kubernetes'te dağıtım için, Strimzi resmi referans operatörüdür (CNCF sanal alanının bir parçası). Kafka kümelerinin oluşturulmasını, güncellenmesini ve yapılandırılmasını yönetir Kubernetes Özel Kaynak Tanımları (CRD) aracılığıyla.
# kafka-cluster.yaml - Strimzi KafkaNodePool + Kafka CRD
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker
namespace: kafka
labels:
strimzi.io/cluster: production-cluster
spec:
replicas: 3
roles:
- broker
storage:
type: persistent-claim
size: 2Ti # 2 TB per broker
class: fast-ssd
resources:
requests:
memory: 32Gi
cpu: "4"
limits:
memory: 64Gi
cpu: "8"
jvmOptions:
-Xms: 6144m
-Xmx: 6144m
gcLoggingEnabled: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: production-cluster
namespace: kafka
spec:
kafka:
version: 4.0.0
metadataVersion: "4.0"
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
auto.create.topics.enable: "false"
default.replication.factor: 3
min.insync.replicas: 2
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.retention.hours: 168
compression.type: snappy
zookeeper:
replicas: 0 # KRaft: niente ZooKeeper in Kafka 4.0
Performans Ayarlama: Her Şeyi Değiştiren Yapılandırmalar
Üretimde Yaygın Anti-Desenler
- hız için acks=0: sıfır dayanıklılık garantisi, aracının arızalanması durumunda mesajlar sessizce kaybolur
- çoğaltma.faktörü=1: artıklık yok, aracının kaybı verileri yok eder
- auto.create.topics.enable=true: yazım hatası nedeniyle oluşturulan konular, kontrol edilmesi imkansız
- JVM yığını > 8 GB: komisyoncunun performansını düşüren GC'nin uzun duraklamalarına neden olur
- Çok az bölüm: paralellik darboğazı kesinti olmadan çözülmesi imkansız
# Configurazioni producer per massimo throughput (batch processing)
props.put("batch.size", 65536); # 64 KB per batch
props.put("linger.ms", 20); # Aspetta 20ms per riempire il batch
props.put("compression.type", "snappy"); # Snappy: buon bilanciamento velocita/ratio
props.put("buffer.memory", 67108864); # 64 MB buffer totale
# Configurazioni consumer per massimo throughput
props.put("fetch.min.bytes", 65536); # 64 KB fetch minimo
props.put("fetch.max.wait.ms", 500); # Aspetta max 500ms se non ci sono dati
props.put("max.partition.fetch.bytes", 10485760); # 10 MB per fetch per partizione
props.put("max.poll.records", 500); # 500 record per poll()
# Per sistemi a bassa latenza (trading, real-time alerts):
props.put("linger.ms", 0); # Invia immediatamente
props.put("batch.size", 1); # Nessun batching
props.put("acks", "1"); # Solo leader ack (senza aspettare ISR)
Özet: Üretimde Kafka için Kontrol Listesi
- Minimum 3 aracılı KRaft kümesi (yüksek kullanılabilirlik için 5+)
replication.factor=3,min.insync.replicas=2tüm kritik konulardaauto.create.topics.enable=false: Konuları açıkça yönetin- JVM yığını 6-8 GB, G1GC yapılandırılmış, sayfa önbelleği için kalan RAM
- İşletim sisteminden ayrı olarak ayrılmış diskte (NVMe SSD) oturum açın
- İzleme: JMX Exporter + Prometheus + Grafana (bkz. Madde 9)
- Uyarı açık: çevrimdışı bölümler, az çoğaltılmış bölümler, tüketici gecikmesi
- Her tüketici grubu için yapılandırılmış DLQ (bkz. Madde 10)
- İkincil veri merkezinde coğrafi çoğaltma ve DR için MirrorMaker 2
- Düzenli olarak belgelenen ve test edilen sürekli yeniden başlatma prosedürü
- Kapasite planlaması büyüme trendlerine göre üç ayda bir gözden geçiriliyor
Serinin Sonu: Sonraki Adımlar
Apache Kafka serisinin tamamını tamamladınız. İlgili konular hakkında daha fazla bilgiyi şu şekilde edinebilirsiniz:
- Olay Odaklı Mimari (Seri 39): Kafka'yı EDA kalıplarının omurgası olarak uygulayın, Karmaşık mikro hizmet sistemlerinde Saga, CQRS ve Outbox.
- PostgreSQL AI ve Debezium CDC: Değişiklikleri yaymak için Kafka'yı bir boru hattı olarak kullanın veritabanından alt hizmetlere gerçek zamanlı olarak.
Diğer Serilerle Bağlantı
- Olay Odaklı Mimari – Saga Deseni ve CQRS: Bir mesaj komisyoncusu olarak Kafka Saga Pattern ve CQRS projeksiyonlarını mikro hizmet sistemlerinde uygulamak.
- Prometheus ve Grafana ile İzleme (Madde 9): Sahip olduğunuz küme metrikleri Bu kılavuzda yapılandırılanlar, JMX Exporter aracılığıyla kullanıma sunulan ve Grafana'da görüntülenenlerdir.







