Podstawy Apache Kafka: tematy, partycje, przesunięcia i grupy konsumentów
Kafka nie jest po prostu kolejką komunikatów: jest to rozproszony dziennik zatwierdzeń zaprojektowany do obsługi milionów zdarzeń na sekundę z gwarantowaną trwałością. W tym podstawowym przewodniku odkryj wewnętrzną strukturę tematów i partycji oraz sposób działania przesunięć śledzenia dokładnej lokalizacji i dlaczego grupa konsumentów jest kluczowym mechanizmem równoległego skalowania konsumpcji.
Dlaczego Kafka różni się od tradycyjnej kody
Projektując system rozproszony, który musi obsługiwać strumienie zdarzeń w czasie rzeczywistym, pierwszą pokusą jest zastosowanie klasycznej kolejki komunikatów jak RabbitMQ lub ActiveMQ. Rozwiązania te sprawdzają się dobrze w prostych scenariuszach, ale mają kluczowe ograniczenie strukturalne: raz konsument zużył wiadomość, wiadomość zostanie usunięta. Nie ma możliwości jego ponownego odczytania, posiadania większej liczby niezależnych konsumentów, którzy go przetwarzają inaczej, albo odtworzyć całą historię wydarzeń.
Apacz Kafka urodził się w 2011 roku na LinkedIn z radykalnie inną filozofią: wiadomości (tzw nagrywać) są zapisane w a log tylko do dołączenia i pozostawać tam przez konfigurowalny okres (domyślnie: 7 dni). Różni konsumenci mogą czasami czytać te same zapisy różne, każdy śledzi swoją pozycję za pośrednictwemzrównoważyć. Ten wzór sprawia, że Kafka staje się czymś więcej niż kolejką: staje się źródło prawdy dla całej historii zdarzeń w Twoim systemie.
Kafka w 2026 r.: Kluczowe liczby
- Używany przez ponad 80% firm z listy Fortune 500 w zakresie zastosowań związanych z przesyłaniem strumieniowym
- Kafka 4.0 (marzec 2025) trwale usunęła ZooKeeper i przeniosła się do KRaft
- Teoretyczna przepustowość: Ponad 1 milion wiadomości/sekundę dla brokerów (sprzęt towarowy)
- Confluent Cloud: zarządzana platforma Kafka dostępna na AWS, GCP, Azure z opóźnieniem < 10 ms p99
- Ekosystem: ponad 200 złączy za pośrednictwem Kafka Connect, Kafka Streams, integracja Apache Flink
Model podstawowy: broker, temat i partycja
Broker: węzeł klastra
Un pośrednik to po prostu serwer Kafki. Klaster Kafki składa się z jednego lub większej liczby brokerów, z których każdy jest identyfikowany przez a
broker.id unikalny. W środowisku produkcyjnym zwykle wykorzystuje się 3, 6 lub 9 brokerów, aby zagwarantować odporność na awarie. Brokerzy zajmują się pisaniem
oraz odczytywanie rekordów, utrzymywanie dzienników na dysku i replikowanie między węzłami.
Dzięki Kafce 4.0 i nowemu sposobowi KRaft jeden lub więcej brokerów przejmuje również rolę kontroler, zarządzanie metadanymi klastra (kto jest liderem której partycji, którzy brokerzy są aktywni itp.) za pośrednictwem wewnętrznego dziennika konsensusu Raft. Nie ma już takiej potrzeby odrębnego zespołu ZooKeeper.
Temat: Logiczna kategoria rekordów
Un temat to logiczna nazwa, pod którą producenci publikują nagrania, a konsumenci je czytają. Można o tym myśleć jako o kanale tematycznym:
ordini-effettuati, pagamenti-confermati, eventi-utente. Każdy temat ma swoją własną konfigurację przechowywania,
liczba partycji, współczynnik replikacji i zasady kompaktowania.
Tematy są podzielony: każdy temat jest podzielony na N partycji fizycznych, rozdzielonych pomiędzy brokerami. To jest ta dystrybucja co sprawia, że Kafka jest skalowalny w poziomie zarówno do pisania, jak i czytania.
Podział: jedność równoległości i porządku
Una przegroda jest to schludny i niezmienny dziennik przeznaczony tylko do dodawania. Każdy rekord zapisany na partycji otrzymuje plik a zrównoważyć monotonicznie rosnące (0, 1, 2, ...). Sortowanie jest gwarantowane wewnątrz przegrody, a nie pomiędzy różnymi partycjami.
Rozkład rekordów pomiędzy partycjami jest określony przez klucz partycji: Jeśli producent określi klucz, plik record zawsze trafia do tej samej partycji (skrót modułu klucza liczby partycji), gwarantując sortowanie dla tego klucza. Jeśli brakuje klucza, Kafka używa lepkiej strategii okrężnej (rekordy wsadowe na tej samej partycji przed rotacją).
# Creare un topic con 6 partizioni e replication factor 3
# (Kafka 4.0 con KRaft, niente --zookeeper flag)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# Descrivere il topic per verificare la distribuzione
kafka-topics.sh --describe \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati
Wyjście --describe pokazuje dla każdej partycji: brokera wiodącego, repliki i repliki synchronizowane (ISR —
Repliki synchronizowane). ISR to repliki, które odtworzyły wszystkie rekordy lidera: jeśli lider upadnie, może zostać tylko jeden ISR
wybrany na nowego lidera, zapewniając brak utraty danych.
Producent: Pisanie płyt w Kafce
Il producent jest to komponent publikujący rekordy dotyczące tematów. Konfiguracja producenta określa gwarancje dostawy. Najbardziej krytyczne właściwości to:
bootstrap.servers: lista brokerów dla pierwszego połączenia (klient automatycznie wykrywa pozostałych brokerów)key.serializerevalue.serializer: Jak serializować klucz i wartość (StringSerializer, AvroSerializer itp.)acks: ile potwierdzeń odpowiedzi należy odczekać, zanim zapis zostanie uznany za pomyślny (0,1,all)retries: liczba prób w przypadku tymczasowego błędulinger.ms: milisekundy oczekiwania przed wysłaniem partii (zwiększa przepustowość kosztem opóźnień)batch.size: maksymalny rozmiar partii w bajtach (domyślnie: 16 KB)
// Producer Java con configurazione production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class OrdineProducer {
public static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
// Garanzie di consegna: all = acks da tutti le ISR
props.put("acks", "all");
// Retry automatico con backoff
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// Batching per throughput
props.put("linger.ms", 5);
props.put("batch.size", 32768); // 32KB
// Compressione: riduce I/O di rete del 60-80%
props.put("compression.type", "snappy");
// Idempotenza: evita duplicati in caso di retry
props.put("enable.idempotence", true);
return new KafkaProducer<>(props);
}
public static void inviaOrdine(KafkaProducer<String, String> producer,
String ordineId, String payload) {
// La chiave (ordineId) determina la partizione target
ProducerRecord<String, String> record =
new ProducerRecord<>("ordini-effettuati", ordineId, payload);
// Invio asincrono con callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Errore invio: " + exception.getMessage());
} else {
System.out.printf("Record inviato: topic=%s, partizione=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
}
}
Uwaga: potwierdzenia i przepustowość
Zwiększanie gwarancji ma swoją cenę: z acks=all e min.insync.replicas=2, producent oczekuje co najmniej 2 odpowiedzi
zapisałeś protokół przed kontynuowaniem. Zwiększa to opóźnienie (zwykle dodatkowe 1–5 ms), ale także zapewnia brak utraty danych
jeśli broker spadnie natychmiast po potwierdzeniu. W przypadku systemów analitycznych, które tolerują pewne straty, acks=1 o acks=0
oferują znacznie większą przepustowość.
Konsument: czytanie zapisów Kafki
Pętla odpytywania
Konsument Kafki używa szablonu ciągnąć: nie odbiera wiadomości push, ale aktywnie żąda ich od brokera za pośrednictwem połączeń
poll(). Taka konstrukcja gwarantuje, że konsument nie zostanie przytłoczony ilością wiadomości przekraczającą jego możliwości
przetwarzania.
// Consumer Java base con gestione degli offset manuale
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class OrdineConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("group.id", "servizio-inventario");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// Comportamento alla prima lettura (nessun offset salvato per il gruppo)
// "earliest" = dall'inizio; "latest" = solo nuovi messaggi
props.put("auto.offset.reset", "earliest");
// Disabilitiamo il commit automatico per controllo preciso
props.put("enable.auto.commit", false);
// Timeout max per il join al consumer group (default: 45s)
props.put("session.timeout.ms", 30000);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("ordini-effettuati"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Offset: %d | Partizione: %d | Chiave: %s%n",
record.offset(), record.partition(), record.key());
// Elabora il record...
elaboraOrdine(record.value());
}
// Commit manuale DOPO l'elaborazione
// Garantisce at-least-once semantics
if (!records.isEmpty()) {
consumer.commitSync();
}
}
}
}
private static void elaboraOrdine(String payload) {
// logica di business...
}
}
Grupa konsumentów: mechanizm skalowania
Il grupa konsumencka jest to podstawowy mechanizm równoległego skalowania konsumpcji. Wszyscy konsumenci udostępniają
to samo group.id należą one do tej samej grupy i dzielą działy tematyczne. Zasada jest prosta:
każda partycja może być przypisana tylko do jednego konsumenta na grupę w danym momencie.
Oznacza to, że maksymalna liczba równoległych odbiorców w grupie jest równa liczbie partycji. Jeśli masz 6 partycji i uruchamiasz 6 konsumentów w tej samej grupie, każdy otrzymuje dokładnie 1 partycję. Jeśli uruchomisz siódmego konsumenta, pozostanie on w stanie uśpienia w trybie gotowości (przydatne do szybkiego przełączania awaryjnego).
Grupa konsumentów: scenariusze skalowania
- 1 konsument, 6 partycji → konsument przetwarza wszystko, bez równoległości
- 3 konsumentów, 6 partycji → każdy konsument zarządza 2 partycjami równolegle
- 6 odbiorców, 6 partycji → maksymalna równoległość, 1 partycja na konsumenta
- 9 odbiorców, 6 partycji → 6 aktywnych, 3 w trybie gotowości do przełączania awaryjnego
- Dwie różne grupy, ten sam temat → każda grupa niezależnie otrzymuje WSZYSTKIE wiadomości
Przesunięcia: mechanizm śledzenia pozycji
L'zrównoważyć jest liczbą całkowitą, która jednoznacznie identyfikuje lokalizację rekordu w partycji. Broker przypisuje przesunięcie sekwencyjnie do każdego zapisanego rekordu: pierwszy rekord ma przesunięcie 0, drugi 1 i tak dalej.
Grupa konsumencka przechowuje własne produkty dokonane przesunięcie — tj. przesunięcie ostatniego pomyślnie przetworzonego rekordu —
w specjalnym wewnętrznym temacie Kafki o nazwie __consumer_offsets. Jest to punkt wyjścia w przypadku ponownego uruchomienia
lub przełączenie awaryjne konsumenta.
Zrozumienie różnicy między tymi przesunięciami ma kluczowe znaczenie dla obsługi błędów:
- Przesunięcie końca kłody (LEO): przesunięcie następnego rekordu, który zostanie zapisany w dzienniku (pozycja głowicy)
- Wysoki znak wodny (HW): przesunięcie ostatniego rekordu replikowanego we wszystkich ISR (konsument widzi tylko rekordy ≤ HW)
- Bieżące przesunięcie: przesunięcie następnego rekordu, który konsument przeczyta w następnym wywołaniu poll().
- Zaangażowane przesunięcie: przesunięcie zapisane w temacie
__consumer_offsets(z którego można ponownie uruchomić po awarii) - Opóźnienie konsumenckie: Różnica pomiędzy LEO i Committed Offset wskazuje, ile rekordów konsument musi jeszcze przetworzyć
# Controllare il consumer lag di un gruppo
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--describe \
--group servizio-inventario
# Output tipico:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# servizio-inventario ordini-effettuati 0 1250 1280 30
# servizio-inventario ordini-effettuati 1 890 890 0
# servizio-inventario ordini-effettuati 2 2100 2105 5
# Resettare gli offset al principio (per reprocessing)
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--topic ordini-effettuati \
--reset-offsets --to-earliest \
--execute
Replikacja: trwałość i tolerancja na błędy
Każda partycja ma lider i zero lub więcej świta (repliki). Producenci i konsumenci komunikują się zawsze z liderem. Obserwujący replikują dane od lidera w sposób asynchroniczny, ale ogólnie szybki.
Zbiór zwolenników, którzy są „wystarczająco zaktualizowani”, aby stać się liderami, tworzyISR (repliki synchronizowane).
Obserwujący jest usuwany z ISR, jeśli pozostaje w tyle za więcej niż replica.lag.time.max.ms milisekundy (domyślnie: 30 s).
Kiedy lider upadnie, kontroler Kafki wybiera na nowego lidera zwolennika z najwyższym przesunięciem wśród ISR.
Połączenie replication.factor e min.insync.replicas definiuje kompromis między trwałością a dostępnością:
# Configurazione consigliata per produzione
# replication.factor=3 significa: 1 leader + 2 follower
# topic-level overrides
kafka-topics.sh --alter \
--bootstrap-server kafka1:9092 \
--topic ordini-effettuati \
--config min.insync.replicas=2
# Con questa configurazione:
# - acks=all: producer aspetta conferma da leader + 1 follower minimo
# - Se 2 broker su 3 sono down: il cluster rifiuta scritture (ma no data loss)
# - Se solo 1 broker è down: il cluster continua normalmente
# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
Polityka przechowywania: jak długo dane pozostają
Rekordy w Kafce są usuwane zgodnie z polityką, którą można skonfigurować dla poszczególnych tematów. Istnieją dwa główne tryby:
-
Przechowywanie oparte na czasie (
retention.ms): Rekordy są usuwane po upływie określonego czasu od ich znacznika czasu. Wartość domyślna: 604800000ms = 7 dni. W przypadku tematów krytycznych, takich jak dzienniki audytu, ustawiane są znacznie wyższe wartości (lata). -
Retencja oparta na rozmiarze (
retention.bytes): dziennik na partycję nie przekracza określonego rozmiaru. Gdy rozmiar przekroczy limit, starsze segmenty zostaną usunięte. -
Zagęszczanie kłód (
cleanup.policy=compact): zamiast usuwać według czasu/rozmiaru, Kafka zachowuje tylko ostatni rekord dla każdego klucza. Idealny do tematów dotyczących stanu (takich jak tabele bazy danych replikowane za pośrednictwem CDC).
# Configurare retention per diversi use case
# Topic eventi real-time: retention breve, alta velocità
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic click-stream \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=3600000 \ # 1 ora
--config retention.bytes=1073741824 # 1GB per partizione
# Topic log audit: retention lunga per compliance
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic audit-log \
--partitions 3 \
--replication-factor 3 \
--config retention.ms=31536000000 \ # 1 anno
--config compression.type=gzip
# Topic di stato con log compaction (es. profili utente)
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5
Python konsumencki: praktyczny przykład
Ekosystem Kafka obsługuje wiele języków. Oto przykład Pythona wykorzystujący bibliotekę confluent-kafka
(oficjalne wiązanie Confluent oparte na librdkafce, znacznie wydajniejsze niż kafka-python):
# pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys
TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"
config = {
"bootstrap.servers": "kafka1:9092,kafka2:9092",
"group.id": GROUP_ID,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000, # 5 minuti per elaborazioni lente
}
consumer = Consumer(config)
running = True
def graceful_shutdown(signum, frame):
global running
running = False
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
try:
consumer.subscribe([TOPIC])
print(f"Consumer avviato, gruppo: {GROUP_ID}")
while running:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Raggiunta la fine della partizione, aspetta nuovi messaggi
print(f"Raggiunto EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
elif msg.error():
raise KafkaException(msg.error())
else:
ordine = json.loads(msg.value().decode("utf-8"))
print(f"Ricevuto ordine {ordine['id']} da partizione {msg.partition()}")
# Elabora l'ordine...
elabora_ordine(ordine)
# Commit manuale dopo elaborazione riuscita
consumer.commit(asynchronous=False)
finally:
consumer.close()
print("Consumer chiuso correttamente")
def elabora_ordine(ordine):
# Logica di business...
pass
Zalecana architektura: ile partycji?
Jednym z najczęstszych pytań dla osób zaczynających od Kafki jest: ile partycji utworzyć dla danego tematu? Odpowiedź zależy od kilku czynników:
- Maksymalna równoległość grupy konsumentów: liczba partycji to maksymalna liczba równoległych odbiorców. Oszacuj, ilu konsumentów spodziewasz się mieć w godzinach szczytu.
- Docelowa przepustowość: Każda partycja zazwyczaj obsługuje zapis z szybkością 10–50 MB/s (w zależności od dysku). Podziel całkowitą przepustowość przez tę liczbę, aby uzyskać minimalną liczbę potrzebnych partycji.
- Sortowanie: jeśli chcesz zagwarantować zamówienie na konkretny klucz (np. wszystkie zdarzenia tego samego klienta), ten klient zawsze znajdzie się na tej samej partycji. Więcej partycji = lepszy rozkład obciążenia dla różnych kluczy.
- Narzut pamięci: Każda partycja wymaga pamięci w brokerze (narzut ~1-2 MB). Przy łącznej liczbie partycji 100 000, zaczyna to zbierać żniwo.
Praktyczna zasada partycji
Przybliżona formuła: max(throughput_MB_s / 10, consumer_max_paralleli). W przypadku większości zastosowań
6, 12 lub 24 partycje to rozsądne wartości. Kafka pozwala później rozwijać partycje, ale
aby ich nie umniejszać: Planuj z niewielkim marginesem.
Zagęszczanie dziennika: przypadek użycia tematów stanowych
La zagęszczanie kłód to zaawansowana funkcja Kafki, która całkowicie zmienia semantykę przechowywanie: zamiast usuwać rekordy według czasu lub rozmiaru, Kafka przechowuje tylkoostatni rekord dla każdego klucza. Wszystkie starsze rekordy z tym samym kluczem są usuwane podczas procesu kompaktowania.
Dzięki temu zagęszczone tematy idealnie nadają się do reprezentowania stan obecny podmiotów: profile użytkowników, aktualne ceny, konfiguracje systemów, stany magazynowe. Konsument łączący się z tematem zagęszczony po raz pierwszy, może zrekonstruować pełny stan, czytając wszystkie obecne zapisy (po jednym na klucz), bez konieczności czytania całej historii zdarzeń.
Rekord z wartością null („zapis nagrobny”) to sposób na usunięcie klucza z tematu
zagęszczony: po zagęszczeniu sam klucz również znika z kłody.
# Creare un topic con log compaction
kafka-topics.sh --create \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config segment.ms=86400000 \
--config delete.retention.ms=86400000
# cleanup.policy=compact: abilita compaction
# min.cleanable.dirty.ratio=0.5: compatta quando >50% del log e' "dirty"
# segment.ms=86400000: crea un nuovo segmento ogni 24h
# delete.retention.ms: quanto tenere i tombstone record prima di eliminarli
# Inviare un aggiornamento profilo (chiave = userId)
kafka-console-producer.sh \
--bootstrap-server kafka1:9092 \
--topic profili-utente \
--property parse.key=true \
--property key.separator=:
# Digita: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Digita: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Digita: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# Dopo compaction, nel topic rimane solo l'ultima riga per user123
Temat wewnętrzny Kafki: __consumer_offsets i __transaction_state
Kafka wewnętrznie używa specjalnych tematów do zarządzania swoim stanem. Znajomość ich pomaga zrozumieć, jak działają systemu i rozwiązywania problemów:
-
__consumer_offsets: przechowuje zatwierdzone przesunięcia każdej grupy konsumentów. Domyślnie ma 50 partycji (offsets.topic.num.partitions). Grupa konsumentów jest przypisana na partycję poprzez skrót group.id. Jeśli w tym temacie występują problemy z replikacją, grupy konsumentów nie dokonują offsetów. -
__transaction_state: zarządza statusem trwających transakcji. Używany przez transakcyjny interfejs API platformy Kafka w celu zagwarantowania semantyki dokładnie jednorazowej. Domyślnie ma 50 partycji. -
@metadata(Tylko KRaft): Dziennik metadanych kontrolera kworum. Zawiera wszystkie metadane klastra (tematy, partycje, brokerzy, listy ACL, konfiguracje). Dostępne tylko wewnętrznie dla kontrolerów.
# Ispezionare il topic __consumer_offsets (advanced troubleshooting)
# ATTENZIONE: operazione read-only, non modificare mai questi topic
kafka-console-consumer.sh \
--bootstrap-server kafka1:9092 \
--topic __consumer_offsets \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--from-beginning \
--max-messages 20
# Output esempio:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)
# Elencare tutti i consumer group attivi
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--list
# Dettaglio di un gruppo specifico
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group servizio-inventario \
--describe \
--state # include stato del gruppo (Stable, Rebalancing, Empty, Dead)
Nagłówek wiadomości i sygnatura czasowa
Każdy rekord Kafki ma precyzyjną strukturę:
- Klawisz (opcjonalnie): określa partycję docelową, serializowaną w bajtach
- Wartość: ładunek wiadomości serializowany w bajtach
- Znacznik czasu: czas utworzenia po stronie producenta (
CreateTime) lub pozyskiwanie po stronie brokera (LogAppendTime), konfigurowalne - Nagłówki: pary klucz-wartość dla metadanych (identyfikator korelacji, typ zdarzenia, wersja schematu itp.)
- Partycja + przesunięcie: przydzielony przez brokera w momencie pisania tego tekstu
// Aggiungere headers a un ProducerRecord Java
ProducerRecord<String, String> record = new ProducerRecord<>(
"ordini-effettuati",
ordineId,
payload
);
// Headers per tracciabilità e versioning
record.headers()
.add("correlation-id", UUID.randomUUID().toString().getBytes())
.add("schema-version", "2".getBytes())
.add("source-service", "checkout-service".getBytes())
.add("event-type", "OrdineCreato".getBytes());
producer.send(record);
Docker Compose: Szybki start dla lokalnego rozwoju
Aby rozpocząć lokalne eksperymenty z Kafką bez zajmowania się złożonymi konfiguracjami, najszybszym sposobem jest użycie Docker Compose z oficjalnym obrazem Apache Kafka 4.0:
# docker-compose.yml minimale per sviluppo locale (single-node KRaft)
version: "3.9"
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka-local
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
CLUSTER_ID: "local-dev-cluster-id-001"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# Avvio:
# docker-compose up -d
#
# Verifica:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list
Podsumowanie: Podstawowe pojęcia
Po zbadaniu wewnętrznej struktury Kafki, oto podsumowanie kluczowych pojęć do zapamiętania:
- Pośrednik: węzeł klastra, zarządza dziennikami dyskowymi i replikacją
- Tematy: logiczna kategoria rekordów podzielona na partycje
- Przegroda: posortowany dziennik tylko do dołączania, jednostka równoległości; zamówienie gwarantowane tylko wewnątrz
- Zrównoważyć: progresywna pozycja każdego rekordu w partycji
- Grupa Konsumencka: mechanizm skalowania; każda partycja przypisana tylko do jednego konsumenta na grupę
- ISR: zestaw zaktualizowanych replik, z których w przypadku błędu wybierany jest nowy lider
- Opóźnienie konsumenckie: krytyczny wskaźnik kondycji, różnica między LEO a zadeklarowanym przesunięciem
- Zatrzymanie: Rekordy pozostają konfigurowalne, nie są usuwane po zużyciu
Kolejne kroki w serii
Teraz, gdy masz już solidne podstawy, w kolejnych artykułach z tej serii omówione zostaną bardziej zaawansowane aspekty:
- Artykuł 2 – KRaft w Kafce 4.0: jak nowy kontroler działa bez ZooKeepera, proces migracji z Kafki 3.x i korzyści operacyjne na produkcji.
-
Artykuł 3 – Zaawansowany producent i konsument: szczegółowa konfiguracja
acks,retries,max.in.flight.requestsi idempotentnemu producentowi w celu uzyskania dokładnie jednorazowych gwarancji na poziomie producenta. - Artykuł 4 – Semantyka dokładnie raz: Transakcje Kafki dla zapisów atomowych na wiele tematów, koordynatora transakcji i wpływ na przepustowość.
Połącz z innymi seriami
- Obserwowalność i OpenTelemetry: Jak instrumentować aplikację Kafka za pomocą OpenTelemetry śledzenie rozprzestrzeniania się zdarzeń między producentami i konsumentami.
- Inżynieria Platformy: Kafka jako podstawowy element wewnętrznej platformy programistycznej do komunikacji między zespołami opartej na zdarzeniach.
- Sztuczna inteligencja PostgreSQL: Wzorzec CDC (Change Data Capture) z Debezium do synchronizacji PostgreSQL do Kafki w czasie rzeczywistym, temat artykułu 7 z tej serii.







