Trzy gwarancje dostawy

Przed przystąpieniem do konfiguracji niezwykle ważne jest zrozumienie trzech trybów dostarczania obsługiwanych przez platformę Kafka i ich praktyczne znaczenie:

  • Co najwyżej raz: Wiadomość zostanie wysłana raz, bez ponawiania. Jeżeli broker go nie otrzyma, zostaje trwale utracony. Zero duplikatów, możliwa utrata danych. Dopuszczalne dla metryk niekrytycznych, dzienniki debugowania, zdarzenia związane ze strumieniem kliknięć, w przypadku których utrata niektórych zdarzeń jest tolerowana.
  • Przynajmniej raz: Producent próbuje ponownie, jeśli wystąpi błąd. Wiadomość dotrze przynajmniej raz, ale może dotrzeć wiele razy (duplikaty), jeśli broker otrzymał je, ale nie wysłał potwierdzenia przed upływem limitu czasu. Konsument musi być idempotentny, aby móc obsługiwać duplikaty. Najczęstszy scenariusz w produkcji.
  • Dokładnie-raz: Wiadomość jest przetwarzana dokładnie raz, bez strat i bez duplikatów. Wymaga idempotentnego producenta (dla strony producenta „dokładnie raz”) lub transakcji Kafki (dla dokładnie raz od początku do końca pomiędzy producentem a konsumentem). Znaczące obciążenie wydajności.

Złota zasada

W systemie rozproszonym nie da się zagwarantować dokładnie raz bez kosztów ogólnych. 90% systemów Kafka w produkcji amerykańskiej przynajmniej raz z idempotentnymi konsumentami (deduplikacja po stronie konsumenta poprzez bazę danych lub pamięć podręczną). Dokładnie jednorazowo transakcje za pośrednictwem Kafki są wykorzystywane do potoków finansowych, rozliczeń systemów lub gdziekolwiek, gdzie powielanie zdarzenia powoduje rzeczywistą szkodę.

Parametr acks: ile potwierdzeń należy czekać

Parametr acks producenta określa, ile replik musi najpierw potwierdzić odbiór aby producent uznał wniosek za spełniony:

acks=0 (Odpal i zapomnij)

Producent wysyła zapis i nie czeka na jakąkolwiek odpowiedź ze strony brokera. Maksymalna przepustowość, minimalne opóźnienia, ale nie ma gwarancji: jeśli broker upadnie lub zapis zostanie zapisany brokerowi, który następnie upadnie jako pierwszy odpowiedzieć, zapis zostanie utracony. Semantyka jest co najwyżej raz.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (tylko lider)

Producent czeka jedynie na potwierdzenie od wiodącego brokera partycji. Obserwatorzy mogą jeszcze tego nie mieć replikował rekord po nadejściu potwierdzenia. Jeśli lider upadnie natychmiast po wysłaniu potwierdzenia ale zanim obserwujący odpowiedzą, zapis zostanie utracony. Semantyka przynajmniej raz z ryzykiem utraty danych w bardzo krótkich oknach usterek.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all (lub acks=-1): Wszystkie ISR-y

Producent czeka, aż płyta zostanie nagrana na wszystkich płytach ISR (repliki synchronizowane) partycji. Z min.insync.replicas=2 e replication.factor=3, to oznacza przynajmniej to 2 odpowiedzi (lider + 1 obserwujący) muszą potwierdzać. Dopiero wtedy producent otrzymuje potwierdzenie. Semantyka przynajmniej raz bez utraty danych przynajmniej do min.insync.replicas brokerzy są aktywni.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

Zduplikowany problem z ponowną próbą

Rozważ ten scenariusz z acks=all e retries=3:

  1. Producent wysyła rekord R1 do brokera wiodącego
  2. Broker zapisuje R1 na dysk i wysyła potwierdzenie do producenta
  3. Utracono potwierdzenie (upłynął limit czasu sieci, zanim dotrze do producenta)
  4. Producent nie otrzymawszy potwierdzenia, wchodzi request.timeout.ms, uważa, że ​​pisanie się nie powiodło
  5. Producent próbuje ponownie i ponownie wysyła R1
  6. Broker otrzymuje R1 po raz drugi i zapisuje go jako osobny rekord
  7. Temat zawiera teraz duplikat R1

To jest prawidłowe zachowanie co najmniej raz: każde dane docierają co najmniej raz, ale mogą pojawić się duplikaty. Aby wyeliminować duplikaty na poziomie producenta, użyj metodyidempotentny producent.

Idempotentny producent

Producent idempotentny, wprowadzony w Kafce 0.11 (2017), eliminuje duplikaty spowodowane ponownymi próbami producenta. Mechanizm opiera się na dwóch koncepcjach:

  • Identyfikator producenta (PID): Kiedy idempotentny producent łączy się, broker przypisuje unikalny PID. PID utrzymuje się przez cały okres istnienia producenta; jeśli producent uruchomi się ponownie, otrzyma nowy PID.
  • Numer sekwencyjny: każdy wysłany rekord ma monotonicznie rosnący numer kolejny (0, 1, 2, ...). Broker śledzi ostatni numer sekwencyjny otrzymany dla każdej partycji PID+. Jeśli pojawi się płyta z już widocznym numerem sekwencyjnym, jest on dyskretnie odrzucany (deduplikacja po stronie brokera).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

Limit idempotentnego producenta

Producent idempotentny zapewnia pojawienie się każdego rekordu dokładnie raz w dzienniku Kafki dla bieżącej sesji producenta. Jeśli producent uruchomi się ponownie (nowy PID), rekord w locie można by to napisać od nowa. A przede wszystkim: nie gwarantuje niczego w zakresie przetwarzania konsumenckiego. Jeśli konsument przetwarza rekord, a następnie ulega awarii przed zatwierdzeniem przesunięcia przy następnym uruchomieniu przetworzy ponownie ten sam rekord. Do dokładnie jednorazowej kompleksowej obsługi potrzebujesz transakcyjnego API.

maksymalna liczba żądań.lotu na.połączenie i sortowanie

Parametr max.in.flight.requests.per.connection (MIFR) kontroluje liczbę żądań produkcyjnych mogą latać do jednego brokera w tym samym czasie. Ma to decydujący wpływ na sortowanie wiadomości w przypadku ponownej próby:

  • MIFR=1: każde żądanie musi zostać potwierdzone przed wysłaniem kolejnego. Gwarantowane sortowanie, ale zmniejszona przepustowość (brak potokowania).
  • MIFR > 1 bez idempotencji: potokowanie aktywne, większa przepustowość, ale w przypadku niepowodzenia partii N i partia N-1 jest już w locie, po ponownej próbie N zapisy pojawiają się w kolejności N-1, N, gdy N by to zrobił musiał poprzedzać N-1. Sortowanie nie jest już gwarantowane.
  • MIFR ≤ 5 z idempotencją: z enable.idempotence=true, gwarantuje Kafka sortowanie nawet przy maksymalnie 5 żądaniach w locie, dzięki numerom sekwencyjnym. Jest to wartość domyślna gdy idempotentność jest włączona i jest maksymalnie obsługiwana w celu utrzymania gwarancji.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

Kompletna konfiguracja producenta do produkcji

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

Zaawansowany konsument: zatwierdzenie i ponowne przetwarzanie

Automatyczne zatwierdzanie: proste, ale ryzykowne

Con enable.auto.commit=true (domyślnie), Kafka automatycznie zatwierdza przesunięcie co auto.commit.interval.ms milisekundy (domyślnie: 5000 ms = 5 sekund). Problem: zatwierdzenie odbywa się w tle, niezależnie od tego, kiedy rekordy są faktycznie przetwarzane.

Problematyczny scenariusz z automatycznym zatwierdzaniem:

  1. Sonda zwraca 100 rekordów z przesunięciem 1-100
  2. Konsument rozpoczyna przetwarzanie zapisów
  3. Po 5 sekundach rozpoczyna się timer: offset 100 jest automatycznie zatwierdzany
  4. Konsument ulega awarii po przetworzeniu rekordów tylko 1-60
  5. Po ponownym uruchomieniu konsument zaczyna od przesunięcia 100: rekordy 61-100 zostały pominięte (utrata danych)

Synchroniczne ręczne zatwierdzenie

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

Zatwierdzanie na partycję (dokładna szczegółowość)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

Rebalance: Jak to działa i jak nim zarządzać

Un przywrócenie równowagi ma miejsce, gdy zmienia się grupa konsumentów: konsument wchodzi, wychodzi lub ulega awarii. Podczas przywracania równowagi, wszystkich konsumentów w grupie przestają czytać (zatrzymaj świat) i koordynator grupy ponownie przydziela partycje. W najnowszych wersjach Kafki (2.4+) jest ona dostępna the zrównoważyć spółdzielnie (przyrostowe przywracanie równowagi w ramach współpracy), które zmniejsza wpływ: tylko partycje, które zmieniają cesjonariuszy, są unieważniane i ponownie przypisywane.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

Konsument w Pythonie z zaawansowanym zarządzaniem offsetami

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

Strojenie wydajności: kluczowe parametry

Strona producenta

Parametr Domyślny Efekt
linger.ms 0 Czeka N ms na zgromadzenie większych partii
batch.size 16384 (16KB) Maksymalny rozmiar partii na partycję
buffer.memory 33554432 (32MB) Całkowity bufor w pamięci dla wszystkich partii
compression.type tak nie jest brak/gzip/snappy/lz4/zstd

Strona konsumencka

Parametr Domyślny Efekt
fetch.min.bytes 1 Minimalny rozmiar danych zwracanych z pobierania
fetch.max.wait.ms 500 Maksymalny czas oczekiwania, jeśli nie osiągnięto liczby fetch.min.bytes
max.poll.records 500 Maksymalna liczba rekordów dla pojedynczej ankiety()
max.partition.fetch.bytes 1048576 (1MB) Maksymalna ilość danych na partycję na pobranie

Testowanie za pomocą EmbeddedKafka

Testowanie kodu korzystającego z platformy Kafka wymaga dostępnego klastra. W przypadku testów jednostkowych i integracyjnych, Wiosna Kafka dostarcza @EmbeddedKafka który uruchamia brokera w pamięci podczas testowania:

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Do testowania z kontenerami Test (bardziej realistycznymi, użyj prawdziwego brokera Docker):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

Kolejne kroki w serii

Uwzględniając zaawansowanych producentów i konsumentów, możesz stawić czoła najbardziej złożonemu wyzwaniu:

  • Artykuł 4 – Semantyka dokładnie raz: Transakcje Kafka w celu zabezpieczenia dokładnie przetwarzanie od końca do końca, z konsekwencjami dla koordynatora transakcji i przepustowości. Niezbędny dla rurociągów finansowych.
  • Artykuł 5 – Schemat rejestru: Jak używać Avro i Protobuf z rejestrem schematu aby uniknąć niezgodności schematów między producentami i konsumentami w różnych zespołach.
  • Artykuł 6 – Strumienie Kafki: wbudowane przetwarzanie strumieni w Javie za pomocą Streams DSL, który wewnętrznie korzysta z tych samych interfejsów API producentów i konsumentów, które opisano w tym artykule.

Połącz z innymi seriami

  • Architektura (od mikrousług po modułowy monolit): Producenci i konsumenci Kafki jako konkretna implementacja wzorców sterowanych zdarzeniami w architekturach rozproszonych.
  • Zaawansowana Java: Głębokie zanurzenie się w API Java Kafki z bezpieczeństwem wątków, zarządzanie cyklem życia i testowanie za pomocą EmbeddedKafka.