Geavanceerde Kafka-producent en consument: Acks, Idempotentie en opnieuw proberen
Het gedrag van een Kafka-producent hangt in belangrijke mate af van drie parameters: acks, retries e
max.in.flight.requests.per.connection. Een verkeerde configuratie kan tot gegevensverlies leiden
of naar niet-gedetecteerde duplicaten. Deze gids behandelt elk scenario met configuraties uit de echte wereld, legt de idempotente producent uit
geïntroduceerd in Kafka 0.11 en hoe de consument omgaat met compensatie en herverwerking.
De drie leveringsgaranties
Voordat u met de installatie begint, is het van cruciaal belang dat u de drie leveringsmodi begrijpt die Kafka ondersteunt en hun praktische betekenis:
- Hoogstens één keer: Het bericht wordt één keer verzonden, zonder nieuwe poging. Als de makelaar het niet ontvangt, het is definitief verloren. Geen duplicaten, mogelijk gegevensverlies. Acceptabel voor niet-kritieke statistieken, debug-logboeken, click-stream-gebeurtenissen waarbij het verlies van sommige gebeurtenissen aanvaardbaar is.
- Minstens één keer: De producent probeert het opnieuw als er een fout is opgetreden. Het bericht komt minstens één keer aan, maar het kan meerdere keren aankomen (duplicaten) als de makelaar het heeft ontvangen maar geen bevestiging heeft gestuurd vóór de time-out. De consument moet idempotent zijn om met duplicaten om te gaan. Meest voorkomende scenario in productie.
- Precies één keer: Het bericht wordt precies één keer verwerkt, zonder verlies en zonder duplicaten. Vereist idempotente producent (voor precies één keer producentenzijde) of Kafka-transacties (voor precies één keer end-to-end tussen producent en consument). Aanzienlijke prestatieoverhead.
De gouden regel
In een gedistribueerd systeem is het onmogelijk om precies één keer te garanderen zonder overhead. 90% van de systemen Kafka in Amerikaanse productie minstens één keer met idempotente consumenten (deduplicatie aan de consumentenzijde). via database of cache). Exact één keer via Kafka-transacties wordt gebruikt voor financiële pijplijnen, facturering systemen, of waar dan ook, duplicatie van een gebeurtenis veroorzaakt echte schade.
De acks-parameter: Op hoeveel bevestigingen moet worden gewacht
De parameter acks van de producent bepaalt hoeveel replica's eerst de ontvangst moeten bevestigen
dat de producent het verzoek als voltooid beschouwt:
acks=0 (vuur en vergeet)
De producer stuurt de plaat op en wacht niet op een reactie van de makelaar. Maximale doorvoer, minimale latentie, maar geen garantie: als de makelaar down is of het record wordt geschreven naar een makelaar die dan als eerste valt om te antwoorden, het record is verloren. De semantiek is hoogstens één keer.
// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record
acks=1 (alleen leider)
De producent wacht alleen op bevestiging van de leidende makelaar van de partitie. Volgers hebben dat misschien nog niet gedaan repliceerde het record toen de bevestiging arriveerde. Als de leider direct na het versturen van de bevestiging valt maar voordat de volgers hebben geantwoord, is het record verloren. Semantiek minstens één keer met risico op gegevensverlies in zeer korte foutvensters.
// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
// -> possibile duplicato se broker ha ricevuto ma non risposto
acks=all (of acks=-1): Alle ISR's
De producer wacht tot de plaat op alle platen is geschreven ISR (gesynchroniseerde replica's) van de partitie.
Met min.insync.replicas=2 e replication.factor=3, dit betekent dat tenminste
2 antwoorden (leider + 1 volger) moeten bevestigen. Pas dan krijgt de producent de ack.
Semantiek minstens één keer zonder gegevensverlies tot tenminste min.insync.replicas
makelaars zijn actief.
// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all"); // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000); // attende fino a 60s se il broker è congestionato
// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)
// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova
Het dubbele probleem met opnieuw proberen
Overweeg dit scenario met acks=all e retries=3:
- De producent stuurt het R1-record naar de hoofdmakelaar
- De makelaar schrijft R1 naar schijf en stuurt een bericht naar de producent
- De ack is verloren (time-out van het netwerk voordat het de producent bereikt)
- De producent, die geen antwoord ontvangt, komt binnen
request.timeout.ms, denkt dat het schrijven is mislukt - De producent probeert het opnieuw en stuurt opnieuw R1
- De makelaar ontvangt R1 een tweede keer en schrijft dit als een afzonderlijk record
- Het onderwerp bevat nu dubbele R1
Dit is het juiste gedrag van minstens één keer: elke gegevens komt minstens één keer binnen, maar duplicaten kunnen voorkomen. Om duplicaten op producentenniveau te elimineren, gebruikt u deidempotente producent.
De Idempotente Producent
De idempotente producer, geïntroduceerd in Kafka 0.11 (2017), elimineert duplicaten veroorzaakt door nieuwe pogingen van de producer. Het mechanisme is gebaseerd op twee concepten:
- Producent-ID (PID): Wanneer de idempotente producent verbinding maakt, wijst de makelaar een unieke PID toe. De PID blijft bestaan gedurende de levensduur van de producent; als de producent opnieuw opstart, krijgt hij een nieuwe PID.
- Volgnummer: elk verzonden record heeft een monotoon oplopend volgnummer (0, 1, 2, ...). De makelaar houdt voor elke PID+partitie het laatst ontvangen volgnummer bij. Als er een record binnenkomt als het volgnummer al zichtbaar is, wordt het stilzwijgend weggegooid (deduplicatie aan de kant van de makelaar).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Abilitare idempotenza
props.put("enable.idempotence", true);
// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException
// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)
Grens van de Idempotente Producent
De idempotente producer zorgt ervoor dat iedere plaat verschijnt precies één keer in het Kafka-logboek voor de huidige sessie van de producer. Als de producent opnieuw opstart (nieuwe PID), een record tijdens de vlucht het zou herschreven kunnen worden. En vooral: het garandeert niets over de verwerking door de consument. Als de consument een record verwerkt en vervolgens crasht voordat de offset wordt vastgelegd, bij de volgende keer opstarten zal hetzelfde record opnieuw verwerken. Voor precies één keer end-to-end heb je de transactionele API nodig.
max.in.flight.requests.per.verbinding en sorteren
De parameter max.in.flight.requests.per.connection (MIFR) bepaalt hoeveel productieaanvragen er zijn
ze kunnen tegelijkertijd naar één enkele makelaar vliegen. Het heeft een cruciale impact op het sorteren
aantal berichten bij nieuwe poging:
- MIFR=1: elk verzoek moet worden bevestigd voordat er een ander wordt verzonden. Gegarandeerde sortering, maar verminderde doorvoer (geen pijpleidingen).
- MIFR > 1 zonder idempotentie: pijplijning actief, hogere doorvoer, maar als batch N mislukt en batch N-1 is al aan het vliegen, na de nieuwe poging van N verschijnen de records in volgorde N-1, N wanneer N dat zou hebben gedaan moest aan N-1 voorafgaan. Sortering is niet langer gegarandeerd.
-
MIFR ≤ 5 met idempotentie: met
enable.idempotence=true, garandeert Kafka sorteren zelfs met maximaal 5 aanvragen tijdens de vlucht, dankzij volgnummers. Het is de standaardwaarde wanneer idempotentie is ingeschakeld en maximaal wordt ondersteund om de garantie te behouden.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1); // nessun pipelining
// Througput limitato ma ordine garantito
// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry
// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer
Volledige producentenconfiguratie voor productie
// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerFactory {
/**
* Crea un producer ottimizzato per throughput elevato
* con garanzie at-least-once e idempotenza abilitata.
*/
public static KafkaProducer<String, byte[]> createHighThroughputProducer(
String bootstrapServers) {
Properties props = new Properties();
// Connessione
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Durabilità e idempotenza
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks=all e retries=MAX impostati automaticamente
// Batching: aspetta fino a 10ms per accumulare records
// Riduce numero di richieste al broker, aumenta throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale
// Compressione: snappy è bilanciata tra CPU e ratio
// lz4 per massima velocità; zstd per massimo ratio
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Timeout: se il buffer è pieno, blocca fino a max.block.ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// Request timeout: quanto aspettare risposta dal broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// Delivery timeout: timeout totale inclusi tutti i retry
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
return new KafkaProducer<>(props);
}
/**
* Crea un producer per eventi critici (financial, billing)
* con latenza minima e massime garanzie.
*/
public static KafkaProducer<String, byte[]> createLowLatencyProducer(
String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Massima durabilità
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Nessun batching: invia immediatamente ogni record
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
// Nessuna compressione: elimina latenza CPU
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
return new KafkaProducer<>(props);
}
}
Geavanceerde consument: vastleggen en opnieuw verwerken
Automatisch vastleggen: eenvoudig maar riskant
Met enable.auto.commit=true (standaard), voert Kafka de compensatie automatisch elke keer uit
auto.commit.interval.ms milliseconden (standaard: 5000 ms = 5 seconden). Het probleem:
de commit gebeurt op de achtergrond, ongeacht wanneer de records daadwerkelijk worden verwerkt.
Problematisch scenario met automatische vastlegging:
- Poll retourneert 100 records met een offset van 1-100
- De consument begint de gegevens te verwerken
- Na 5 seconden start de timer: offset 100 wordt automatisch vastgelegd
- De consument crasht nadat hij alleen records 1-60 heeft verwerkt
- Bij het opnieuw opstarten begint de consument bij offset 100: records 61-100 zijn overgeslagen (gegevensverlies)
Synchrone handmatige vastlegging
// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);
// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000); // 5 minuti
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
elaboraOrdine(record);
}
// commitSync() bloca fino a conferma dal broker
// In caso di errore, lancia CommitFailedException
// Garantisce at-least-once: il commit avviene dopo l'elaborazione
consumer.commitSync();
}
} catch (CommitFailedException e) {
// Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
// Gestisci il rebalance e riprendi
log.error("Commit fallito, probabile rebalance", e);
}
Vastlegging per partitie (fijne granulariteit)
// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// Raggruppa i record per partizione
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// Elabora il record
elaboraOrdine(record);
// Traccia l'offset per questa partizione
// IMPORTANTE: commita offset+1 (il prossimo record da leggere)
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// Commit di tutti gli offset accumulati
consumer.commitSync(offsets);
}
Herbalanceren: hoe het werkt en hoe je ermee om kunt gaan
Un opnieuw in evenwicht brengen vindt plaats wanneer de consumentengroep verandert: een consument komt binnen, verlaat of crasht. Tijdens de herbalancering alle consumenten in de groep ze stoppen met lezen (stop-de-wereld) en de groepscoördinator wijst partities opnieuw toe. In recente versies van Kafka (2.4+) is het beschikbaar de coöperaties opnieuw in evenwicht brengen (incrementele coöperatieve herbalancering) waardoor de impact wordt verminderd: alleen partities waarbij de toegewezen persoon verandert, worden ingetrokken en opnieuw toegewezen.
// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
Collections.singletonList("ordini-effettuati"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Chiamato prima che le partizioni vengano riassegnate
// Commita gli offset delle partizioni che stiamo per perdere
log.info("Partizioni revocate: {}", partitions);
// Commit degli offset non ancora committati per le partizioni revocate
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : partitions) {
if (currentOffsets.containsKey(tp)) {
toCommit.put(tp, currentOffsets.get(tp));
}
}
if (!toCommit.isEmpty()) {
consumer.commitSync(toCommit);
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Chiamato dopo che le nuove partizioni sono state assegnate
log.info("Partizioni assegnate: {}", partitions);
// Possiamo inizializzare state locale per le nuove partizioni
}
}
);
Consument in Python met geavanceerd offsetbeheer
from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging
log = logging.getLogger(__name__)
class AdvancedKafkaConsumer:
"""Consumer Kafka con commit manuale e gestione rebalance."""
def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
self.topic = topic
self.pending_offsets: Dict[tuple, int] = {}
config = {
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
# Cooperative rebalance: meno interruzioni
"partition.assignment.strategy": "cooperative-sticky",
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000,
}
self.consumer = Consumer(config)
self.consumer.subscribe([topic], on_assign=self._on_assign,
on_revoke=self._on_revoke)
def _on_assign(self, consumer, partitions):
log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")
def _on_revoke(self, consumer, partitions):
"""Commita gli offset prima di perdere le partizioni."""
log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
to_commit = []
for p in partitions:
key = (p.topic, p.partition)
if key in self.pending_offsets:
to_commit.append(
TopicPartition(p.topic, p.partition,
self.pending_offsets[key])
)
if to_commit:
consumer.commit(offsets=to_commit, asynchronous=False)
def process_messages(self, handler_fn, batch_size: int = 100):
"""Loop principale di elaborazione."""
batch_count = 0
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
log.error(f"Errore Kafka: {msg.error()}")
continue
# Elabora il messaggio
try:
handler_fn(msg)
# Traccia l'offset (offset+1 = prossimo da leggere)
self.pending_offsets[(msg.topic(), msg.partition())] = \
msg.offset() + 1
batch_count += 1
except Exception as e:
log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
# Qui potresti implementare una DLQ (Dead Letter Queue)
# o skippare il record problematico
# Commit ogni N record
if batch_count >= batch_size:
self._commit_pending()
batch_count = 0
finally:
# Commit finale prima di chiudere
self._commit_pending()
self.consumer.close()
def _commit_pending(self):
"""Commita tutti gli offset pendenti."""
if not self.pending_offsets:
return
offsets = [
TopicPartition(topic, partition, offset)
for (topic, partition), offset in self.pending_offsets.items()
]
self.consumer.commit(offsets=offsets, asynchronous=False)
self.pending_offsets.clear()
log.debug(f"Committati {len(offsets)} offset")
Prestatieafstemming: belangrijke parameters
Producentenkant
| Parameter | Standaard | Effect |
|---|---|---|
linger.ms |
0 | Wacht N ms om grotere batches te verzamelen |
batch.size |
16384 (16KB) | Maximale batchgrootte per partitie |
buffer.memory |
33554432 (32MB) | Totale buffer in het geheugen voor alle batches |
compression.type |
dat is het niet | geen/gzip/snappy/lz4/zstd |
Consumentenkant
| Parameter | Standaard | Effect |
|---|---|---|
fetch.min.bytes |
1 | Minimale gegevensgrootte die moet worden geretourneerd bij het ophalen |
fetch.max.wait.ms |
500 | Maximale wachttijd als fetch.min.bytes niet wordt bereikt |
max.poll.records |
500 | Maximale records voor enkele poll() |
max.partition.fetch.bytes |
1048576 (1MB) | Maximale gegevens per partitie per ophaalactie |
Testen met EmbeddedKafka
Voor het testen van code die gebruikmaakt van Kafka is een beschikbaar cluster vereist. Voor eenheids- en integratietests
Lente Kafka levert @EmbeddedKafka die tijdens het testen een in-memory broker start:
// Dipendenza Maven
// <dependency>
// <groupId>org.springframework.kafka</groupId>
// <artifactId>spring-kafka-test</artifactId>
// <scope>test</scope>
// </dependency>
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
topics = {"ordini-test"},
brokerProperties = {
"auto.create.topics.enable=false",
"default.replication.factor=1"
}
)
class OrdineProducerTest {
@Autowired
private OrdineProducer producer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldSendOrderSuccessfully() throws Exception {
// Crea un consumer di test per verificare che il messaggio sia arrivato
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafka
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");
// Invia un ordine
producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");
// Verifica che il messaggio sia stato ricevuto entro 3 secondi
ConsumerRecords<String, String> records =
KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<String, String> record = records.iterator().next();
assertThat(record.key()).isEqualTo("order-001");
assertThat(record.value()).contains("99.99");
consumer.close();
}
}
Voor testen met Testcontainers (realistischer, gebruik een echte Docker-broker):
// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
class OrdineProducerIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("apache/kafka:4.0.0")
);
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Test
void testConBrokerReale() {
// Test con broker Kafka 4.0 reale in Docker
// Garantisce che il comportamento corrisponda alla produzione
}
}
Volgende stappen in de serie
Met inbegrip van geavanceerde producenten en consumenten kunt u de meest complexe uitdaging aangaan:
- Artikel 4 – Exactly-Once-semantiek: Kafka-transacties om te beveiligen exact end-to-end-verwerking, met implicaties voor de transactiecoördinator en de doorvoer. Onmisbaar voor financiële pijpleidingen.
- Artikel 5 – Registratieschema: Avro en Protobuf gebruiken met Schema Registry om schema-incompatibiliteiten tussen producenten en consumenten in verschillende teams te voorkomen.
- Artikel 6 – Kafka-streams: ingebedde streamverwerking in Java met de Streams DSL, die intern gebruikmaakt van dezelfde producenten- en consumenten-API's die in dit artikel worden onderzocht.
Link met andere series
- Architectuur (van microservices tot modulaire monoliet): Kafka-producenten en consumenten als een concrete implementatie van gebeurtenisgestuurde patronen in gedistribueerde architecturen.
- Geavanceerde Java: Diepe duik in Kafka's Java API met threadveiligheid, levenscyclusbeheer en testen met EmbeddedKafka.







