Dlaczego Kafka różni się od tradycyjnej kody

Projektując system rozproszony, który musi obsługiwać strumienie zdarzeń w czasie rzeczywistym, pierwszą pokusą jest zastosowanie klasycznej kolejki komunikatów jak RabbitMQ lub ActiveMQ. Rozwiązania te sprawdzają się dobrze w prostych scenariuszach, ale mają kluczowe ograniczenie strukturalne: raz konsument zużył wiadomość, wiadomość zostanie usunięta. Nie ma możliwości jego ponownego odczytania, posiadania większej liczby niezależnych konsumentów, którzy go przetwarzają inaczej, albo odtworzyć całą historię wydarzeń.

Apacz Kafka urodził się w 2011 roku na LinkedIn z radykalnie inną filozofią: wiadomości (tzw nagrywać) są zapisane w a log tylko do dołączenia i pozostawać tam przez konfigurowalny okres (domyślnie: 7 dni). Różni konsumenci mogą czasami czytać te same zapisy różne, każdy śledzi swoją pozycję za pośrednictwemzrównoważyć. Ten wzór sprawia, że Kafka staje się czymś więcej niż kolejką: staje się źródło prawdy dla całej historii zdarzeń w Twoim systemie.

Kafka w 2026 r.: Kluczowe liczby

  • Używany przez ponad 80% firm z listy Fortune 500 w zakresie zastosowań związanych z przesyłaniem strumieniowym
  • Kafka 4.0 (marzec 2025) trwale usunęła ZooKeeper i przeniosła się do KRaft
  • Teoretyczna przepustowość: Ponad 1 milion wiadomości/sekundę dla brokerów (sprzęt towarowy)
  • Confluent Cloud: zarządzana platforma Kafka dostępna na AWS, GCP, Azure z opóźnieniem < 10 ms p99
  • Ekosystem: ponad 200 złączy za pośrednictwem Kafka Connect, Kafka Streams, integracja Apache Flink

Model podstawowy: broker, temat i partycja

Broker: węzeł klastra

Un pośrednik to po prostu serwer Kafki. Klaster Kafki składa się z jednego lub większej liczby brokerów, z których każdy jest identyfikowany przez a broker.id unikalny. W środowisku produkcyjnym zwykle wykorzystuje się 3, 6 lub 9 brokerów, aby zagwarantować odporność na awarie. Brokerzy zajmują się pisaniem oraz odczytywanie rekordów, utrzymywanie dzienników na dysku i replikowanie między węzłami.

Dzięki Kafce 4.0 i nowemu sposobowi KRaft jeden lub więcej brokerów przejmuje również rolę kontroler, zarządzanie metadanymi klastra (kto jest liderem której partycji, którzy brokerzy są aktywni itp.) za pośrednictwem wewnętrznego dziennika konsensusu Raft. Nie ma już takiej potrzeby odrębnego zespołu ZooKeeper.

Temat: Logiczna kategoria rekordów

Un temat to logiczna nazwa, pod którą producenci publikują nagrania, a konsumenci je czytają. Można o tym myśleć jako o kanale tematycznym: ordini-effettuati, pagamenti-confermati, eventi-utente. Każdy temat ma swoją własną konfigurację przechowywania, liczba partycji, współczynnik replikacji i zasady kompaktowania.

Tematy są podzielony: każdy temat jest podzielony na N partycji fizycznych, rozdzielonych pomiędzy brokerami. To jest ta dystrybucja co sprawia, że Kafka jest skalowalny w poziomie zarówno do pisania, jak i czytania.

Podział: jedność równoległości i porządku

Una przegroda jest to schludny i niezmienny dziennik przeznaczony tylko do dodawania. Każdy rekord zapisany na partycji otrzymuje plik a zrównoważyć monotonicznie rosnące (0, 1, 2, ...). Sortowanie jest gwarantowane wewnątrz przegrody, a nie pomiędzy różnymi partycjami.

Rozkład rekordów pomiędzy partycjami jest określony przez klucz partycji: Jeśli producent określi klucz, plik record zawsze trafia do tej samej partycji (skrót modułu klucza liczby partycji), gwarantując sortowanie dla tego klucza. Jeśli brakuje klucza, Kafka używa lepkiej strategii okrężnej (rekordy wsadowe na tej samej partycji przed rotacją).

# Creare un topic con 6 partizioni e replication factor 3
# (Kafka 4.0 con KRaft, niente --zookeeper flag)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# Descrivere il topic per verificare la distribuzione
kafka-topics.sh --describe \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati

Wyjście --describe pokazuje dla każdej partycji: brokera wiodącego, repliki i repliki synchronizowane (ISR — Repliki synchronizowane). ISR to repliki, które odtworzyły wszystkie rekordy lidera: jeśli lider upadnie, może zostać tylko jeden ISR wybrany na nowego lidera, zapewniając brak utraty danych.

Producent: Pisanie płyt w Kafce

Il producent jest to komponent publikujący rekordy dotyczące tematów. Konfiguracja producenta określa gwarancje dostawy. Najbardziej krytyczne właściwości to:

  • bootstrap.servers: lista brokerów dla pierwszego połączenia (klient automatycznie wykrywa pozostałych brokerów)
  • key.serializer e value.serializer: Jak serializować klucz i wartość (StringSerializer, AvroSerializer itp.)
  • acks: ile potwierdzeń odpowiedzi należy odczekać, zanim zapis zostanie uznany za pomyślny (0, 1, all)
  • retries: liczba prób w przypadku tymczasowego błędu
  • linger.ms: milisekundy oczekiwania przed wysłaniem partii (zwiększa przepustowość kosztem opóźnień)
  • batch.size: maksymalny rozmiar partii w bajtach (domyślnie: 16 KB)
// Producer Java con configurazione production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class OrdineProducer {

    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Garanzie di consegna: all = acks da tutti le ISR
        props.put("acks", "all");
        // Retry automatico con backoff
        props.put("retries", 3);
        props.put("retry.backoff.ms", 100);
        // Batching per throughput
        props.put("linger.ms", 5);
        props.put("batch.size", 32768);   // 32KB
        // Compressione: riduce I/O di rete del 60-80%
        props.put("compression.type", "snappy");
        // Idempotenza: evita duplicati in caso di retry
        props.put("enable.idempotence", true);

        return new KafkaProducer<>(props);
    }

    public static void inviaOrdine(KafkaProducer<String, String> producer,
                                    String ordineId, String payload) {
        // La chiave (ordineId) determina la partizione target
        ProducerRecord<String, String> record =
            new ProducerRecord<>("ordini-effettuati", ordineId, payload);

        // Invio asincrono con callback
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Errore invio: " + exception.getMessage());
            } else {
                System.out.printf("Record inviato: topic=%s, partizione=%d, offset=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}

Uwaga: potwierdzenia i przepustowość

Zwiększanie gwarancji ma swoją cenę: z acks=all e min.insync.replicas=2, producent oczekuje co najmniej 2 odpowiedzi zapisałeś protokół przed kontynuowaniem. Zwiększa to opóźnienie (zwykle dodatkowe 1–5 ms), ale także zapewnia brak utraty danych jeśli broker spadnie natychmiast po potwierdzeniu. W przypadku systemów analitycznych, które tolerują pewne straty, acks=1 o acks=0 oferują znacznie większą przepustowość.

Konsument: czytanie zapisów Kafki

Pętla odpytywania

Konsument Kafki używa szablonu ciągnąć: nie odbiera wiadomości push, ale aktywnie żąda ich od brokera za pośrednictwem połączeń poll(). Taka konstrukcja gwarantuje, że konsument nie zostanie przytłoczony ilością wiadomości przekraczającą jego możliwości przetwarzania.

// Consumer Java base con gestione degli offset manuale
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

public class OrdineConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("group.id", "servizio-inventario");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // Comportamento alla prima lettura (nessun offset salvato per il gruppo)
        // "earliest" = dall'inizio; "latest" = solo nuovi messaggi
        props.put("auto.offset.reset", "earliest");

        // Disabilitiamo il commit automatico per controllo preciso
        props.put("enable.auto.commit", false);

        // Timeout max per il join al consumer group (default: 45s)
        props.put("session.timeout.ms", 30000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("ordini-effettuati"));

            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d | Partizione: %d | Chiave: %s%n",
                        record.offset(), record.partition(), record.key());

                    // Elabora il record...
                    elaboraOrdine(record.value());
                }

                // Commit manuale DOPO l'elaborazione
                // Garantisce at-least-once semantics
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    private static void elaboraOrdine(String payload) {
        // logica di business...
    }
}

Grupa konsumentów: mechanizm skalowania

Il grupa konsumencka jest to podstawowy mechanizm równoległego skalowania konsumpcji. Wszyscy konsumenci udostępniają to samo group.id należą one do tej samej grupy i dzielą działy tematyczne. Zasada jest prosta: każda partycja może być przypisana tylko do jednego konsumenta na grupę w danym momencie.

Oznacza to, że maksymalna liczba równoległych odbiorców w grupie jest równa liczbie partycji. Jeśli masz 6 partycji i uruchamiasz 6 konsumentów w tej samej grupie, każdy otrzymuje dokładnie 1 partycję. Jeśli uruchomisz siódmego konsumenta, pozostanie on w stanie uśpienia w trybie gotowości (przydatne do szybkiego przełączania awaryjnego).

Grupa konsumentów: scenariusze skalowania

  • 1 konsument, 6 partycji → konsument przetwarza wszystko, bez równoległości
  • 3 konsumentów, 6 partycji → każdy konsument zarządza 2 partycjami równolegle
  • 6 odbiorców, 6 partycji → maksymalna równoległość, 1 partycja na konsumenta
  • 9 odbiorców, 6 partycji → 6 aktywnych, 3 w trybie gotowości do przełączania awaryjnego
  • Dwie różne grupy, ten sam temat → każda grupa niezależnie otrzymuje WSZYSTKIE wiadomości

Przesunięcia: mechanizm śledzenia pozycji

L'zrównoważyć jest liczbą całkowitą, która jednoznacznie identyfikuje lokalizację rekordu w partycji. Broker przypisuje przesunięcie sekwencyjnie do każdego zapisanego rekordu: pierwszy rekord ma przesunięcie 0, drugi 1 i tak dalej.

Grupa konsumencka przechowuje własne produkty dokonane przesunięcie — tj. przesunięcie ostatniego pomyślnie przetworzonego rekordu — w specjalnym wewnętrznym temacie Kafki o nazwie __consumer_offsets. Jest to punkt wyjścia w przypadku ponownego uruchomienia lub przełączenie awaryjne konsumenta.

Zrozumienie różnicy między tymi przesunięciami ma kluczowe znaczenie dla obsługi błędów:

  • Przesunięcie końca kłody (LEO): przesunięcie następnego rekordu, który zostanie zapisany w dzienniku (pozycja głowicy)
  • Wysoki znak wodny (HW): przesunięcie ostatniego rekordu replikowanego we wszystkich ISR (konsument widzi tylko rekordy ≤ HW)
  • Bieżące przesunięcie: przesunięcie następnego rekordu, który konsument przeczyta w następnym wywołaniu poll().
  • Zaangażowane przesunięcie: przesunięcie zapisane w temacie __consumer_offsets (z którego można ponownie uruchomić po awarii)
  • Opóźnienie konsumenckie: Różnica pomiędzy LEO i Committed Offset wskazuje, ile rekordów konsument musi jeszcze przetworzyć
# Controllare il consumer lag di un gruppo
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --describe \
  --group servizio-inventario

# Output tipico:
# GROUP              TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# servizio-inventario ordini-effettuati    0          1250            1280            30
# servizio-inventario ordini-effettuati    1          890             890             0
# servizio-inventario ordini-effettuati    2          2100            2105            5

# Resettare gli offset al principio (per reprocessing)
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --topic ordini-effettuati \
  --reset-offsets --to-earliest \
  --execute

Replikacja: trwałość i tolerancja na błędy

Każda partycja ma lider i zero lub więcej świta (repliki). Producenci i konsumenci komunikują się zawsze z liderem. Obserwujący replikują dane od lidera w sposób asynchroniczny, ale ogólnie szybki.

Zbiór zwolenników, którzy są „wystarczająco zaktualizowani”, aby stać się liderami, tworzyISR (repliki synchronizowane). Obserwujący jest usuwany z ISR, jeśli pozostaje w tyle za więcej niż replica.lag.time.max.ms milisekundy (domyślnie: 30 s). Kiedy lider upadnie, kontroler Kafki wybiera na nowego lidera zwolennika z najwyższym przesunięciem wśród ISR.

Połączenie replication.factor e min.insync.replicas definiuje kompromis między trwałością a dostępnością:

# Configurazione consigliata per produzione
# replication.factor=3 significa: 1 leader + 2 follower

# topic-level overrides
kafka-topics.sh --alter \
  --bootstrap-server kafka1:9092 \
  --topic ordini-effettuati \
  --config min.insync.replicas=2

# Con questa configurazione:
# - acks=all: producer aspetta conferma da leader + 1 follower minimo
# - Se 2 broker su 3 sono down: il cluster rifiuta scritture (ma no data loss)
# - Se solo 1 broker è down: il cluster continua normalmente

# broker-level defaults in server.properties
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

Polityka przechowywania: jak długo dane pozostają

Rekordy w Kafce są usuwane zgodnie z polityką, którą można skonfigurować dla poszczególnych tematów. Istnieją dwa główne tryby:

  • Przechowywanie oparte na czasie (retention.ms): Rekordy są usuwane po upływie określonego czasu od ich znacznika czasu. Wartość domyślna: 604800000ms = 7 dni. W przypadku tematów krytycznych, takich jak dzienniki audytu, ustawiane są znacznie wyższe wartości (lata).
  • Retencja oparta na rozmiarze (retention.bytes): dziennik na partycję nie przekracza określonego rozmiaru. Gdy rozmiar przekroczy limit, starsze segmenty zostaną usunięte.
  • Zagęszczanie kłód (cleanup.policy=compact): zamiast usuwać według czasu/rozmiaru, Kafka zachowuje tylko ostatni rekord dla każdego klucza. Idealny do tematów dotyczących stanu (takich jak tabele bazy danych replikowane za pośrednictwem CDC).
# Configurare retention per diversi use case

# Topic eventi real-time: retention breve, alta velocità
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic click-stream \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=3600000 \   # 1 ora
  --config retention.bytes=1073741824  # 1GB per partizione

# Topic log audit: retention lunga per compliance
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic audit-log \
  --partitions 3 \
  --replication-factor 3 \
  --config retention.ms=31536000000 \  # 1 anno
  --config compression.type=gzip

# Topic di stato con log compaction (es. profili utente)
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5

Python konsumencki: praktyczny przykład

Ekosystem Kafka obsługuje wiele języków. Oto przykład Pythona wykorzystujący bibliotekę confluent-kafka (oficjalne wiązanie Confluent oparte na librdkafce, znacznie wydajniejsze niż kafka-python):

# pip install confluent-kafka

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import signal
import sys

TOPIC = "ordini-effettuati"
GROUP_ID = "servizio-analytics-py"

config = {
    "bootstrap.servers": "kafka1:9092,kafka2:9092",
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "session.timeout.ms": 30000,
    "max.poll.interval.ms": 300000,  # 5 minuti per elaborazioni lente
}

consumer = Consumer(config)
running = True

def graceful_shutdown(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)

try:
    consumer.subscribe([TOPIC])
    print(f"Consumer avviato, gruppo: {GROUP_ID}")

    while running:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Raggiunta la fine della partizione, aspetta nuovi messaggi
                print(f"Raggiunto EOF: {msg.topic()} [{msg.partition()}] offset {msg.offset()}")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            ordine = json.loads(msg.value().decode("utf-8"))
            print(f"Ricevuto ordine {ordine['id']} da partizione {msg.partition()}")

            # Elabora l'ordine...
            elabora_ordine(ordine)

            # Commit manuale dopo elaborazione riuscita
            consumer.commit(asynchronous=False)

finally:
    consumer.close()
    print("Consumer chiuso correttamente")

def elabora_ordine(ordine):
    # Logica di business...
    pass

Zalecana architektura: ile partycji?

Jednym z najczęstszych pytań dla osób zaczynających od Kafki jest: ile partycji utworzyć dla danego tematu? Odpowiedź zależy od kilku czynników:

  • Maksymalna równoległość grupy konsumentów: liczba partycji to maksymalna liczba równoległych odbiorców. Oszacuj, ilu konsumentów spodziewasz się mieć w godzinach szczytu.
  • Docelowa przepustowość: Każda partycja zazwyczaj obsługuje zapis z szybkością 10–50 MB/s (w zależności od dysku). Podziel całkowitą przepustowość przez tę liczbę, aby uzyskać minimalną liczbę potrzebnych partycji.
  • Sortowanie: jeśli chcesz zagwarantować zamówienie na konkretny klucz (np. wszystkie zdarzenia tego samego klienta), ten klient zawsze znajdzie się na tej samej partycji. Więcej partycji = lepszy rozkład obciążenia dla różnych kluczy.
  • Narzut pamięci: Każda partycja wymaga pamięci w brokerze (narzut ~1-2 MB). Przy łącznej liczbie partycji 100 000, zaczyna to zbierać żniwo.

Praktyczna zasada partycji

Przybliżona formuła: max(throughput_MB_s / 10, consumer_max_paralleli). W przypadku większości zastosowań 6, 12 lub 24 partycje to rozsądne wartości. Kafka pozwala później rozwijać partycje, ale aby ich nie umniejszać: Planuj z niewielkim marginesem.

Zagęszczanie dziennika: przypadek użycia tematów stanowych

La zagęszczanie kłód to zaawansowana funkcja Kafki, która całkowicie zmienia semantykę przechowywanie: zamiast usuwać rekordy według czasu lub rozmiaru, Kafka przechowuje tylkoostatni rekord dla każdego klucza. Wszystkie starsze rekordy z tym samym kluczem są usuwane podczas procesu kompaktowania.

Dzięki temu zagęszczone tematy idealnie nadają się do reprezentowania stan obecny podmiotów: profile użytkowników, aktualne ceny, konfiguracje systemów, stany magazynowe. Konsument łączący się z tematem zagęszczony po raz pierwszy, może zrekonstruować pełny stan, czytając wszystkie obecne zapisy (po jednym na klucz), bez konieczności czytania całej historii zdarzeń.

Rekord z wartością null („zapis nagrobny”) to sposób na usunięcie klucza z tematu zagęszczony: po zagęszczeniu sam klucz również znika z kłody.

# Creare un topic con log compaction
kafka-topics.sh --create \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --partitions 6 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config segment.ms=86400000 \
  --config delete.retention.ms=86400000

# cleanup.policy=compact: abilita compaction
# min.cleanable.dirty.ratio=0.5: compatta quando >50% del log e' "dirty"
# segment.ms=86400000: crea un nuovo segmento ogni 24h
# delete.retention.ms: quanto tenere i tombstone record prima di eliminarli

# Inviare un aggiornamento profilo (chiave = userId)
kafka-console-producer.sh \
  --bootstrap-server kafka1:9092 \
  --topic profili-utente \
  --property parse.key=true \
  --property key.separator=:
# Digita: user123:{"nome":"Mario","email":"mario@example.com","eta":30}
# Digita: user456:{"nome":"Anna","email":"anna@example.com","eta":25}
# Digita: user123:{"nome":"Mario","email":"mario.rossi@example.com","eta":31}
# Dopo compaction, nel topic rimane solo l'ultima riga per user123

Temat wewnętrzny Kafki: __consumer_offsets i __transaction_state

Kafka wewnętrznie używa specjalnych tematów do zarządzania swoim stanem. Znajomość ich pomaga zrozumieć, jak działają systemu i rozwiązywania problemów:

  • __consumer_offsets: przechowuje zatwierdzone przesunięcia każdej grupy konsumentów. Domyślnie ma 50 partycji (offsets.topic.num.partitions). Grupa konsumentów jest przypisana na partycję poprzez skrót group.id. Jeśli w tym temacie występują problemy z replikacją, grupy konsumentów nie dokonują offsetów.
  • __transaction_state: zarządza statusem trwających transakcji. Używany przez transakcyjny interfejs API platformy Kafka w celu zagwarantowania semantyki dokładnie jednorazowej. Domyślnie ma 50 partycji.
  • @metadata (Tylko KRaft): Dziennik metadanych kontrolera kworum. Zawiera wszystkie metadane klastra (tematy, partycje, brokerzy, listy ACL, konfiguracje). Dostępne tylko wewnętrznie dla kontrolerów.
# Ispezionare il topic __consumer_offsets (advanced troubleshooting)
# ATTENZIONE: operazione read-only, non modificare mai questi topic

kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic __consumer_offsets \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --from-beginning \
  --max-messages 20

# Output esempio:
# [servizio-inventario,ordini-effettuati,0]::OffsetAndMetadata(offset=1250, ...)
# [servizio-inventario,ordini-effettuati,1]::OffsetAndMetadata(offset=890, ...)

# Elencare tutti i consumer group attivi
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --list

# Dettaglio di un gruppo specifico
kafka-consumer-groups.sh \
  --bootstrap-server kafka1:9092 \
  --group servizio-inventario \
  --describe \
  --state  # include stato del gruppo (Stable, Rebalancing, Empty, Dead)

Nagłówek wiadomości i sygnatura czasowa

Każdy rekord Kafki ma precyzyjną strukturę:

  • Klawisz (opcjonalnie): określa partycję docelową, serializowaną w bajtach
  • Wartość: ładunek wiadomości serializowany w bajtach
  • Znacznik czasu: czas utworzenia po stronie producenta (CreateTime) lub pozyskiwanie po stronie brokera (LogAppendTime), konfigurowalne
  • Nagłówki: pary klucz-wartość dla metadanych (identyfikator korelacji, typ zdarzenia, wersja schematu itp.)
  • Partycja + przesunięcie: przydzielony przez brokera w momencie pisania tego tekstu
// Aggiungere headers a un ProducerRecord Java
ProducerRecord<String, String> record = new ProducerRecord<>(
    "ordini-effettuati",
    ordineId,
    payload
);

// Headers per tracciabilità e versioning
record.headers()
    .add("correlation-id", UUID.randomUUID().toString().getBytes())
    .add("schema-version", "2".getBytes())
    .add("source-service", "checkout-service".getBytes())
    .add("event-type", "OrdineCreato".getBytes());

producer.send(record);

Docker Compose: Szybki start dla lokalnego rozwoju

Aby rozpocząć lokalne eksperymenty z Kafką bez zajmowania się złożonymi konfiguracjami, najszybszym sposobem jest użycie Docker Compose z oficjalnym obrazem Apache Kafka 4.0:

# docker-compose.yml minimale per sviluppo locale (single-node KRaft)
version: "3.9"
services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka-local
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka-local:9092,CONTROLLER://kafka-local:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-local:9093"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      CLUSTER_ID: "local-dev-cluster-id-001"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

# Avvio:
# docker-compose up -d
#
# Verifica:
# docker exec kafka-local kafka-topics.sh --bootstrap-server localhost:9092 --list

Podsumowanie: Podstawowe pojęcia

Po zbadaniu wewnętrznej struktury Kafki, oto podsumowanie kluczowych pojęć do zapamiętania:

  • Pośrednik: węzeł klastra, zarządza dziennikami dyskowymi i replikacją
  • Tematy: logiczna kategoria rekordów podzielona na partycje
  • Przegroda: posortowany dziennik tylko do dołączania, jednostka równoległości; zamówienie gwarantowane tylko wewnątrz
  • Zrównoważyć: progresywna pozycja każdego rekordu w partycji
  • Grupa Konsumencka: mechanizm skalowania; każda partycja przypisana tylko do jednego konsumenta na grupę
  • ISR: zestaw zaktualizowanych replik, z których w przypadku błędu wybierany jest nowy lider
  • Opóźnienie konsumenckie: krytyczny wskaźnik kondycji, różnica między LEO a zadeklarowanym przesunięciem
  • Zatrzymanie: Rekordy pozostają konfigurowalne, nie są usuwane po zużyciu

Kolejne kroki w serii

Teraz, gdy masz już solidne podstawy, w kolejnych artykułach z tej serii omówione zostaną bardziej zaawansowane aspekty:

  • Artykuł 2 – KRaft w Kafce 4.0: jak nowy kontroler działa bez ZooKeepera, proces migracji z Kafki 3.x i korzyści operacyjne na produkcji.
  • Artykuł 3 – Zaawansowany producent i konsument: szczegółowa konfiguracja acks, retries, max.in.flight.requests i idempotentnemu producentowi w celu uzyskania dokładnie jednorazowych gwarancji na poziomie producenta.
  • Artykuł 4 – Semantyka dokładnie raz: Transakcje Kafki dla zapisów atomowych na wiele tematów, koordynatora transakcji i wpływ na przepustowość.

Połącz z innymi seriami

  • Obserwowalność i OpenTelemetry: Jak instrumentować aplikację Kafka za pomocą OpenTelemetry śledzenie rozprzestrzeniania się zdarzeń między producentami i konsumentami.
  • Inżynieria Platformy: Kafka jako podstawowy element wewnętrznej platformy programistycznej do komunikacji między zespołami opartej na zdarzeniach.
  • Sztuczna inteligencja PostgreSQL: Wzorzec CDC (Change Data Capture) z Debezium do synchronizacji PostgreSQL do Kafki w czasie rzeczywistym, temat artykułu 7 z tej serii.