İleri Kafka Üreticisi ve Tüketicisi: Acks, Idempotency ve Retry
Bir Kafka yapımcısının davranışı kritik olarak üç parametreye bağlıdır: acks, retries e
max.in.flight.requests.per.connection. Yanlış yapılandırma veri kaybına neden olabilir
veya tespit edilemeyen kopyalara. İdempotent yapımcı, bu kılavuzun gerçek dünya yapılandırmalarıyla her senaryoyu kapsadığını açıklıyor
Kafka 0.11'de tanıtılan bilgiler ve tüketicinin dengeleme ve yeniden işleme işlemlerini nasıl ele aldığı.
Üç Teslimat Garantisi
Kuruluma başlamadan önce Kafka'nın desteklediği üç dağıtım modunu anlamak çok önemlidir ve pratik anlamları:
- En fazla bir kez: Mesaj tekrar denemeden bir kez gönderilir. Eğer komisyoncu bunu almazsa, kalıcı olarak kaybolur. Sıfır kopya, olası veri kaybı. Kritik olmayan metrikler için kabul edilebilir, hata ayıklama günlükleri, bazı olayların kaybının tolere edilebildiği tıklama akışı olayları.
- En az bir kez: Hata varsa yapımcı tekrar dener. Mesaj en az bir kez gelecek, ancak aracının bunu almasına rağmen zaman aşımından önce onay göndermemesi durumunda birden çok kez (kopyalar) gelebilir. Tüketicinin kopyaları işleme konusunda bağımsız olması gerekir. Üretimde en yaygın senaryo.
- Tam olarak bir kez: Mesaj, kayıpsız ve kopyalanmadan tam olarak bir kez işlenir. İdempotent üretici (tam olarak bir kez üretici tarafı için) veya Kafka işlemleri (tam olarak bir kez için) gerektirir Üretici ile tüketici arasında uçtan uca). Önemli performans yükü.
Altın Kural
Dağıtılmış bir sistemde, ek yük olmadan tam olarak bir kez garanti etmek imkansızdır. Sistemlerin %90'ı ABD üretiminde Kafka en az bir kez idempotent tüketicilerle (tüketici tarafı veri tekilleştirme) veritabanı veya önbellek yoluyla). Tam olarak bir kez Kafka işlemleri aracılığıyla finansal işlem hatları ve faturalandırma için kullanılır sistemlerde veya herhangi bir yerde bir olayın kopyalanması gerçek zarara neden olur.
Acks Parametresi: Kaç Onay Beklenecek?
Parametre acks Üreticinin oranı, ilk önce kaç kopyanın alındığını onaylaması gerektiğini tanımlar
yapımcının talebin tamamlanmış olduğunu düşünmesi:
acks=0 (Ateş Et ve Unut)
Yapımcı kaydı gönderir ve komisyoncudan herhangi bir yanıt beklemez. Maksimum verim, minimum gecikme, ancak garanti yok: eğer komisyoncu çökerse veya kayıt bir komisyoncuya yazılırsa, o da ilk önce düşer yanıt vermek için kayıt kaybolur. Anlambilim en fazla bir kez.
// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record
acks=1 (Yalnızca Lider)
Üretici yalnızca bölümün önde gelen komisyoncusundan onay bekler. Takipçiler henüz sahip olmayabilir Onay geldiğinde kaydı kopyaladı. Lider, onayı gönderdikten hemen sonra düşerse ancak takipçiler cevap veremeden kayıt kayboldu. Anlambilim en az bir kez çok kısa hata pencerelerinde veri kaybı riski vardır.
// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
// -> possibile duplicato se broker ha ricevuto ma non risposto
acks=all (veya acks=-1): Tüm ISR'ler
Yapımcı plağın tüm kayıtlara yazılmasını bekliyor. ISR (Senkronize Kopyalar) bölümün.
İle min.insync.replicas=2 e replication.factor=3, bu en azından şu anlama geliyor
2 yanıtın (lider + 1 takipçi) onaylaması gerekir. Ancak o zaman yapımcı onayı alır.
Anlambilim en az bir kez en azından veri kaybı olmadan min.insync.replicas
komisyoncular aktif.
// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all"); // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000); // attende fino a 60s se il broker è congestionato
// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)
// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova
Yeniden Denemede Yinelenen Sorun
Bu senaryoyu göz önünde bulundurun acks=all e retries=3:
- Üretici R1 kaydını lider komisyoncuya gönderir
- Broker R1'i diske yazar ve üreticiye onay gönderir.
- Ack kayboldu (yapımcıya ulaşmadan ağ zaman aşımına uğradı)
- Onay alamayan yapımcı içeri girdi
request.timeout.ms, yazmanın başarısız olduğunu düşünüyor - Üretici tekrar dener ve tekrar R1'i gönderir.
- Aracı, R1'i ikinci kez alır ve bunu ayrı bir kayıt olarak yazar.
- Konu artık yinelenen R1 içeriyor
Bu, en az bir kerenin doğru davranışıdır: her veri en az bir kez gelir, ancak kopyalar da gelebilir. Üretici düzeyinde kopyaları ortadan kaldırmak içiniktidarsız yapımcı.
İdempotent Yapımcı
Kafka 0.11 (2017)'de tanıtılan idempotent yapımcı, üreticinin yeniden denemesinden kaynaklanan kopyaları ortadan kaldırır. Mekanizma iki kavrama dayanmaktadır:
- Üretici Kimliği (PID): İdempotent üretici bağlandığında aracı benzersiz bir PID atar. PID, üreticinin ömrü boyunca varlığını sürdürür; yapımcı yeniden başlarsa yeni bir PID alır.
- Sıra Numarası: Gönderilen her kayıt monoton olarak artan bir sıra numarası (0, 1, 2, ...) taşır. Aracı, her PID+bölümü için alınan son sıra numarasını takip eder. Bir kayıt gelirse sıra numarası zaten görüldükten sonra sessizce atılır (broker tarafında tekilleştirme).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Abilitare idempotenza
props.put("enable.idempotence", true);
// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException
// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)
Idempotent Üreticinin Sınırı
İdempotent yapımcı her kaydın görünmesini sağlar tam olarak bir kez Kafka günlüğünde yapımcının mevcut oturumu için. Üretici yeniden başlarsa (yeni PID), uçuş sırasında bir kayıt yeniden yazılabilir. Ve hepsinden önemlisi: tüketici işlemlerine ilişkin hiçbir şeyi garanti etmez. Tüketici bir kaydı işlerse ve bir sonraki başlatmada dengelemeyi gerçekleştirmeden önce çökerse aynı kaydı yeniden işleyecektir. Tam olarak bir kez uçtan uca için işlemsel API'ye ihtiyacınız vardır.
max.in.flight.requests.per.connection ve Sıralama
Parametre max.in.flight.requests.per.connection (MIFR) kaç üretim talebinin olduğunu kontrol eder
aynı anda tek bir komisyoncuya uçuyor olabilirler. Sıralama üzerinde kritik bir etkisi var
yeniden deneme durumunda mesajların listesi:
- MIFR=1: Her isteğin başka bir istek gönderilmeden önce onaylanması gerekir. Garantili sıralama, ancak verim azaldı (boru hattı yok).
- İdempotans olmadan MIFR > 1: ardışık düzen etkin, daha yüksek verim, ancak N partisi başarısız olursa ve N-1 grubu zaten uçuş halindedir, N'nin yeniden denenmesinden sonra kayıtlar N-1, N sırasına göre görünür; N-1'den önce gelmesi gerekiyordu. Sıralama artık garanti edilmiyor.
-
MIFR ≤ 5, idempotens ile: ile
enable.idempotence=trueKafka garanti eder Sıra numaraları sayesinde uçuş sırasında 5'e kadar istekte bile sıralama. Bu varsayılan değerdir idempotency etkinleştirildiğinde ve garantiyi sürdürmek için maksimum desteklendiğinde.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1); // nessun pipelining
// Througput limitato ma ordine garantito
// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry
// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer
Üretim için Eksiksiz Üretici Yapılandırması
// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerFactory {
/**
* Crea un producer ottimizzato per throughput elevato
* con garanzie at-least-once e idempotenza abilitata.
*/
public static KafkaProducer<String, byte[]> createHighThroughputProducer(
String bootstrapServers) {
Properties props = new Properties();
// Connessione
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Durabilità e idempotenza
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// acks=all e retries=MAX impostati automaticamente
// Batching: aspetta fino a 10ms per accumulare records
// Riduce numero di richieste al broker, aumenta throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale
// Compressione: snappy è bilanciata tra CPU e ratio
// lz4 per massima velocità; zstd per massimo ratio
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// Timeout: se il buffer è pieno, blocca fino a max.block.ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// Request timeout: quanto aspettare risposta dal broker
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
// Delivery timeout: timeout totale inclusi tutti i retry
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
return new KafkaProducer<>(props);
}
/**
* Crea un producer per eventi critici (financial, billing)
* con latenza minima e massime garanzie.
*/
public static KafkaProducer<String, byte[]> createLowLatencyProducer(
String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
// Massima durabilità
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Nessun batching: invia immediatamente ogni record
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
// Nessuna compressione: elimina latenza CPU
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
return new KafkaProducer<>(props);
}
}
İleri Düzey Tüketici: Taahhüt ve Yeniden İşleme
Otomatik taahhüt: Basit ama Riskli
İle enable.auto.commit=true (varsayılan), Kafka her seferinde ofseti otomatik olarak gerçekleştirir.
auto.commit.interval.ms milisaniye (varsayılan: 5000ms = 5 saniye). Sorun:
taahhüt, kayıtların gerçekte ne zaman işlendiğine bakılmaksızın arka planda gerçekleşir.
Otomatik taahhütle ilgili sorunlu senaryo:
- Anket, 1-100 uzaklığıyla 100 kayıt döndürür
- Tüketici kayıtları işlemeye başlar
- 5 saniye sonra zamanlayıcı başlar: ofset 100 otomatik olarak işlenir
- Tüketici yalnızca 1-60 arasındaki kayıtları işledikten sonra çöküyor
- Yeniden başlatma sırasında tüketici 100 ofseti ile başlar: 61-100 arasındaki kayıtlar atlandı (veri kaybı)
Senkronize Manuel Kaydetme
// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);
// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000); // 5 minuti
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
elaboraOrdine(record);
}
// commitSync() bloca fino a conferma dal broker
// In caso di errore, lancia CommitFailedException
// Garantisce at-least-once: il commit avviene dopo l'elaborazione
consumer.commitSync();
}
} catch (CommitFailedException e) {
// Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
// Gestisci il rebalance e riprendi
log.error("Commit fallito, probabile rebalance", e);
}
Bölüm Başına Taahhüt (İnce Parçalılık)
// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// Raggruppa i record per partizione
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
// Elabora il record
elaboraOrdine(record);
// Traccia l'offset per questa partizione
// IMPORTANTE: commita offset+1 (il prossimo record da leggere)
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
// Commit di tutti gli offset accumulati
consumer.commitSync(offsets);
}
Yeniden dengeleme: Nasıl çalışır ve nasıl yönetilir?
Un yeniden dengelemek tüketici grubu değiştiğinde meydana gelir: bir tüketici girer, çıkar veya çöker. Yeniden dengeleme sırasında gruptaki tüm tüketiciler okumayı bırakırlar (dünyayı durdururlar) ve grup koordinatörü bölümleri yeniden atar. Kafka'nın son sürümlerinde (2.4+) mevcuttur the kooperatiflerin yeniden dengelenmesi (artımlı işbirlikçi yeniden dengeleme) bu da etkiyi azaltır: yalnızca atananları değiştiren bölümler iptal edilir ve yeniden atanır.
// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
Collections.singletonList("ordini-effettuati"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Chiamato prima che le partizioni vengano riassegnate
// Commita gli offset delle partizioni che stiamo per perdere
log.info("Partizioni revocate: {}", partitions);
// Commit degli offset non ancora committati per le partizioni revocate
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : partitions) {
if (currentOffsets.containsKey(tp)) {
toCommit.put(tp, currentOffsets.get(tp));
}
}
if (!toCommit.isEmpty()) {
consumer.commitSync(toCommit);
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Chiamato dopo che le nuove partizioni sono state assegnate
log.info("Partizioni assegnate: {}", partitions);
// Possiamo inizializzare state locale per le nuove partizioni
}
}
);
Gelişmiş Dengeleme Yönetimi ile Python'da Tüketici
from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging
log = logging.getLogger(__name__)
class AdvancedKafkaConsumer:
"""Consumer Kafka con commit manuale e gestione rebalance."""
def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
self.topic = topic
self.pending_offsets: Dict[tuple, int] = {}
config = {
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
# Cooperative rebalance: meno interruzioni
"partition.assignment.strategy": "cooperative-sticky",
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000,
}
self.consumer = Consumer(config)
self.consumer.subscribe([topic], on_assign=self._on_assign,
on_revoke=self._on_revoke)
def _on_assign(self, consumer, partitions):
log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")
def _on_revoke(self, consumer, partitions):
"""Commita gli offset prima di perdere le partizioni."""
log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
to_commit = []
for p in partitions:
key = (p.topic, p.partition)
if key in self.pending_offsets:
to_commit.append(
TopicPartition(p.topic, p.partition,
self.pending_offsets[key])
)
if to_commit:
consumer.commit(offsets=to_commit, asynchronous=False)
def process_messages(self, handler_fn, batch_size: int = 100):
"""Loop principale di elaborazione."""
batch_count = 0
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
log.error(f"Errore Kafka: {msg.error()}")
continue
# Elabora il messaggio
try:
handler_fn(msg)
# Traccia l'offset (offset+1 = prossimo da leggere)
self.pending_offsets[(msg.topic(), msg.partition())] = \
msg.offset() + 1
batch_count += 1
except Exception as e:
log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
# Qui potresti implementare una DLQ (Dead Letter Queue)
# o skippare il record problematico
# Commit ogni N record
if batch_count >= batch_size:
self._commit_pending()
batch_count = 0
finally:
# Commit finale prima di chiudere
self._commit_pending()
self.consumer.close()
def _commit_pending(self):
"""Commita tutti gli offset pendenti."""
if not self.pending_offsets:
return
offsets = [
TopicPartition(topic, partition, offset)
for (topic, partition), offset in self.pending_offsets.items()
]
self.consumer.commit(offsets=offsets, asynchronous=False)
self.pending_offsets.clear()
log.debug(f"Committati {len(offsets)} offset")
Performans Ayarlama: Temel Parametreler
Yapımcı tarafı
| Parametre | Varsayılan | Etki |
|---|---|---|
linger.ms |
0 | Daha büyük partilerin biriktirilmesi için N ms beklenir |
batch.size |
16384 (16KB) | Bölüm başına maksimum toplu iş boyutu |
buffer.memory |
33554432 (32MB) | Tüm gruplar için toplam bellek içi arabellek |
compression.type |
öyle değil | hiçbiri/gzip/snappy/lz4/zstd |
Tüketici tarafı
| Parametre | Varsayılan | Etki |
|---|---|---|
fetch.min.bytes |
1 | Getirmeden döndürülecek minimum veri boyutu |
fetch.max.wait.ms |
500 | fetch.min.bayt sayısına ulaşılmadığı takdirde maksimum bekleme süresi |
max.poll.records |
500 | Tek anket için maksimum kayıt() |
max.partition.fetch.bytes |
1048576 (1MB) | Getirme başına bölüm başına maksimum veri |
EmbeddedKafka ile Test Etme
Kafka'yı kullanan kodu test etmek için kullanılabilir bir küme gerekir. Birim ve entegrasyon testleri için,
Bahar Kafka sunar @EmbeddedKafka test sırasında bir bellek içi aracıyı başlatır:
// Dipendenza Maven
// <dependency>
// <groupId>org.springframework.kafka</groupId>
// <artifactId>spring-kafka-test</artifactId>
// <scope>test</scope>
// </dependency>
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@EmbeddedKafka(
partitions = 3,
topics = {"ordini-test"},
brokerProperties = {
"auto.create.topics.enable=false",
"default.replication.factor=1"
}
)
class OrdineProducerTest {
@Autowired
private OrdineProducer producer;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void shouldSendOrderSuccessfully() throws Exception {
// Crea un consumer di test per verificare che il messaggio sia arrivato
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafka
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");
// Invia un ordine
producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");
// Verifica che il messaggio sia stato ricevuto entro 3 secondi
ConsumerRecords<String, String> records =
KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<String, String> record = records.iterator().next();
assertThat(record.key()).isEqualTo("order-001");
assertThat(record.value()).contains("99.99");
consumer.close();
}
}
Testcontainers ile test yapmak için (daha gerçekçi, gerçek bir Docker aracısı kullanın):
// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
class OrdineProducerIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("apache/kafka:4.0.0")
);
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Test
void testConBrokerReale() {
// Test con broker Kafka 4.0 reale in Docker
// Garantisce che il comportamento corrisponda alla produzione
}
}
Serideki Sonraki Adımlar
Gelişmiş üreticiler ve tüketiciler de dahil edildiğinde en karmaşık zorluklarla karşılaşabilirsiniz:
- Madde 4 – Tam Olarak Bir Kez Anlambilimi: Güvenliği sağlanacak Kafka işlemleri İşlem koordinatörü ve üretim sonuçlarıyla tam olarak uçtan uca işleme. Finansal boru hatları için vazgeçilmezdir.
- Madde 5 – Kayıt Şeması: Avro ve Protobuf'un Schema Registry ile kullanılması Farklı ekiplerdeki üreticiler ve tüketiciler arasındaki şema uyumsuzluklarını önlemek için.
- Madde 6 – Kafka Akımları: Streams DSL ile Java'da yerleşik akış işleme, Bu makalede ele alınan aynı üretici ve tüketici API'lerini dahili olarak kullanan.
Diğer Serilerle Bağlantı
- Mimari (Mikroservislerden Modüler Monolitlere): Kafka üreticileri ve tüketicileri dağıtılmış mimarilerde olaya dayalı modellerin somut bir uygulaması olarak.
- İleri Java: İş parçacığı güvenliğiyle Kafka'nın Java API'sine derinlemesine bakın, EmbeddedKafka ile yaşam döngüsü yönetimi ve test etme.







