Apache Kafka Temelleri: Konular, Bölümler, Uzaklıklar ve Tüketici Grupları
Kafka yalnızca bir mesaj kuyruğu değildir: dağıtılmış taahhüt günlüğü Saniyede milyonlarca olayı sürdürmek için tasarlandı garantili dayanıklılık ile. Bu temel kılavuzda konuların ve bölümlerin iç yapısını ve izleme farklarının nasıl çalıştığını keşfedin kesin konum ve neden tüketici grubunun paralel olarak tüketimi ölçeklendirmenin temel mekanizması olduğu.
Kafka Geleneksel Coda'dan Neden Farklıdır?
Olay akışlarını gerçek zamanlı olarak yönetmesi gereken dağıtılmış bir sistem tasarlarken ilk akla gelen klasik mesaj kuyruğunu kullanmaktır. RabbitMQ veya ActiveMQ gibi. Bu çözümler basit senaryolar için iyi çalışır ancak önemli bir yapısal sınırlamaya sahiptir: Tüketici bir kez mesaj tüketildiğinde mesaj silinir. Onu yeniden okumanın, onu işleyen daha fazla bağımsız tüketiciye sahip olmanın imkânı yok farklı şekilde veya olayların tüm geçmişini tekrar oynatmak için.
Apaçi Kafka 2011 yılında LinkedIn'de tamamen farklı bir felsefeyle doğdu: mesajlar ( kayıt) bir şekilde yazılır yalnızca günlük ekleme ve yapılandırılabilir bir süre boyunca orada kalır (varsayılan: 7 gün). Farklı tüketiciler aynı kayıtları aynı anda okuyabilir farklı, her biri konumunu takip ediyortelafi etmek. Bu kalıp Kafka'yı bir kuyruktan çok daha fazlasına dönüştürüyor: olur gerçeğin kaynağı sisteminizin tüm olay geçmişi için.
2026'da Kafka: Anahtar Sayılar
- Üzeri için kullanılır %80 Akış kullanım senaryoları için Fortune 500 şirketlerinin listesi
- Kafka 4.0 (Mart 2025) ZooKeeper'ı kalıcı olarak kaldırdı ve şuraya taşındı: KRaft
- Teorik verim: 1 milyondan fazla mesaj/saniye komisyoncular için (emtia donanımı)
- Confluent Cloud: yönetilen Kafka, AWS, GCP ve Azure'da < 10 ms gecikmeyle kullanılabilir p99
- Ekosistem: Kafka Connect, Kafka Streams, Apache Flink entegrasyonu aracılığıyla 200'den fazla bağlayıcı
Temel Model: Komisyoncu, Konu ve Bölüm
Komisyoncu: Küme Düğümü
Un komisyoncu bu sadece bir Kafka sunucusu. Bir Kafka kümesi, her biri bir anahtar tarafından tanımlanan bir veya daha fazla aracıdan oluşur.
broker.id eşsiz. Üretimde hata toleransını garanti etmek için genellikle 3, 6 veya 9 aracı kullanılır. Brokerlar yazı işleri ile ilgileniyor
ve kayıtları okumak, günlükleri diskte tutmak ve düğümler arasında kopyalama yapmak.
Kafka 4.0 ve yeni KRaft yöntemiyle bir veya daha fazla komisyoncu aynı zamanda denetleyici, küme meta verilerini yönetme (hangi bölümün liderinin kim olduğu, hangi aracıların aktif olduğu vb.) dahili bir Raft konsensüs günlüğü aracılığıyla. Artık gerek yok ayrı bir ZooKeeper topluluğunun.
Konu: Kayıtların Mantıksal Kategorisi
Un başlık üreticilerin kayıtları yayınladığı ve tüketicilerin bunları okuduğu mantıksal addır. Bunu tematik bir kanal olarak düşünebilirsiniz:
ordini-effettuati, pagamenti-confermati, eventi-utente. Her konunun kendi akılda tutma yapılandırması vardır,
bölüm sayısı, çoğaltma faktörü ve sıkıştırma ilkeleri.
Konular bölünmüş: Her konu, aracılar arasında dağıtılan N fiziksel bölüme bölünmüştür. Bu dağıtım bu da Kafka'yı hem yazma hem de okuma için yatay olarak ölçeklenebilir hale getirir.
Bölünme: Paralellik ve Düzenin Birliği
Una bölme düzenli ve değişmez, salt eklemeli bir günlüktür. Bir bölüme yazılan her kayıt bir telafi etmek monoton olarak artan (0, 1, 2, ...). Sıralama garantilidir bölmenin içinde, farklı bölümler arasında değil.
Kayıtların bölümler arasındaki dağılımı şu şekilde belirlenir: bölüm anahtarı: Yapımcı bir anahtar belirtirse kayıt her zaman aynı bölüme gider (bölümlerin anahtar modülü sayısının karması), bu anahtar için sıralamayı garanti eder. Anahtar eksikse, Kafka, yapışkan bir hepsini bir kez deneme stratejisi kullanır (döndürmeden önce aynı bölüme toplu kayıtlar yapar).
# Creare un topic con 6 partizioni e replication factor 3
# (Kafka 4.0 con KRaft, niente --zookeeper flag)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# Descrivere il topic per verificare la distribuzione
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati
Çıktısı --describe her bölüm için şunu gösterir: lider aracı, replikalar ve senkronize replikalar (ISR —
Senkronize Kopyalar). ISR'ler liderin tüm kayıtlarını kopyalayan kopyalardır: Lider düşerse yalnızca bir ISR kaydedilebilir
veri kaybı yaşanmamasını sağlayacak şekilde yeni lider olarak seçildi.
Yapımcı: Kafka'da Kayıt Yazmak
Il yapımcı konulara ilişkin kayıtları yayınlayan bileşendir. Üretici konfigürasyonu teslimat garantilerini belirler. En kritik özellikler şunlardır:
bootstrap.servers: ilk bağlantı için aracıların listesi (istemci diğer aracıları otomatik olarak keşfeder)key.serializerevalue.serializer: Anahtar ve değer nasıl serileştirilir (StringSerializer, AvroSerializer, vb.)acks: yazmanın başarılı olduğunu düşünmeden önce kaç yanıt onayının beklenmesi gerektiği (0,1,all)retries: geçici hata durumunda deneme sayısılinger.ms: Bir toplu işi göndermeden önce beklenecek milisaniye (gecikme pahasına verimi artırır)batch.size: bayt cinsinden maksimum toplu iş boyutu (varsayılan: 16KB)
// Producer Java con configurazione production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class OrdineProducer {
public static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// Garanzie di consegna: all = acks da tutti le ISR
props.put("acks", "all");
// Retry automatico con backoff
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// Batching per throughput
props.put("linger.ms", 5);
props.put("batch.size", 32768); // 32KB
// Compressione: riduce I/O di rete del 60-80%
props.put("compression.type", "snappy");
// Idempotenza: evita duplicati in caso di retry
props.put("enable.idempotence", true);
return new KafkaProducer<>(props);
}
public static void inviaOrdine(KafkaProducer<String, String> producer,
String ordineId, String payload) {
// La chiave (ordineId) determina la partizione target
ProducerRecord<String, String> record =
new ProducerRecord<>("ordini-effettuati", ordineId, payload);
// Invio asincrono con callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Errore invio: " + exception.getMessage());
} else {
System.out.printf("Record inviato: topic=%s, partizione=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
}
Dikkat: Acks ve Verim
Garantileri artırmanın bir maliyeti vardır: acks=all e min.insync.replicas=2, yapımcı en az 2 yanıt bekliyor
Devam etmeden önce kaydı yazdım. Bu, gecikmeyi artırır (genellikle 1-5 ms ekstra), ancak aynı zamanda veri kaybı olmamasını da sağlar
bir komisyoncu onaylandıktan hemen sonra düşerse. Bir miktar kaybı tolere edebilen analitik sistemler için, acks=1 o acks=0
çok daha yüksek verim sunarlar.
Tüketici: Kafka'dan Kayıtları Okumak
Oylama Döngüsü
Kafka tüketicisi bir şablon kullanıyor çekmek: Push mesajlarını almaz, ancak çağrılar yoluyla bunları komisyoncudan aktif olarak ister
poll(). Bu tasarım, tüketicinin kapasitesinin üzerinde mesaj yoğunluğuna maruz kalmamasını sağlar
işleme.
// Consumer Java base con gestione degli offset manuale
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class OrdineConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("group.id", "servizio-inventario");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// Comportamento alla prima lettura (nessun offset salvato per il gruppo)
// "earliest" = dall'inizio; "latest" = solo nuovi messaggi
props.put("auto.offset.reset", "earliest");
// Disabilitiamo il commit automatico per controllo preciso
props.put("enable.auto.commit", false);
// Timeout max per il join al consumer group (default: 45s)
props.put("session.timeout.ms", 30000);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("ordini-effettuati"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Offset: %d | Partizione: %d | Chiave: %s%n",
record.offset(), record.partition(), record.key());
// Elabora il record...
elaboraOrdine(record.value());
}
// Commit manuale DOPO l'elaborazione
// Garantisce at-least-once semantics
if (!records.isEmpty()) {
consumer.commitSync();
}
}
}
}
private static void elaboraOrdine(String payload) {
// logica di business...
}
}
Tüketici Grubu: Ölçeklendirme Mekanizması
Il tüketici grubu tüketimi paralel olarak ölçeklendirmenin temel mekanizmasıdır. Tüm tüketiciler paylaşıyor
aynı group.id aynı grubun parçasıdırlar ve konu bölümlerine ayrılırlar. Kural basit:
her bölüm aynı anda grup başına yalnızca bir tüketiciye atanabilir.
Bu, bir gruptaki maksimum paralel tüketici sayısının bölüm sayısına eşit olduğu anlamına gelir. 6 bölümünüz varsa ve 6 tüketiciyi aynı grupta başlatırsınız, her biri tam olarak 1 bölüm alır. 7. tüketiciyi başlatırsanız, hareketsiz kalacaktır beklemede (hızlı yük devretme için kullanışlıdır).
Tüketici Grubu: Ölçeklendirme Senaryoları
- 1 tüketici, 6 bölüm → tüketici her şeyi işler, paralellik yoktur
- 3 tüketici, 6 bölüm → her tüketici 2 bölümü paralel olarak yönetir
- 6 tüketici, 6 bölüm → maksimum paralellik, tüketici başına 1 bölüm
- 9 tüketici, 6 bölüm → 6 aktif, 3 tanesi yük devretme için beklemede
- İki farklı grup, aynı konu → her grup TÜM mesajları bağımsız olarak alır
Ofsetler: Pozisyon Takip Mekanizması
L'telafi etmek bir bölümün içindeki bir kaydın konumunu benzersiz şekilde tanımlayan bir tam sayıdır. Aracı, yazılan her kayda sıralı olarak uzaklığı atar: ilk kaydın uzaklığı 0'dır, ikincininki 1'dir ve bu şekilde devam eder.
Tüketici grubu kendi taahhüt edilen mahsup - yani başarıyla işlenen son kaydın mahsup edilmesi -
Kafka'nın özel bir iç konusu olan __consumer_offsets. Yeniden başlatma durumunda başlangıç noktası burasıdır
veya tüketici yük devretmesi.
Bu uzaklıklar arasındaki farkı anlamak hata yönetimi açısından kritik öneme sahiptir:
- Günlük Sonu Uzaklığı (LEO): Günlüğe yazılacak bir sonraki kaydın ofseti (baş konumu)
- Yüksek Filigran (HW): tüm ISR'lerde çoğaltılan son kaydın mahsup değeri (tüketici yalnızca ≤ HW kayıtlarını görür)
- Mevcut Ofset: tüketicinin bir sonraki poll() çağrısında okuyacağı sonraki kaydın uzaklığı
- Taahhüt Edilen Ofset: konuya kaydedilen ofset
__consumer_offsets(bir kazadan sonra yeniden başlatılacak yer) - Tüketici Gecikmesi: LEO ve Taahhüt Edilen Dengeleme arasındaki fark, tüketicinin hala kaç kaydı işlemesi gerektiğini gösterir
# Controllare il consumer lag di un gruppo
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--describe \
--group servizio-inventario
# Output tipico:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# servizio-inventario ordini-effettuati 0 1250 1280 30
# servizio-inventario ordini-effettuati 1 890 890 0
# servizio-inventario ordini-effettuati 2 2100 2105 5
# Resettare gli offset al principio (per reprocessing)
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--topic ordini-effettuati \
--reset-offsets --to-earliest \
--execute
Çoğaltma: Dayanıklılık ve Hata Toleransı
Her bölümün bir lider ve sıfır veya daha fazla takipçiler (kopyalar). Üreticiler ve tüketiciler iletişim halinde Her zaman liderin yanında. Takipçiler liderden gelen verileri eşzamansız ancak genellikle hızlı bir şekilde kopyalar.
Lider olmak için "yeterince güncellenen" takipçiler kümesi,ISR (Senkronize Kopyalar).
Bir takipçinin birden fazla gerisinde kalması durumunda ISR'den çıkarılır. replica.lag.time.max.ms milisaniye (varsayılan: 30s).
Lider düştüğünde Kafka denetleyicisi ISR'ler arasında en yüksek ofsete sahip takipçiyi yeni lider olarak seçer.
kombinasyonu replication.factor e min.insync.replicas dayanıklılık ve kullanılabilirlik arasındaki dengeyi tanımlar:
# Configurazione consigliata per produzione
# replication.factor=3 significa: 1 leader + 2 follower
# topic-level overrides
kafka-topics.sh --alter \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--config min.insync.replicas=2
# Con questa configurazione:
# - acks=all: producer aspetta conferma da leader + 1 follower minimo
# - Se 2 broker su 3 sono down: il cluster rifiuta scritture (ma no data loss)
# - Se solo 1 broker è down: il cluster continua normalmente
# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
Saklama Politikası: Veriler ne kadar süreyle kalır?
Kafka'daki kayıtlar konuya göre yapılandırılabilen politikalara göre kaldırılır. İki ana mod vardır:
-
Zamana dayalı saklama (
retention.ms): Kayıtlar zaman damgasından belirli bir süre sonra silinir. Varsayılan: 604800000ms = 7 gün. Denetim logları gibi kritik konular için çok daha yüksek değerler (yıllar) belirlenmektedir. -
Boyuta dayalı saklama (
retention.bytes): bölüm başına günlük belirli bir boyutu aşmaz. Boyut sınırı aştığında eski segmentler silinir. -
Günlük sıkıştırma (
cleanup.policy=compact): zamana/boyuta göre silmek yerine Kafka, her tuş için yalnızca son kayıt. Durum konuları için idealdir (CDC aracılığıyla çoğaltılan veritabanı tabloları gibi).
# Configurare retention per diversi use case
# Topic eventi real-time: retention breve, alta velocità
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic click-stream \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=3600000 \ # 1 ora
--config retention.bytes=1073741824 # 1GB per partizione
# Topic log audit: retention lunga per compliance
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic audit-log \
--partitions 3 \
--replication-factor 3 \
--config retention.ms=31536000000 \ # 1 anno
--config compression.type=gzip
# Topic di stato con log compaction (es. profili utente)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5
Tüketici Python'u: Pratik Bir Örnek
Kafka ekosistemi birçok dili desteklemektedir. İşte kütüphaneyi kullanan bir Python örneği confluent-kafka
(librdkafka'yı temel alan resmi Confluent bağlaması, kafka-python'dan çok daha performanslı):
# pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys
TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"
config = {
"bootstrap.servers": "kafka1:9092,kafka2:9092",
"group.id": GROUP_ID,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000, # 5 minuti per elaborazioni lente
}
consumer = Consumer(config)
running = True
def graceful_shutdown(signum, frame):
global running
running = False
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
try:
consumer.subscribe([TOPIC])
print(f"Consumer avviato, gruppo: {GROUP_ID}")
while running:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Raggiunta la fine della partizione, aspetta nuovi messaggi
print(f"Raggiunto EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
elif msg.error():
raise KafkaException(msg.error())
else:
ordine = json.loads(msg.value().decode("utf-8"))
print(f"Ricevuto ordine {ordine['id']} da partizione {msg.partition()}")
# Elabora l'ordine...
elabora_ordine(ordine)
# Commit manuale dopo elaborazione riuscita
consumer.commit(asynchronous=False)
finally:
consumer.close()
print("Consumer chiuso correttamente")
def elabora_ordine(ordine):
# Logica di business...
pass
Önerilen Mimari: Kaç Bölüm?
Kafka ile başlayanlar için en sık sorulan sorulardan biri şudur: Bir konu için kaç bölüm oluşturulmalıdır? Cevap birkaç faktöre bağlıdır:
- Tüketici grubunun maksimum paralelliği: Bölüm sayısı maksimum paralel tüketici sayısıdır. Zirvede olmasını beklediğiniz tüketici sayısını tahmin edin.
- Hedef aktarım hızı: Her bölüm genellikle 10-50 MB/s yazma hızını işleyebilir (diske bağlıdır). Gereken minimum bölüm sayısını elde etmek için toplam verimi bu rakama bölün.
- Sıralama: Belirli bir anahtar için siparişi garanti etmeniz gerekiyorsa (örneğin, aynı müşterinin tüm etkinlikleri), bu istemci her zaman aynı bölümde yer alacaktır. Daha fazla bölüm = farklı anahtarlar için daha iyi yük dağılımı.
- Bellek yükü: Her bölüm, aracıda bellek gerektirir (~1-2 MB ek yük). Toplam 100K bölümle, bunun bedeli ağır olmaya başlıyor.
Bölmeler İçin Pratik Kural
Yaklaşık bir formül: max(throughput_MB_s / 10, consumer_max_paralleli). Çoğu uygulama için,
6, 12 veya 24 bölüm makul değerlerdir. Kafka, bölümleri daha sonra büyütmenize izin verir, ancak
onları azaltmamak için: Biraz marj bırakarak plan yapın.
Günlük Sıkıştırma: Durum Konuları İçin Kullanım Örneği
La kütük sıkıştırma Kafka'nın anlambilimini tamamen değiştiren gelişmiş bir özelliğidir saklama: Kayıtları zamana veya boyuta göre silmek yerine Kafka yalnızcaher tuş için son kayıt. Sıkıştırma işlemi sırasında aynı anahtara sahip tüm eski kayıtlar silinir.
Bu, sıkıştırılmış konuları temsil etmek için ideal hale getirir. mevcut durum varlıkların sayısı: kullanıcı profilleri, güncel fiyatlar, sistem konfigürasyonları, envanter. Bir konuya bağlanan bir tüketici ilk kez sıkıştırıldığında mevcut tüm kayıtları okuyarak tam durumu yeniden oluşturabilir (anahtar başına bir tane), olayların tüm geçmişini okumak zorunda kalmadan.
Değeri olan bir kayıt null (“mezar taşı kaydı”) bir konunun anahtarını silmenin yoludur
sıkıştırılmış: sıkıştırmadan sonra anahtarın kendisi de günlükten kaybolur.
# Creare un topic con log compaction
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config segment.ms=86400000 \
--config delete.retention.ms=86400000
# cleanup.policy=compact: abilita compaction
# min.cleanable.dirty.ratio=0.5: compatta quando >50% del log e' "dirty"
# segment.ms=86400000: crea un nuovo segmento ogni 24h
# delete.retention.ms: quanto tenere i tombstone record prima di eliminarli
# Inviare un aggiornamento profilo (chiave = userId)
kafka-console-producer.sh \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--property parse.key=true \
--property key.separator=:
# Digita: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Digita: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Digita: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# Dopo compaction, nel topic rimane solo l'ultima riga per user123
Kafka'nın dahili konusu: __consumer_offsets ve __transaction_state
Kafka, durumunu yönetmek için dahili olarak özel konuları kullanır. Onları bilmek nasıl çalıştıklarını anlamanıza yardımcı olur Sistem hakkında bilgi edinmek ve sorun gidermek için:
-
__consumer_offsets: her tüketici grubunun taahhüt edilen ofsetlerini saklar. Varsayılan olarak 50 bölümü vardır (offsets.topic.num.partitions). Tüketici grubu atandı group.id'nin karması yoluyla bir bölüme. Bu konunun çoğaltma sorunları varsa tüketici grupları ofsetleri gerçekleştirmede başarısız olurlar. -
__transaction_state: Devam eden işlemlerin durumunu yönetir. Tam olarak bir kerelik anlambilimi garanti etmek için Kafka işlem API'si tarafından kullanılır. Varsayılan olarak 50 bölümü vardır. -
@metadata(Yalnızca KRaft): Çekirdek denetleyicisi meta veri günlüğü. Tüm küme meta verilerini (konular, bölümler, aracılar, ACL'ler, yapılandırmalar) içerir. Yalnızca denetleyicilerin içinden erişilebilir.
# Ispezionare il topic __consumer_offsets (advanced troubleshooting)
# ATTENZIONE: operazione read-only, non modificare mai questi topic
kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning \
--max-messages 20
# Output esempio:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)
# Elencare tutti i consumer group attivi
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--list
# Dettaglio di un gruppo specifico
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--describe \
--state # include stato del gruppo (Stable, Rebalancing, Empty, Dead)
İleti Başlığı ve Zaman Damgası
Her Kafka kaydının kesin bir yapısı vardır:
- Anahtar (isteğe bağlı): bayt cinsinden serileştirilmiş hedef bölümü belirler
- Değer: Bayt cinsinden serileştirilmiş mesaj yükü
- Zaman damgası: üretici tarafında oluşturma süresi (
CreateTime) veya aracı tarafı alımı (LogAppendTime), yapılandırılabilir - Başlıklar: meta veriler için anahtar/değer çiftleri (bağıntı kimliği, etkinlik türü, şema sürümü vb.)
- Bölme + Ofset: yazım sırasında komisyoncu tarafından atanan
// Aggiungere headers a un ProducerRecord Java
ProducerRecord<String, String> record = new ProducerRecord<>(
"ordini-effettuati",
ordineId,
payload
);
// Headers per tracciabilità e versioning
record.headers()
.add("correlation-id", UUID.randomUUID().toString().getBytes())
.add("schema-version", "2".getBytes())
.add("source-service", "checkout-service".getBytes())
.add("event-type", "OrdineCreato".getBytes());
producer.send(record);
Docker Compose: Yerel Kalkınma için Hızlı Başlangıç
Karmaşık konfigürasyonlarla uğraşmadan Kafka'yı yerel olarak denemeye başlamak için, en hızlı yol, Docker Compose'u resmi Apache Kafka 4.0 görüntüsüyle kullanmaktır:
# docker-compose.yml minimale per sviluppo locale (single-node KRaft)
version: "3.9"
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka-local
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
CLUSTER_ID: "local-dev-cluster-id-001"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# Avvio:
# docker-compose up -d
#
# Verifica:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list
Özet: Temel Kavramlar
Kafka'nın iç yapısını inceledikten sonra ezberlemeniz gereken temel kavramların bir özetini burada bulabilirsiniz:
- Komisyoncu: küme düğümü, disk günlüklerini ve çoğaltmayı yönetir
- Konular: bölümlere ayrılmış mantıksal kayıt kategorisi
- Bölüm: sıralanmış salt ekleme günlüğü, paralellik birimi; sipariş yalnızca içeride garanti edilir
- Telafi etmek: bir bölümdeki her kaydın aşamalı konumu
- Tüketici Grubu: ölçeklendirme mekanizması; her bölüm grup başına yalnızca bir tüketiciye atanmıştır
- ISR: Bir hata durumunda yeni liderin seçileceği güncellenmiş kopyalar kümesi
- Tüketici Gecikmesi: kritik sağlık göstergesi, LEO ile taahhüt edilen dengeleme arasındaki fark
- Tutulma: Kayıtlar yapılandırılabilir durumda kalır, tüketildikten sonra silinmez
Serideki Sonraki Adımlar
Artık sağlam bir temele sahip olduğunuza göre serideki sonraki makaleler daha ileri düzey konuları ele alacak:
- Madde 2 – Kafka 4.0'da KRaft: yeni denetleyicinin ZooKeeper olmadan nasıl çalıştığı, Kafka 3.x'ten geçiş süreci ve üretimdeki operasyonel faydalar.
-
Madde 3 – İleri Üretici ve Tüketici: ayrıntılı yapılandırması
acks,retries,max.in.flight.requestsve üretici düzeyinde tam olarak bir kerelik garantiler için idempotent üretici. - Madde 4 – Tam Olarak Bir Kez Anlambilimi: Birden fazla konudaki atomik yazılar için Kafka işlemleri, işlem koordinatörü ve verim üzerindeki etkileri.
Diğer Serilerle Bağlantı
- Gözlemlenebilirlik ve Açık Telemetri: OpenTelemetry ile Kafka uygulaması nasıl enstrümanlanır? Üreticiler ve tüketiciler arasındaki olayların yayılmasını izlemek.
- Platform Mühendisliği: Dahili Geliştirici Platformunun temel bileşeni olarak Kafka Ekipler arasında olaya dayalı iletişim için.
- PostgreSQL yapay zekası: PostgreSQL'i senkronize etmek için Debezium ile CDC (Veri Yakalamayı Değiştir) modeli Kafka'ya gerçek zamanlı olarak bu serinin 7. Maddesinin konusu.







