Tři záruky dodání

Než se pustíte do nastavení, je důležité porozumět třem režimům doručení, které Kafka podporuje a jejich praktický význam:

  • Maximálně jednou: Zpráva je odeslána jednou, bez opakování. Pokud jej makléř neobdrží, je trvale ztracena. Žádné duplikáty, možná ztráta dat. Přijatelné pro nekritické metriky, protokoly ladění, události typu click-stream, kde je ztráta některých událostí tolerovatelná.
  • Alespoň jednou: Pokud dojde k chybě, výrobce to zkusí znovu. Zpráva dorazí alespoň jednou, může však dorazit vícekrát (duplicitně), pokud jej broker obdržel, ale neposlal potvrzení před vypršením časového limitu. Spotřebitel musí být idempotentní ke zpracování duplikátů. Nejběžnější scénář ve výrobě.
  • Přesně - jednou: Zpráva je zpracována přesně jednou, bez ztráty a bez duplicit. Vyžaduje idempotentního výrobce (pro stranu výrobce přesně jednou) nebo transakce Kafka (přesně jednou end-to-end mezi výrobcem a spotřebitelem). Značná režie výkonu.

Zlaté pravidlo

V distribuovaném systému je nemožné zaručit přesně jednou bez režie. 90 % systémů Kafka v americké produkci alespoň jednou s idempotentními spotřebiteli (deduplikace na straně spotřebitele přes databázi nebo mezipaměť). Transakce Exactly-Once přes Kafka se používá pro finanční kanály, účtování duplikace události způsobí skutečné škody.

Parametr acks: Kolik potvrzení čekat

Parametr acks výrobce definuje, kolik replik musí nejprve potvrdit příjem že výrobce považuje žádost za dokončenou:

acks=0 (Ohni a zapomeň)

Producent odešle záznam a nečeká na žádnou odpověď makléře. Maximální propustnost, minimální latence, ale žádná záruka: pokud je broker mimo provoz nebo je záznam zapsán na brokera, který poté padne jako první odpovědět, záznam je ztracen. Sémantika je maximálně jednou.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (pouze vedoucí)

Producent čeká na potvrzení pouze od hlavního brokera oddílu. Následovníci možná ještě nemají replikoval záznam, když přijde potvrzení. Pokud vedoucí spadne ihned po odeslání potvrzení ale než sledující odpoví, záznam je ztracen. Sémantika alespoň jednou s rizikem ztráty dat ve velmi krátkých chybových oknech.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all (nebo acks=-1): Všechny ISR

Producent čeká, až bude deska napsána na všech ISR (In-Sync Replicas) oddílu. S min.insync.replicas=2 e replication.factor=3, to znamená, že minimálně 2 odpovědi (vedoucí + 1 sledující) musí potvrdit. Teprve poté obdrží výrobce potvrzení. Sémantika alespoň jednou minimálně do doby bez ztráty dat min.insync.replicas makléři jsou aktivní.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

Problém duplikace s opakováním

Zvažte tento scénář s acks=all e retries=3:

  1. Producent odešle záznam R1 hlavnímu makléři
  2. Broker zapíše R1 na disk a pošle producentovi potvrzení
  3. Potvrzení je ztraceno (vyprší časový limit sítě, než se dostane k producentovi)
  4. Producent, který nedostal potvrzení, vstupuje request.timeout.ms, si myslí, že psaní selhalo
  5. Výrobce to zkusí znovu a pošle R1 znovu
  6. Broker obdrží R1 podruhé a zapíše jej jako samostatný záznam
  7. Téma nyní obsahuje duplicitní R1

Toto je správné chování alespoň jednou: každé údaje dorazí alespoň jednou, ale mohou přijít duplikáty. Chcete-li odstranit duplikáty na úrovni výrobce, použijteidempotentní výrobce.

Idempotentní producent

Idempotentní producent, představený v Kafka 0.11 (2017), eliminuje duplikáty způsobené opakováním producenta. Mechanismus je založen na dvou konceptech:

  • ID producenta (PID): Když se idempotentní producent připojí, broker přidělí jedinečný PID. PID přetrvává po dobu životnosti výrobce; pokud se výrobce restartuje, získá nový PID.
  • pořadové číslo: každý odeslaný záznam nese monotónně rostoucí pořadové číslo (0, 1, 2, ...). Broker sleduje poslední přijaté pořadové číslo pro každý PID+podsystém. Pokud dorazí záznam s již viděným pořadovým číslem se tiše zahodí (deduplikace na straně brokera).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

Limit idempotentního výrobce

Idempotentní producent zajišťuje, že se objeví každá deska přesně jednou v deníku Kafka pro aktuální relaci producenta. Pokud se výrobce restartuje (nové PID), záznam za letu dalo by se to přepsat. A především: nezaručuje nic o spotřebitelském zpracování. Pokud spotřebitel zpracuje záznam a poté dojde k selhání před provedením posunu při příštím spuštění znovu zpracuje stejný záznam. Pro přesně jednou end-to-end potřebujete transakční API.

max.požadavek.za letu na.připojení a řazení

Parametr max.in.flight.requests.per.connection (MIFR) řídí počet požadavků na výrobu mohou současně létat k jednomu brokerovi. Má zásadní vliv na třídění zpráv v případě opakování:

  • MIFR=1: každý požadavek musí být potvrzen před odesláním dalšího. zaručené třídění, ale snížená propustnost (žádné potrubí).
  • MIFR > 1 bez idempotence: zřetězení aktivní, vyšší propustnost, ale pokud dávka N selže a dávka N-1 je již v letu, po opětovném pokusu N se záznamy objeví v pořadí N-1, N, kdy by N musel předcházet N-1. Třídění již není zaručeno.
  • MIFR ≤ 5 s idempotencí: s enable.idempotence=true, garantuje Kafka řazení i s až 5 požadavky za letu, díky pořadovým číslům. Je to výchozí hodnota když je povolena idempotence a maximální podpora pro zachování záruky.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

Kompletní konfigurace producenta pro produkci

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

Pokročilý spotřebitel: Závazek a opětovné zpracování

Auto-commit: Jednoduché, ale riskantní

Con enable.auto.commit=true (výchozí), Kafka automaticky potvrdí offset každý auto.commit.interval.ms milisekundy (výchozí: 5000 ms = 5 sekund). problém: potvrzení probíhá na pozadí bez ohledu na to, kdy jsou záznamy skutečně zpracovávány.

Problematický scénář s automatickým potvrzením:

  1. Poll vrátí 100 záznamů s offsetem 1-100
  2. Spotřebitel začne zpracovávat záznamy
  3. Po 5 sekundách se spustí časovač: offset 100 je automaticky potvrzen
  4. Spotřebitel se zhroutí po zpracování pouze záznamů 1-60
  5. Při restartu spotřebitel začíná na offsetu 100: záznamy 61-100 byly přeskočeny (ztráta dat)

Synchronní ruční zavázání

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

Závazek pro oddíl (jemná zrnitost)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

Rebalance: Jak to funguje a jak to řídit

Un rebalancovat nastane, když se skupina spotřebitelů změní: spotřebitel vstoupí, odejde nebo se zhroutí. Během rebalance, všichni spotřebitelé ve skupině přestanou číst (stop-the-world) a koordinátor skupiny znovu přiřadí oddíly. V posledních verzích Kafka (2.4+) je k dispozici a rebalanční družstva (přírůstkové kooperativní vyvažování), které snižuje dopad: pouze oddíly, které mění příjemce, jsou odvolány a znovu přiřazeny.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

Spotřebitel v Pythonu s pokročilou správou offsetů

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

Ladění výkonu: Klíčové parametry

Strana výrobce

Parametr Výchozí Účinek
linger.ms 0 Čeká N ms na nashromáždění větších dávek
batch.size 16384 (16 kB) Maximální velikost dávky na oddíl
buffer.memory 33554432 (32 MB) Celková vyrovnávací paměť v paměti pro všechny dávky
compression.type není none/gzip/snappy/lz4/zstd

Spotřebitelská strana

Parametr Výchozí Účinek
fetch.min.bytes 1 Minimální velikost dat pro návrat z načtení
fetch.max.wait.ms 500 Maximální čekání, pokud není dosaženo fetch.min.bytes
max.poll.records 500 Maximální počet záznamů pro jednu anketu()
max.partition.fetch.bytes 1048576 (1 MB) Maximální počet dat na oddíl a načtení

Testování s EmbeddedKafka

Testovací kód, který používá Kafka, vyžaduje dostupný cluster. U jednotkových a integračních testů Jarní Kafka dodává @EmbeddedKafka který během testování spustí in-memory broker:

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Pro testování s Testcontainers (realističtější, použijte skutečného brokera Docker):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

Další kroky v sérii

Díky pokročilým výrobcům a spotřebitelům se můžete vypořádat s nejsložitější výzvou:

  • Článek 4 – Sémantika přesně jednou: Zabezpečení transakcí Kafka přesně end-to-end zpracování s transakčním koordinátorem a dopady na propustnost. Nepostradatelné pro finanční kanály.
  • Článek 5 – Registrační systém: Jak používat Avro a Protobuf se Schema Registry aby se zabránilo nekompatibilitě schémat mezi výrobci a spotřebiteli v různých týmech.
  • Článek 6 – Kafkovy proudy: vestavěné zpracování datových proudů v Javě pomocí streamů DSL, který interně používá stejná rozhraní API pro producenty a spotřebitele, jaká byla popsána v tomto článku.

Propojení s ostatními sériemi

  • Architektura (od mikroslužeb k modulárnímu monolitu): Kafkovští výrobci a spotřebitelé jako konkrétní implementace událostmi řízených vzorů v distribuovaných architekturách.
  • Pokročilá Java: Ponořte se do Kafkova Java API s bezpečností vláken, správa životního cyklu a testování s EmbeddedKafka.