De drie leveringsgaranties

Voordat u met de installatie begint, is het van cruciaal belang dat u de drie leveringsmodi begrijpt die Kafka ondersteunt en hun praktische betekenis:

  • Hoogstens één keer: Het bericht wordt één keer verzonden, zonder nieuwe poging. Als de makelaar het niet ontvangt, het is definitief verloren. Geen duplicaten, mogelijk gegevensverlies. Acceptabel voor niet-kritieke statistieken, debug-logboeken, click-stream-gebeurtenissen waarbij het verlies van sommige gebeurtenissen aanvaardbaar is.
  • Minstens één keer: De producent probeert het opnieuw als er een fout is opgetreden. Het bericht komt minstens één keer aan, maar het kan meerdere keren aankomen (duplicaten) als de makelaar het heeft ontvangen maar geen bevestiging heeft gestuurd vóór de time-out. De consument moet idempotent zijn om met duplicaten om te gaan. Meest voorkomende scenario in productie.
  • Precies één keer: Het bericht wordt precies één keer verwerkt, zonder verlies en zonder duplicaten. Vereist idempotente producent (voor precies één keer producentenzijde) of Kafka-transacties (voor precies één keer end-to-end tussen producent en consument). Aanzienlijke prestatieoverhead.

De gouden regel

In een gedistribueerd systeem is het onmogelijk om precies één keer te garanderen zonder overhead. 90% van de systemen Kafka in Amerikaanse productie minstens één keer met idempotente consumenten (deduplicatie aan de consumentenzijde). via database of cache). Exact één keer via Kafka-transacties wordt gebruikt voor financiële pijplijnen, facturering systemen, of waar dan ook, duplicatie van een gebeurtenis veroorzaakt echte schade.

De acks-parameter: Op hoeveel bevestigingen moet worden gewacht

De parameter acks van de producent bepaalt hoeveel replica's eerst de ontvangst moeten bevestigen dat de producent het verzoek als voltooid beschouwt:

acks=0 (vuur en vergeet)

De producer stuurt de plaat op en wacht niet op een reactie van de makelaar. Maximale doorvoer, minimale latentie, maar geen garantie: als de makelaar down is of het record wordt geschreven naar een makelaar die dan als eerste valt om te antwoorden, het record is verloren. De semantiek is hoogstens één keer.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (alleen leider)

De producent wacht alleen op bevestiging van de leidende makelaar van de partitie. Volgers hebben dat misschien nog niet gedaan repliceerde het record toen de bevestiging arriveerde. Als de leider direct na het versturen van de bevestiging valt maar voordat de volgers hebben geantwoord, is het record verloren. Semantiek minstens één keer met risico op gegevensverlies in zeer korte foutvensters.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all (of acks=-1): Alle ISR's

De producer wacht tot de plaat op alle platen is geschreven ISR (gesynchroniseerde replica's) van de partitie. Met min.insync.replicas=2 e replication.factor=3, dit betekent dat tenminste 2 antwoorden (leider + 1 volger) moeten bevestigen. Pas dan krijgt de producent de ack. Semantiek minstens één keer zonder gegevensverlies tot tenminste min.insync.replicas makelaars zijn actief.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

Het dubbele probleem met opnieuw proberen

Overweeg dit scenario met acks=all e retries=3:

  1. De producent stuurt het R1-record naar de hoofdmakelaar
  2. De makelaar schrijft R1 naar schijf en stuurt een bericht naar de producent
  3. De ack is verloren (time-out van het netwerk voordat het de producent bereikt)
  4. De producent, die geen antwoord ontvangt, komt binnen request.timeout.ms, denkt dat het schrijven is mislukt
  5. De producent probeert het opnieuw en stuurt opnieuw R1
  6. De makelaar ontvangt R1 een tweede keer en schrijft dit als een afzonderlijk record
  7. Het onderwerp bevat nu dubbele R1

Dit is het juiste gedrag van minstens één keer: elke gegevens komt minstens één keer binnen, maar duplicaten kunnen voorkomen. Om duplicaten op producentenniveau te elimineren, gebruikt u deidempotente producent.

De Idempotente Producent

De idempotente producer, geïntroduceerd in Kafka 0.11 (2017), elimineert duplicaten veroorzaakt door nieuwe pogingen van de producer. Het mechanisme is gebaseerd op twee concepten:

  • Producent-ID (PID): Wanneer de idempotente producent verbinding maakt, wijst de makelaar een unieke PID toe. De PID blijft bestaan ​​gedurende de levensduur van de producent; als de producent opnieuw opstart, krijgt hij een nieuwe PID.
  • Volgnummer: elk verzonden record heeft een monotoon oplopend volgnummer (0, 1, 2, ...). De makelaar houdt voor elke PID+partitie het laatst ontvangen volgnummer bij. Als er een record binnenkomt als het volgnummer al zichtbaar is, wordt het stilzwijgend weggegooid (deduplicatie aan de kant van de makelaar).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

Grens van de Idempotente Producent

De idempotente producer zorgt ervoor dat iedere plaat verschijnt precies één keer in het Kafka-logboek voor de huidige sessie van de producer. Als de producent opnieuw opstart (nieuwe PID), een record tijdens de vlucht het zou herschreven kunnen worden. En vooral: het garandeert niets over de verwerking door de consument. Als de consument een record verwerkt en vervolgens crasht voordat de offset wordt vastgelegd, bij de volgende keer opstarten zal hetzelfde record opnieuw verwerken. Voor precies één keer end-to-end heb je de transactionele API nodig.

max.in.flight.requests.per.verbinding en sorteren

De parameter max.in.flight.requests.per.connection (MIFR) bepaalt hoeveel productieaanvragen er zijn ze kunnen tegelijkertijd naar één enkele makelaar vliegen. Het heeft een cruciale impact op het sorteren aantal berichten bij nieuwe poging:

  • MIFR=1: elk verzoek moet worden bevestigd voordat er een ander wordt verzonden. Gegarandeerde sortering, maar verminderde doorvoer (geen pijpleidingen).
  • MIFR > 1 zonder idempotentie: pijplijning actief, hogere doorvoer, maar als batch N mislukt en batch N-1 is al aan het vliegen, na de nieuwe poging van N verschijnen de records in volgorde N-1, N wanneer N dat zou hebben gedaan moest aan N-1 voorafgaan. Sortering is niet langer gegarandeerd.
  • MIFR ≤ 5 met idempotentie: met enable.idempotence=true, garandeert Kafka sorteren zelfs met maximaal 5 aanvragen tijdens de vlucht, dankzij volgnummers. Het is de standaardwaarde wanneer idempotentie is ingeschakeld en maximaal wordt ondersteund om de garantie te behouden.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

Volledige producentenconfiguratie voor productie

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

Geavanceerde consument: vastleggen en opnieuw verwerken

Automatisch vastleggen: eenvoudig maar riskant

Met enable.auto.commit=true (standaard), voert Kafka de compensatie automatisch elke keer uit auto.commit.interval.ms milliseconden (standaard: 5000 ms = 5 seconden). Het probleem: de commit gebeurt op de achtergrond, ongeacht wanneer de records daadwerkelijk worden verwerkt.

Problematisch scenario met automatische vastlegging:

  1. Poll retourneert 100 records met een offset van 1-100
  2. De consument begint de gegevens te verwerken
  3. Na 5 seconden start de timer: offset 100 wordt automatisch vastgelegd
  4. De consument crasht nadat hij alleen records 1-60 heeft verwerkt
  5. Bij het opnieuw opstarten begint de consument bij offset 100: records 61-100 zijn overgeslagen (gegevensverlies)

Synchrone handmatige vastlegging

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

Vastlegging per partitie (fijne granulariteit)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

Herbalanceren: hoe het werkt en hoe je ermee om kunt gaan

Un opnieuw in evenwicht brengen vindt plaats wanneer de consumentengroep verandert: een consument komt binnen, verlaat of crasht. Tijdens de herbalancering alle consumenten in de groep ze stoppen met lezen (stop-de-wereld) en de groepscoördinator wijst partities opnieuw toe. In recente versies van Kafka (2.4+) is het beschikbaar de coöperaties opnieuw in evenwicht brengen (incrementele coöperatieve herbalancering) waardoor de impact wordt verminderd: alleen partities waarbij de toegewezen persoon verandert, worden ingetrokken en opnieuw toegewezen.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

Consument in Python met geavanceerd offsetbeheer

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

Prestatieafstemming: belangrijke parameters

Producentenkant

Parameter Standaard Effect
linger.ms 0 Wacht N ms om grotere batches te verzamelen
batch.size 16384 (16KB) Maximale batchgrootte per partitie
buffer.memory 33554432 (32MB) Totale buffer in het geheugen voor alle batches
compression.type dat is het niet geen/gzip/snappy/lz4/zstd

Consumentenkant

Parameter Standaard Effect
fetch.min.bytes 1 Minimale gegevensgrootte die moet worden geretourneerd bij het ophalen
fetch.max.wait.ms 500 Maximale wachttijd als fetch.min.bytes niet wordt bereikt
max.poll.records 500 Maximale records voor enkele poll()
max.partition.fetch.bytes 1048576 (1MB) Maximale gegevens per partitie per ophaalactie

Testen met EmbeddedKafka

Voor het testen van code die gebruikmaakt van Kafka is een beschikbaar cluster vereist. Voor eenheids- en integratietests Lente Kafka levert @EmbeddedKafka die tijdens het testen een in-memory broker start:

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Voor testen met Testcontainers (realistischer, gebruik een echte Docker-broker):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

Volgende stappen in de serie

Met inbegrip van geavanceerde producenten en consumenten kunt u de meest complexe uitdaging aangaan:

  • Artikel 4 – Exactly-Once-semantiek: Kafka-transacties om te beveiligen exact end-to-end-verwerking, met implicaties voor de transactiecoördinator en de doorvoer. Onmisbaar voor financiële pijpleidingen.
  • Artikel 5 – Registratieschema: Avro en Protobuf gebruiken met Schema Registry om schema-incompatibiliteiten tussen producenten en consumenten in verschillende teams te voorkomen.
  • Artikel 6 – Kafka-streams: ingebedde streamverwerking in Java met de Streams DSL, die intern gebruikmaakt van dezelfde producenten- en consumenten-API's die in dit artikel worden onderzocht.

Link met andere series

  • Architectuur (van microservices tot modulaire monoliet): Kafka-producenten en consumenten als een concrete implementatie van gebeurtenisgestuurde patronen in gedistribueerde architecturen.
  • Geavanceerde Java: Diepe duik in Kafka's Java API met threadveiligheid, levenscyclusbeheer en testen met EmbeddedKafka.