Cele trei garanții de livrare

Înainte de a începe configurarea, este esențial să înțelegeți cele trei moduri de livrare pe care le acceptă Kafka și semnificația lor practică:

  • Cel mult-o dată: Mesajul este trimis o singură dată, fără reîncercare. Dacă brokerul nu îl primește, se pierde definitiv. Zero duplicate, posibilă pierdere de date. Acceptabil pentru valori necritice, jurnalele de depanare, evenimente cu flux de clic în care pierderea unor evenimente este tolerabilă.
  • Măcar o dată: Producătorul încearcă din nou dacă există o eroare. Mesajul va sosi cel puțin o dată, dar poate ajunge de mai multe ori (duplicate) dacă brokerul l-a primit, dar nu a trimis confirmarea înainte de expirare. Consumatorul trebuie să fie idempotent să gestioneze duplicatele. Cel mai frecvent scenariu în producție.
  • Exact-o dată: Mesajul este procesat o singură dată, fără pierderi și fără duplicate. Necesită un producător idempotent (pentru partea producătorului exact o dată) sau tranzacții Kafka (pentru exact o dată end-to-end între producător și consumator). Performanță semnificativă.

Regula de aur

Într-un sistem distribuit, este imposibil să se garanteze exact o singură dată fără cheltuieli generale. 90% din sisteme Kafka în producția americană măcar-o dată cu consumatori idempotenți (deduplicare din partea consumatorului prin baza de date sau cache). Exact-o dată prin tranzacțiile Kafka este utilizat pentru conducte financiare, facturare sisteme sau oriunde duplicarea unui eveniment cauzează un rău real.

Parametrul acks: Câte confirmări să așteptați

Parametrul acks al producătorului definește câte replici trebuie să confirme mai întâi primirea că producătorul consideră cererea finalizată:

acks=0 (Foc și uita)

Producătorul trimite înregistrarea și nu așteaptă niciun răspuns din partea brokerului. Debit maxim, latență minimă, dar nicio garanție: dacă brokerul este în jos sau înregistrarea este scrisă unui broker care apoi cade primul pentru a răspunde, înregistrarea se pierde. Semantica este cel mult-o dată.

// Producer Java con acks=0 (at-most-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("acks", "0");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// I retries vengono ignorati con acks=0
// Non c'è feedback dal broker, quindi non c'è errore da ritentare
props.put("retries", 0);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Il send() ritorna immediatamente, nessun callback utile
producer.send(new ProducerRecord<>("click-events", "user123", "{\"action\":\"click\"}"));
// Nessuna garanzia che il broker abbia ricevuto il record

acks=1 (doar lider)

Producătorul așteaptă confirmarea doar de la brokerul principal al partiției. Este posibil ca urmăritorii să nu aibă încă a replicat înregistrarea când sosește confirmarea. Dacă liderul cade imediat după trimiterea confirmării dar înainte ca adepții să fi răspuns, înregistrarea se pierde. Semantică măcar-o dată cu risc de pierdere a datelor în ferestre de eroare foarte scurte.

// Producer con acks=1 (default fino a Kafka 2.8, ora default è "all")
Properties props = new Properties();
props.put("acks", "1");
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Con acks=1 e retries>0:
// - Record confermato ma broker leader cade prima di replica: perso (no retry possibile)
// - Record non confermato (timeout, errore rete): retry invia di nuovo
//   -> possibile duplicato se broker ha ricevuto ma non risposto

acks=toate (sau acks=-1): Toate ISR-urile

Producătorul așteaptă ca înregistrarea să fie scrisă pe toate ISR (replici în sincronizare) a partiției. Cu min.insync.replicas=2 e replication.factor=3, asta înseamnă că cel puțin 2 răspunsuri (lider + 1 urmăritor) trebuie să confirme. Abia atunci producătorul primește ack-ul. Semantică măcar-o dată fără pierderi de date până cel puțin min.insync.replicas brokerii sunt activi.

// Producer con acks=all: massima durabilità
Properties props = new Properties();
props.put("acks", "all");  // equivalente a "-1"
props.put("retries", Integer.MAX_VALUE);
props.put("retry.backoff.ms", 100);
props.put("max.block.ms", 60000);  // attende fino a 60s se il broker è congestionato

// ATTENZIONE: senza idempotenza abilitata, retries su acks=all
// possono creare duplicati se il broker ha ricevuto il record
// ma ha fallito nell'inviare l'ack (es. timeout di rete)

// Configurazione topic (broker-side): min.insync.replicas=2
// Se meno di 2 repliche sono disponibili, il broker ritorna
// NotEnoughReplicasException -> il producer riprova

Problema duplicat cu Reîncercați

Luați în considerare acest scenariu cu acks=all e retries=3:

  1. Producătorul trimite înregistrarea R1 brokerului principal
  2. Brokerul scrie R1 pe disc și trimite ack producătorului
  3. Ack-ul este pierdut (rețeaua expiră înainte de a ajunge la producător)
  4. Producătorul, neprimind ack, intră request.timeout.ms, crede că scrierea a eșuat
  5. Producătorul încearcă din nou și trimite din nou R1
  6. Brokerul primește R1 a doua oară și îl scrie ca înregistrare separată
  7. Subiectul conține acum duplicatul R1

Acesta este comportamentul corect al cel puțin o dată: fiecare dată sosește cel puțin o dată, dar pot ajunge duplicate. Pentru a elimina duplicatele la nivel de producător, utilizațiproducător idempotent.

Producătorul Idempotent

Producătorul idempotent, introdus în Kafka 0.11 (2017), elimină duplicatele cauzate de reîncercarea producătorului. Mecanismul se bazează pe două concepte:

  • ID producător (PID): Când producătorul idempotent se conectează, brokerul atribuie un PID unic. PID persistă pe toată durata de viață a producătorului; dacă producătorul repornește, primește un nou PID.
  • Numărul de secvență: fiecare înregistrare trimisă poartă un număr de secvență monoton crescător (0, 1, 2, ...). Brokerul ține evidența ultimului număr de secvență primit pentru fiecare PID+partiție. Dacă ajunge un record cu numărul de secvență deja văzut, este eliminat în tăcere (deduplicare pe partea brokerului).
// Abilitare l'idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Abilitare idempotenza
props.put("enable.idempotence", true);

// Con enable.idempotence=true, questi valori vengono impostati automaticamente:
// acks = "all"
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection = 5
// Se li impostassi a valori incompatibili, Kafka lancerebbe ConfigException

// Opzionale ma consigliato: linger e batch per performance
props.put("linger.ms", 5);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Ora i retry non creano duplicati nel log Kafka
// La garanzia è "exactly-once lato producer" (non end-to-end)

Limita Producătorului Idempotent

Producătorul idempotent se asigură că fiecare înregistrare apare exact o dată în jurnalul Kafka pentru sesiunea curentă a producătorului. Dacă producătorul repornește (nou PID), o înregistrare în zbor ar putea fi rescris. Și mai presus de toate: nu garantează nimic în ceea ce privește procesarea consumatorului. Dacă consumatorul procesează o înregistrare și apoi se blochează înainte de a efectua compensarea, la următoarea pornire va reprocesa aceeași înregistrare. Pentru exact o dată, de la capăt la capăt, aveți nevoie de API-ul tranzacțional.

max.în.flight.cereri.per.conexiune și sortare

Parametrul max.in.flight.requests.per.connection (MIFR) controlează câte cereri de producție pot zbura către un singur broker în același timp. Are un impact critic asupra sortării de mesaje în caz de reîncercare:

  • MIFR=1: fiecare cerere trebuie confirmată înainte de a trimite alta. sortare garantata, dar debit redus (fără conducte).
  • MIFR > 1 fara idempotenta: pipelining activ, debit mai mare, dar dacă lotul N eșuează iar lotul N-1 este deja în zbor, după reîncercarea lui N înregistrările apar în ordinea N-1, N când N ar fi trebuia să precedă N-1. Sortarea nu mai este garantată.
  • MIFR ≤ 5 cu idempotenta: cu enable.idempotence=true, garantează Kafka sortare chiar și cu până la 5 solicitări în zbor, datorită numerelor de ordine. Este valoarea implicită când idempotenta este activată și maxim acceptată pentru a menține garanția.
// SCENARIO 1: Ordinamento garantito senza idempotenza
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 1);  // nessun pipelining
// Througput limitato ma ordine garantito

// SCENARIO 2: Throughput massimo, no garanzie ordine (analytics)
props.put("enable.idempotence", false);
props.put("max.in.flight.requests.per.connection", 10);
props.put("acks", "1");
// Massimo throughput, possibili riordini in caso di retry

// SCENARIO 3: Produzione standard (consigliato)
props.put("enable.idempotence", true);
// max.in.flight viene impostato a 5 automaticamente
// acks viene impostato a "all" automaticamente
// Buon throughput + ordine garantito + no duplicati lato producer

Configurație completă a producătorului pentru producție

// ProducerFactory.java: factory per producer production-ready
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerFactory {

    /**
     * Crea un producer ottimizzato per throughput elevato
     * con garanzie at-least-once e idempotenza abilitata.
     */
    public static KafkaProducer<String, byte[]> createHighThroughputProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        // Connessione
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Durabilità e idempotenza
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // acks=all e retries=MAX impostati automaticamente

        // Batching: aspetta fino a 10ms per accumulare records
        // Riduce numero di richieste al broker, aumenta throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);     // 64KB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB buffer totale

        // Compressione: snappy è bilanciata tra CPU e ratio
        // lz4 per massima velocità; zstd per massimo ratio
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Timeout: se il buffer è pieno, blocca fino a max.block.ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        // Request timeout: quanto aspettare risposta dal broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

        // Delivery timeout: timeout totale inclusi tutti i retry
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

        return new KafkaProducer<>(props);
    }

    /**
     * Crea un producer per eventi critici (financial, billing)
     * con latenza minima e massime garanzie.
     */
    public static KafkaProducer<String, byte[]> createLowLatencyProducer(
            String bootstrapServers) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Massima durabilità
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Nessun batching: invia immediatamente ogni record
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);

        // Nessuna compressione: elimina latenza CPU
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

        return new KafkaProducer<>(props);
    }
}

Consumator avansat: commit și reprocesare

Auto-commit: simplu, dar riscant

Cu enable.auto.commit=true (implicit), Kafka comite automat offset-ul fiecare auto.commit.interval.ms milisecunde (implicit: 5000ms = 5 secunde). Problema: commit-ul are loc în fundal, indiferent de momentul în care înregistrările sunt procesate efectiv.

Scenariu problematic cu auto-commit:

  1. Sondajul returnează 100 de înregistrări cu offset 1-100
  2. Consumatorul începe procesarea înregistrărilor
  3. După 5 secunde pornește cronometrul: offset-ul 100 este activat automat
  4. Consumatorul se blochează după procesare doar înregistrările 1-60
  5. La repornire, consumatorul începe de la offset 100: înregistrările 61-100 au fost omise (pierdere de date)

Comitere manuală sincronă

// Consumer con commit manuale sincrono (at-least-once)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("group.id", "ordini-processor");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");

// Controlla quanti record vengono restituiti per poll()
props.put("max.poll.records", 500);

// Se l'elaborazione richiede piu di questo, Kafka considera il consumer morto
// e fa un rebalance (assegna le partizioni a un altro consumer del gruppo)
props.put("max.poll.interval.ms", 300000);  // 5 minuti

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

try {
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {
            elaboraOrdine(record);
        }

        // commitSync() bloca fino a conferma dal broker
        // In caso di errore, lancia CommitFailedException
        // Garantisce at-least-once: il commit avviene dopo l'elaborazione
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    // Il consumer ha superato max.poll.interval.ms: ha perso le partizioni
    // Gestisci il rebalance e riprendi
    log.error("Commit fallito, probabile rebalance", e);
}

Commit per partiție (granularitate fină)

// Commit offset per partizione specifica dopo elaborazione
// Utile quando si elaborano batch grandi e si vuole commit incrementale

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("ordini-effettuati"));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofMillis(100));

    // Raggruppa i record per partizione
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        // Elabora il record
        elaboraOrdine(record);

        // Traccia l'offset per questa partizione
        // IMPORTANTE: commita offset+1 (il prossimo record da leggere)
        offsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)
        );
    }

    // Commit di tutti gli offset accumulati
    consumer.commitSync(offsets);
}

Reechilibrare: cum funcționează și cum să o gestionezi

Un reechilibrare apare atunci când grupul de consumatori se schimbă: un consumator intră, iese sau se prăbușește. În timpul reechilibrării, toți consumatorii din grup se opresc din citit (oprește-lumea) și coordonatorul grupului reatribuie partiții. În versiunile recente de Kafka (2.4+) este disponibil cel reechilibrarea cooperativelor (reechilibrare cooperativă incrementală) care reduce impactul: sunt revocate și reatribuite numai partițiile care schimbă cesionari.

// Implementare ConsumerRebalanceListener per gestire rebalance gracefully
consumer.subscribe(
    Collections.singletonList("ordini-effettuati"),
    new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Chiamato prima che le partizioni vengano riassegnate
            // Commita gli offset delle partizioni che stiamo per perdere
            log.info("Partizioni revocate: {}", partitions);

            // Commit degli offset non ancora committati per le partizioni revocate
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                if (currentOffsets.containsKey(tp)) {
                    toCommit.put(tp, currentOffsets.get(tp));
                }
            }
            if (!toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Chiamato dopo che le nuove partizioni sono state assegnate
            log.info("Partizioni assegnate: {}", partitions);
            // Possiamo inizializzare state locale per le nuove partizioni
        }
    }
);

Consumator în Python cu Advanced Offset Management

from confluent_kafka import Consumer, TopicPartition, KafkaError
from typing import Dict
import logging

log = logging.getLogger(__name__)

class AdvancedKafkaConsumer:
    """Consumer Kafka con commit manuale e gestione rebalance."""

    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.topic = topic
        self.pending_offsets: Dict[tuple, int] = {}

        config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            # Cooperative rebalance: meno interruzioni
            "partition.assignment.strategy": "cooperative-sticky",
            "session.timeout.ms": 30000,
            "max.poll.interval.ms": 300000,
        }

        self.consumer = Consumer(config)
        self.consumer.subscribe([topic], on_assign=self._on_assign,
                                 on_revoke=self._on_revoke)

    def _on_assign(self, consumer, partitions):
        log.info(f"Assegnate partizioni: {[p.partition for p in partitions]}")

    def _on_revoke(self, consumer, partitions):
        """Commita gli offset prima di perdere le partizioni."""
        log.info(f"Revocate partizioni: {[p.partition for p in partitions]}")
        to_commit = []
        for p in partitions:
            key = (p.topic, p.partition)
            if key in self.pending_offsets:
                to_commit.append(
                    TopicPartition(p.topic, p.partition,
                                   self.pending_offsets[key])
                )
        if to_commit:
            consumer.commit(offsets=to_commit, asynchronous=False)

    def process_messages(self, handler_fn, batch_size: int = 100):
        """Loop principale di elaborazione."""
        batch_count = 0

        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        log.error(f"Errore Kafka: {msg.error()}")
                    continue

                # Elabora il messaggio
                try:
                    handler_fn(msg)

                    # Traccia l'offset (offset+1 = prossimo da leggere)
                    self.pending_offsets[(msg.topic(), msg.partition())] = \
                        msg.offset() + 1
                    batch_count += 1

                except Exception as e:
                    log.error(f"Errore elaborazione offset {msg.offset()}: {e}")
                    # Qui potresti implementare una DLQ (Dead Letter Queue)
                    # o skippare il record problematico

                # Commit ogni N record
                if batch_count >= batch_size:
                    self._commit_pending()
                    batch_count = 0

        finally:
            # Commit finale prima di chiudere
            self._commit_pending()
            self.consumer.close()

    def _commit_pending(self):
        """Commita tutti gli offset pendenti."""
        if not self.pending_offsets:
            return
        offsets = [
            TopicPartition(topic, partition, offset)
            for (topic, partition), offset in self.pending_offsets.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
        self.pending_offsets.clear()
        log.debug(f"Committati {len(offsets)} offset")

Ajustarea performanței: parametri cheie

Partea producătorului

Parametru Implicit Efect
linger.ms 0 Așteaptă N ms pentru a acumula loturi mai mari
batch.size 16384 (16KB) Dimensiunea maximă a lotului pe partiție
buffer.memory 33554432 (32MB) Buffer total în memorie pentru toate loturile
compression.type nu este none/gzip/snappy/lz4/zstd

Partea consumatorului

Parametru Implicit Efect
fetch.min.bytes 1 Dimensiunea minimă a datelor de returnat de la preluare
fetch.max.wait.ms 500 Așteptare maximă dacă fetch.min.bytes nu este atins
max.poll.records 500 Înregistrări maxime pentru un singur sondaj()
max.partition.fetch.bytes 1048576 (1MB) Date maxime pe partiție per preluare

Testarea cu EmbeddedKafka

Testarea codului care utilizează Kafka necesită un cluster disponibil. Pentru testele unitare și de integrare, Spring Kafka livrează @EmbeddedKafka care pornește un broker în memorie în timpul testării:

// Dipendenza Maven
// <dependency>
//   <groupId>org.springframework.kafka</groupId>
//   <artifactId>spring-kafka-test</artifactId>
//   <scope>test</scope>
// </dependency>

import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit;

@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"ordini-test"},
    brokerProperties = {
        "auto.create.topics.enable=false",
        "default.replication.factor=1"
    }
)
class OrdineProducerTest {

    @Autowired
    private OrdineProducer producer;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void shouldSendOrderSuccessfully() throws Exception {
        // Crea un consumer di test per verificare che il messaggio sia arrivato
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka
        );
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "ordini-test");

        // Invia un ordine
        producer.inviaOrdine("order-001", "{\"id\":\"order-001\",\"total\":99.99}");

        // Verifica che il messaggio sia stato ricevuto entro 3 secondi
        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(3));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("order-001");
        assertThat(record.value()).contains("99.99");

        consumer.close();
    }
}

Pentru testarea cu Testcontainers (mai realist, utilizați un broker Docker real):

// Con Testcontainers: broker reale Docker durante i test
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrdineProducerIntegrationTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("apache/kafka:4.0.0")
    );

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }

    @Test
    void testConBrokerReale() {
        // Test con broker Kafka 4.0 reale in Docker
        // Garantisce che il comportamento corrisponda alla produzione
    }
}

Următorii pași din serie

Cu producători și consumatori avansați incluși, puteți face față celei mai complexe provocări:

  • Articolul 4 – Semantica Exact O dată: tranzacții Kafka pentru a se asigura exact procesare end-to-end, cu coordonatorul tranzacției și implicațiile de debit. Indispensabil pentru conductele financiare.
  • Articolul 5 – Schema Registrului: Cum să utilizați Avro și Protobuf cu Schema Registry pentru a evita incompatibilitățile de schemă între producători și consumatori din diferite echipe.
  • Articolul 6 – Kafka Streams: procesarea fluxului încorporat în Java cu Streams DSL, care utilizează în mod intern aceleași API-uri pentru producători și consumatori explorate în acest articol.

Legătură cu alte serii

  • Arhitectură (de la microservicii la monolit modular): producătorii și consumatorii Kafka ca implementare concretă a tiparelor bazate pe evenimente în arhitecturile distribuite.
  • Java avansat: Aprofundare în API-ul Java Kafka cu siguranță pentru fire, managementul ciclului de viață și testarea cu EmbeddedKafka.