De ce Kafka transmite în flux în loc de Flink sau Spark

Când vorbim despre procesarea fluxului cu Kafka, întrebarea firească este: de ce să-l folosești Kafka Streams în loc de Apache Flink sau Spark Streaming? Răspunsul depinde de context. Kafka Streams are un avantaj unic: și a Biblioteca Java (nu un cadru separat) care rulează în aplicația dvs., nu este nevoie a unui cluster dedicat, fără manager de locuri de muncă, fără implementare separată.

Pentru echipele care doresc să adauge procesarea fluxului la aplicația lor Java/Kotlin fără a adăuga complexitate operațională, Kafka Streams este alegerea naturală. Prin conductă analiză complexă cu îmbinări temporale multi-stream, agregări complexe pe date distribuite sau încărcături de lucru care necesită scalare independentă, Flink este cea mai bună alegere (a se vedea articolul 08 din serie).

Ce vei învăța

  • Arhitectura Kafka Streams: topologie, fire de stream, sarcini
  • KStream vs KTable: diferența conceptuală și practică
  • Fluxuri DSL: filtru, hartă, flatMap, groupBy, agregat
  • Magazin de stat: RocksDB încorporat, în memorie, jurnalele de modificări
  • Operațiuni pe fereastră: tumbling, hop, ferestre de sesiune
  • KStream-KTable join: Îmbogățirea fluxurilor cu date de referință
  • Gestionarea erorilor și cozile cu scrisori moarte în Kafka Streams

Arhitectură: Topologie, Sarcină și Partiție

O aplicație Kafka Streams definește a topologie: un DAG (Grafic Aciclic Dirijat) al operatorilor de procesare. Fiecare nod este o operație (filtru, hartă, agregat), fiecare margine este un flux de înregistrări. Kafka Streams compilați această topologie în sarcini, câte unul pentru fiecare partiție a subiectului sursă.

// Dipendenze Maven
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-streams</artifactId>
//   <version>3.7.0</version>
// </dependency>

// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KStream vs KTable: Dualism fundamental

Cel mai important concept al Kafka Streams este distincția dintre KStream și KTable. Înțelegerea acestei diferențe este fundamentală pentru proiectarea topologiilor corecte.

StreamsBuilder builder = new StreamsBuilder();

// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- evento diverso

// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED

// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");

Fluxuri DSL: operațiuni fundamentale

// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();

// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");

// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
    .filter((key, value) -> value != null && value.contains("order_id"));

// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
    .mapValues(value -> parseOrder(value));

// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
    .flatMapValues(order -> order.getItems());

// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId());

// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
    .peek((userId, order) ->
        log.debug("Processing order {} for user {}", order.getOrderId(), userId));

// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
    .split(Named.as("branch-"))
    .branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
    .branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
    .defaultBranch(Branched.as("low-value"));

KStream<String, Order> highValueOrders = branches.get("branch-high-value");

// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
    .mapValues(Order::toJson)
    .to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));

Agregări cu State Store

Agregările cu state sunt inima Kafka Streams. Fiecare agregare menține unul Magazin de stat local (RocksDB în mod implicit) care persistă valorile intermediari. Magazinul de stat are un subiect de jurnal de modificări corespondent pe Kafka pentru toleranța la erori: dacă o instanță se prăbușește, statul poate fi reconstituit.

// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()  // Raggruppa per chiave corrente (user_id)
    .count(Materialized.as("orders-count-store"));
    // Il risultato e un KTable: user_id -> count

// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey(Grouped.with(Serdes.String(),
        // Serve Serde custom per Order
        new OrderSerde()))
    .aggregate(
        () -> 0.0,                          // Initializer: valore iniziale
        (userId, order, total) ->           // Aggregator: combina accumulatore e nuovo record
            total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );

// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
    .toStream()
    .mapValues(total -> String.format("{\"total\": %.2f}", total))
    .to("user-spend-summary");

Operațiuni cu ferestre: Agregații temporale

Operațiile cu fereastră vă permit să agregați înregistrări pe intervale de timp. Kafka Streams acceptă patru tipuri de ferestre, fiecare cu o semantică distinctă.

// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("orders-per-5min-store"));

ordersPerWindow.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),  // user_id
        String.format(
            "{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
            windowedKey.key(),
            windowedKey.window().start(),
            count
        )
    ))
    .to("orders-per-window");

// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),   // Dimensione finestra
            Duration.ofMinutes(1)     // Grace period per late arrivals
        ).advanceBy(Duration.ofMinutes(5))  // Avanza di 5 min (hopping)
    )
    .count();

// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(30),
        Duration.ofMinutes(5)
    ))
    .count(Materialized.as("session-store"));

// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

KStream-KTable Join: Îmbogățirea fluxului

Unul dintre cele mai frecvente cazuri de utilizare în Kafka Streams este îmbogățirea unui flux de evenimente cu date de referință statice sau care se schimbă lent: îmbogățiți comenzile cu detaliile produsului, adăugați numele de utilizator la evenimentul de conectare etc.

// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();

// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), new OrderSerde())
);

// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
    "products-catalog",
    Consumed.with(Serdes.String(), new ProductSerde())
);

// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
    .selectKey((orderId, order) ->
        order.getItems().get(0).getProductId()  // Reparticiona per product_id
    );

// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
    products,
    (order, product) -> {
        if (product == null) {
            // Prodotto non trovato: usa dati parziali
            return EnrichedOrder.fromOrder(order);
        }
        return EnrichedOrder.fromOrderAndProduct(order, product);
    }
);

enrichedOrders
    .selectKey((productId, enriched) -> enriched.getOrderId())
    .mapValues(EnrichedOrder::toJson)
    .to("orders-enriched");

Gestionarea erorilor în fluxurile Kafka

// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione

// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    CustomProductionExceptionHandler.class.getName());

// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
    private final KafkaProducer<byte[], byte[]> dlqProducer;

    public DLQProductionExceptionHandler() {
        Properties dlqProps = new Properties();
        dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(dlqProps);
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        log.error("Errore produzione per record key={}", new String(record.key()), exception);

        // Invia il record problematico alla DLQ con metadata dell'errore
        Headers headers = new RecordHeaders();
        headers.add("error-message", exception.getMessage().getBytes());
        headers.add("error-type", exception.getClass().getSimpleName().getBytes());
        headers.add("source-topic", record.topic().getBytes());
        headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",  // {topic-name}.dlq
            null,
            record.key(),
            record.value(),
            headers
        );

        dlqProducer.send(dlqRecord);

        // CONTINUE: skippa il record e va avanti (non blocca lo stream)
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

Magazin de stat: Acces prin interogări interactive

// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);

// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
    KeyValue<String, Long> kv = allCounts.next();
    System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close();  // Importante: chiudi sempre l'iterator

// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
    keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()

Concluzii și când să folosiți fluxurile Kafka

Kafka Streams strălucește în scenariile în care doriți să adăugați procesarea fluxului o aplicație Java/Kotlin existentă fără un cluster dedicat: agregări în timp real, îmbinări între fluxuri și tabele de referință, transformări și îmbogățire a datelor. Biblioteca gestionează toleranța la erori, partiționarea și scalarea orizontală într-un mod transparent.

Pentru conducte de analiză mai complexe care necesită îmbinări temporale încrucișate, CEP (Procesare complexă a evenimentelor) sau integrare cu un lac de date (Iceberg, Delta), Articolul 08 arată cum Kafka + Apache Flink este standardul de facto în 2026.

Seria completă: Apache Kafka

  • Articolul 01 — Fundamentele Apache Kafka
  • Articolul 02 — KRaft în Kafka 4.0
  • Articolul 03 — Producător și Consumator avansat
  • Articolul 04 — Exact-O dată Semantică în Kafka
  • Articolul 05 — Schema Registry: Avro și Protobuf
  • Articolul 06 (acest) — Kafka Streams: Procesarea fluxului încorporată în Java
  • Articolul 07 — Kafka Connect: Debezium CDC și integrare DB
  • Articolul 08 — Kafka + Apache Flink: Pipeline Analytics în timp real