Üç Teslimat Garantisi

Kuruluma başlamadan önce Kafka'nın desteklediği üç dağıtım modunu anlamak çok önemlidir ve pratik anlamları:

  • En fazla bir kez: Mesaj tekrar denemeden bir kez gönderilir. Eğer komisyoncu bunu almazsa, kalıcı olarak kaybolur. Sıfır kopya, olası veri kaybı. Kritik olmayan metrikler için kabul edilebilir, hata ayıklama günlükleri, bazı olayların kaybının tolere edilebildiği tıklama akışı olayları.
  • En az bir kez: Hata varsa yapımcı tekrar dener. Mesaj en az bir kez gelecek, ancak aracının bunu almasına rağmen zaman aşımından önce onay göndermemesi durumunda birden çok kez (kopyalar) gelebilir. Tüketicinin kopyaları işleme konusunda bağımsız olması gerekir. Üretimde en yaygın senaryo.
  • Tam olarak bir kez: Mesaj, kayıpsız ve kopyalanmadan tam olarak bir kez işlenir. İdempotent üretici (tam olarak bir kez üretici tarafı için) veya Kafka işlemleri (tam olarak bir kez için) gerektirir Üretici ile tüketici arasında uçtan uca). Önemli performans yükü.

Altın Kural

Dağıtılmış bir sistemde, ek yük olmadan tam olarak bir kez garanti etmek imkansızdır. Sistemlerin %90'ı ABD üretiminde Kafka en az bir kez idempotent tüketicilerle (tüketici tarafı veri tekilleştirme) veritabanı veya önbellek yoluyla). Tam olarak bir kez Kafka işlemleri aracılığıyla finansal işlem hatları ve faturalandırma için kullanılır sistemlerde veya herhangi bir yerde bir olayın kopyalanması gerçek zarara neden olur.

Acks Parametresi: Kaç Onay Beklenecek?

Parametre acks Üreticinin oranı, ilk önce kaç kopyanın alındığını onaylaması gerektiğini tanımlar yapımcının talebin tamamlanmış olduğunu düşünmesi:

acks=0 (Ateş Et ve Unut)

Yapımcı kaydı gönderir ve komisyoncudan herhangi bir yanıt beklemez. Maksimum verim, minimum gecikme, ancak garanti yok: eğer komisyoncu çökerse veya kayıt bir komisyoncuya yazılırsa, o da ilk önce düşer yanıt vermek için kayıt kaybolur. Anlambilim en fazla bir kez.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (Yalnızca Lider)

Üretici yalnızca bölümün önde gelen komisyoncusundan onay bekler. Takipçiler henüz sahip olmayabilir Onay geldiğinde kaydı kopyaladı. Lider, onayı gönderdikten hemen sonra düşerse ancak takipçiler cevap veremeden kayıt kayboldu. Anlambilim en az bir kez çok kısa hata pencerelerinde veri kaybı riski vardır.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=all (veya acks=-1): Tüm ISR'ler

Yapımcı plağın tüm kayıtlara yazılmasını bekliyor. ISR (Senkronize Kopyalar) bölümün. İle min.insync.replicas=2 e replication.factor=3, bu en azından şu anlama geliyor 2 yanıtın (lider + 1 takipçi) onaylaması gerekir. Ancak o zaman yapımcı onayı alır. Anlambilim en az bir kez en azından veri kaybı olmadan min.insync.replicas komisyoncular aktif.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

Yeniden Denemede Yinelenen Sorun

Bu senaryoyu göz önünde bulundurun acks=all e retries=3:

  1. Üretici R1 kaydını lider komisyoncuya gönderir
  2. Broker R1'i diske yazar ve üreticiye onay gönderir.
  3. Ack kayboldu (yapımcıya ulaşmadan ağ zaman aşımına uğradı)
  4. Onay alamayan yapımcı içeri girdi request.timeout.ms, yazmanın başarısız olduğunu düşünüyor
  5. Üretici tekrar dener ve tekrar R1'i gönderir.
  6. Aracı, R1'i ikinci kez alır ve bunu ayrı bir kayıt olarak yazar.
  7. Konu artık yinelenen R1 içeriyor

Bu, en az bir kerenin doğru davranışıdır: her veri en az bir kez gelir, ancak kopyalar da gelebilir. Üretici düzeyinde kopyaları ortadan kaldırmak içiniktidarsız yapımcı.

İdempotent Yapımcı

Kafka 0.11 (2017)'de tanıtılan idempotent yapımcı, üreticinin yeniden denemesinden kaynaklanan kopyaları ortadan kaldırır. Mekanizma iki kavrama dayanmaktadır:

  • Üretici Kimliği (PID): İdempotent üretici bağlandığında aracı benzersiz bir PID atar. PID, üreticinin ömrü boyunca varlığını sürdürür; yapımcı yeniden başlarsa yeni bir PID alır.
  • Sıra Numarası: Gönderilen her kayıt monoton olarak artan bir sıra numarası (0, 1, 2, ...) taşır. Aracı, her PID+bölümü için alınan son sıra numarasını takip eder. Bir kayıt gelirse sıra numarası zaten görüldükten sonra sessizce atılır (broker tarafında tekilleştirme).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

Idempotent Üreticinin Sınırı

İdempotent yapımcı her kaydın görünmesini sağlar tam olarak bir kez Kafka günlüğünde yapımcının mevcut oturumu için. Üretici yeniden başlarsa (yeni PID), uçuş sırasında bir kayıt yeniden yazılabilir. Ve hepsinden önemlisi: tüketici işlemlerine ilişkin hiçbir şeyi garanti etmez. Tüketici bir kaydı işlerse ve bir sonraki başlatmada dengelemeyi gerçekleştirmeden önce çökerse aynı kaydı yeniden işleyecektir. Tam olarak bir kez uçtan uca için işlemsel API'ye ihtiyacınız vardır.

max.in.flight.requests.per.connection ve Sıralama

Parametre max.in.flight.requests.per.connection (MIFR) kaç üretim talebinin olduğunu kontrol eder aynı anda tek bir komisyoncuya uçuyor olabilirler. Sıralama üzerinde kritik bir etkisi var yeniden deneme durumunda mesajların listesi:

  • MIFR=1: Her isteğin başka bir istek gönderilmeden önce onaylanması gerekir. Garantili sıralama, ancak verim azaldı (boru hattı yok).
  • İdempotans olmadan MIFR > 1: ardışık düzen etkin, daha yüksek verim, ancak N partisi başarısız olursa ve N-1 grubu zaten uçuş halindedir, N'nin yeniden denenmesinden sonra kayıtlar N-1, N sırasına göre görünür; N-1'den önce gelmesi gerekiyordu. Sıralama artık garanti edilmiyor.
  • MIFR ≤ 5, idempotens ile: ile enable.idempotence=trueKafka garanti eder Sıra numaraları sayesinde uçuş sırasında 5'e kadar istekte bile sıralama. Bu varsayılan değerdir idempotency etkinleştirildiğinde ve garantiyi sürdürmek için maksimum desteklendiğinde.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

Üretim için Eksiksiz Üretici Yapılandırması

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

İleri Düzey Tüketici: Taahhüt ve Yeniden İşleme

Otomatik taahhüt: Basit ama Riskli

İle enable.auto.commit=true (varsayılan), Kafka her seferinde ofseti otomatik olarak gerçekleştirir. auto.commit.interval.ms milisaniye (varsayılan: 5000ms = 5 saniye). Sorun: taahhüt, kayıtların gerçekte ne zaman işlendiğine bakılmaksızın arka planda gerçekleşir.

Otomatik taahhütle ilgili sorunlu senaryo:

  1. Anket, 1-100 uzaklığıyla 100 kayıt döndürür
  2. Tüketici kayıtları işlemeye başlar
  3. 5 saniye sonra zamanlayıcı başlar: ofset 100 otomatik olarak işlenir
  4. Tüketici yalnızca 1-60 arasındaki kayıtları işledikten sonra çöküyor
  5. Yeniden başlatma sırasında tüketici 100 ofseti ile başlar: 61-100 arasındaki kayıtlar atlandı (veri kaybı)

Senkronize Manuel Kaydetme

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

Bölüm Başına Taahhüt (İnce Parçalılık)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

Yeniden dengeleme: Nasıl çalışır ve nasıl yönetilir?

Un yeniden dengelemek tüketici grubu değiştiğinde meydana gelir: bir tüketici girer, çıkar veya çöker. Yeniden dengeleme sırasında gruptaki tüm tüketiciler okumayı bırakırlar (dünyayı durdururlar) ve grup koordinatörü bölümleri yeniden atar. Kafka'nın son sürümlerinde (2.4+) mevcuttur the kooperatiflerin yeniden dengelenmesi (artımlı işbirlikçi yeniden dengeleme) bu da etkiyi azaltır: yalnızca atananları değiştiren bölümler iptal edilir ve yeniden atanır.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

Gelişmiş Dengeleme Yönetimi ile Python'da Tüketici

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

Performans Ayarlama: Temel Parametreler

Yapımcı tarafı

Parametre Varsayılan Etki
linger.ms 0 Daha büyük partilerin biriktirilmesi için N ms beklenir
batch.size 16384 (16KB) Bölüm başına maksimum toplu iş boyutu
buffer.memory 33554432 (32MB) Tüm gruplar için toplam bellek içi arabellek
compression.type öyle değil hiçbiri/gzip/snappy/lz4/zstd

Tüketici tarafı

Parametre Varsayılan Etki
fetch.min.bytes 1 Getirmeden döndürülecek minimum veri boyutu
fetch.max.wait.ms 500 fetch.min.bayt sayısına ulaşılmadığı takdirde maksimum bekleme süresi
max.poll.records 500 Tek anket için maksimum kayıt()
max.partition.fetch.bytes 1048576 (1MB) Getirme başına bölüm başına maksimum veri

EmbeddedKafka ile Test Etme

Kafka'yı kullanan kodu test etmek için kullanılabilir bir küme gerekir. Birim ve entegrasyon testleri için, Bahar Kafka sunar @EmbeddedKafka test sırasında bir bellek içi aracıyı başlatır:

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Testcontainers ile test yapmak için (daha gerçekçi, gerçek bir Docker aracısı kullanın):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

Serideki Sonraki Adımlar

Gelişmiş üreticiler ve tüketiciler de dahil edildiğinde en karmaşık zorluklarla karşılaşabilirsiniz:

  • Madde 4 – Tam Olarak Bir Kez Anlambilimi: Güvenliği sağlanacak Kafka işlemleri İşlem koordinatörü ve üretim sonuçlarıyla tam olarak uçtan uca işleme. Finansal boru hatları için vazgeçilmezdir.
  • Madde 5 – Kayıt Şeması: Avro ve Protobuf'un Schema Registry ile kullanılması Farklı ekiplerdeki üreticiler ve tüketiciler arasındaki şema uyumsuzluklarını önlemek için.
  • Madde 6 – Kafka Akımları: Streams DSL ile Java'da yerleşik akış işleme, Bu makalede ele alınan aynı üretici ve tüketici API'lerini dahili olarak kullanan.

Diğer Serilerle Bağlantı

  • Mimari (Mikroservislerden Modüler Monolitlere): Kafka üreticileri ve tüketicileri dağıtılmış mimarilerde olaya dayalı modellerin somut bir uygulaması olarak.
  • İleri Java: İş parçacığı güvenliğiyle Kafka'nın Java API'sine derinlemesine bakın, EmbeddedKafka ile yaşam döngüsü yönetimi ve test etme.