Proč Kafka Streams místo Flink nebo Spark

Když mluvíme o streamovém zpracování s Kafkou, přirozená otázka zní: proč to používat Kafka Streams místo Apache Flink nebo Spark Streaming? Odpověď závisí na kontextu. Kafka Streams má jedinečnou výhodu: a Java knihovna (nikoli samostatný rámec), který běží uvnitř vaší aplikace, není potřeba vyhrazeného clusteru, bez správce úloh, bez samostatného nasazení.

Pro týmy, které chtějí do své aplikace Java/Kotlin přidat zpracování datových proudů bez zvýšení provozní složitosti je Kafka Streams přirozenou volbou. Potrubí komplexní analytika s víceproudými dočasnými spojeními, komplexní agregace dat distribuované nebo pracovní zátěže, které vyžadují nezávislé škálování, je Flink tou nejlepší volbou (viz článek 08 řady).

Co se naučíte

  • Architektura Kafka Streams: topologie, streamová vlákna, úlohy
  • KSream vs KTable: koncepční a praktický rozdíl
  • DSL streamy: filtr, mapa, plochá mapa, groupBy, agregace
  • State Store: embedded RocksDB, v paměti, changelogy
  • Operace s okny: klopení, přeskakování, okna relace
  • Spojení KStream-KTable: Obohacení streamů o referenční data
  • Zpracování chyb a fronty nedoručených dopisů v Kafka Streams

Architektura: Topologie, úloha a oddíl

Aplikace Kafka Streams definuje a topologie: DAG (Směrovaný acyklický graf) zpracovatelských operátorů. Každý uzel je operace (filtr, mapa, agregát), každá hrana je proud záznamů. Kafkovy proudy zkompilovat tuto topologii úkoly, jeden pro každý oddíl zdrojového tématu.

// Dipendenze Maven
// <dependency>
//   <groupId>org.apache.kafka</groupId>
//   <artifactId>kafka-streams</artifactId>
//   <version>3.7.0</version>
// </dependency>

// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");

KSream vs KTable: Základní dualismus

Nejdůležitějším konceptem Kafka Streams je rozlišení mezi KStream a KTable. Pochopení tohoto rozdílu je zásadní pro navrhování správných topologií.

StreamsBuilder builder = new StreamsBuilder();

// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- evento diverso

// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"]  <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED

// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");

DSL streamy: základní operace

// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();

// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");

// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
    .filter((key, value) -> value != null && value.contains("order_id"));

// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
    .mapValues(value -> parseOrder(value));

// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
    .flatMapValues(order -> order.getItems());

// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId());

// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
    .peek((userId, order) ->
        log.debug("Processing order {} for user {}", order.getOrderId(), userId));

// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
    .split(Named.as("branch-"))
    .branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
    .branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
    .defaultBranch(Branched.as("low-value"));

KStream<String, Order> highValueOrders = branches.get("branch-high-value");

// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
    .mapValues(Order::toJson)
    .to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));

Agregace se státním obchodem

Stavové agregace jsou srdcem Kafka Streams. Každá agregace udržuje jeden Státní obchod local (standardně RocksDB), který zachovává hodnoty meziprodukty. Státní obchod má a téma changelogu korespondent na Kafka za odolnost proti chybám: pokud dojde k havárii instance, lze stav obnovit.

// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()  // Raggruppa per chiave corrente (user_id)
    .count(Materialized.as("orders-count-store"));
    // Il risultato e un KTable: user_id -> count

// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey(Grouped.with(Serdes.String(),
        // Serve Serde custom per Order
        new OrderSerde()))
    .aggregate(
        () -> 0.0,                          // Initializer: valore iniziale
        (userId, order, total) ->           // Aggregator: combina accumulatore e nuovo record
            total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    );

// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
    .toStream()
    .mapValues(total -> String.format("{\"total\": %.2f}", total))
    .to("user-spend-summary");

Operace s okny: Časové agregace

Operace s okny umožňují agregovat záznamy na základě časových intervalů. Kafka Streams podporuje čtyři typy oken, každý s odlišnou sémantikou.

// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("orders-per-5min-store"));

ordersPerWindow.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),  // user_id
        String.format(
            "{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
            windowedKey.key(),
            windowedKey.window().start(),
            count
        )
    ))
    .to("orders-per-window");

// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(10),   // Dimensione finestra
            Duration.ofMinutes(1)     // Grace period per late arrivals
        ).advanceBy(Duration.ofMinutes(5))  // Avanza di 5 min (hopping)
    )
    .count();

// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(
        Duration.ofMinutes(30),
        Duration.ofMinutes(5)
    ))
    .count(Materialized.as("session-store"));

// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
    .selectKey((orderId, order) -> order.getUserId())
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
    .count();

KStream-KTable Join: Obohacení streamu

Jedním z nejčastějších případů použití v Kafka Streams je obohacení streamu událostí se statickými nebo pomalu se měnícími referenčními údaji: obohaťte objednávky o podrobnosti o produktu, přidat uživatelské jméno do události přihlášení atd.

// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();

// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), new OrderSerde())
);

// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
    "products-catalog",
    Consumed.with(Serdes.String(), new ProductSerde())
);

// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
    .selectKey((orderId, order) ->
        order.getItems().get(0).getProductId()  // Reparticiona per product_id
    );

// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
    products,
    (order, product) -> {
        if (product == null) {
            // Prodotto non trovato: usa dati parziali
            return EnrichedOrder.fromOrder(order);
        }
        return EnrichedOrder.fromOrderAndProduct(order, product);
    }
);

enrichedOrders
    .selectKey((productId, enriched) -> enriched.getOrderId())
    .mapValues(EnrichedOrder::toJson)
    .to("orders-enriched");

Zpracování chyb ve streamech Kafka

// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione

// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    CustomProductionExceptionHandler.class.getName());

// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
    private final KafkaProducer<byte[], byte[]> dlqProducer;

    public DLQProductionExceptionHandler() {
        Properties dlqProps = new Properties();
        dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
        dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(dlqProps);
    }

    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record,
            Exception exception) {
        log.error("Errore produzione per record key={}", new String(record.key()), exception);

        // Invia il record problematico alla DLQ con metadata dell'errore
        Headers headers = new RecordHeaders();
        headers.add("error-message", exception.getMessage().getBytes());
        headers.add("error-type", exception.getClass().getSimpleName().getBytes());
        headers.add("source-topic", record.topic().getBytes());
        headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",  // {topic-name}.dlq
            null,
            record.key(),
            record.value(),
            headers
        );

        dlqProducer.send(dlqRecord);

        // CONTINUE: skippa il record e va avanti (non blocca lo stream)
        return ProductionExceptionHandlerResponse.CONTINUE;
    }
}

State Store: Přístup prostřednictvím interaktivních dotazů

// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "orders-count-store",
        QueryableStoreTypes.keyValueStore()
    )
);

// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);

// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
    KeyValue<String, Long> kv = allCounts.next();
    System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close();  // Importante: chiudi sempre l'iterator

// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
    keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()

Závěry a kdy použít Kafka Streams

Kafka Streams září ve scénářích, ke kterým chcete přidat zpracování streamu existující Java/Kotlin aplikace bez vyhrazeného clusteru: agregace v reálném čase, spojení mezi proudy a referenčními tabulkami, transformace a obohacení dat. Knihovna spravuje odolnost proti chybám, rozdělování a škálování vodorovně průhledným způsobem.

U složitějších analytických kanálů, které vyžadují dočasná spojení mezi proudy, CEP (Complex Event Processing) nebo integrace s datovým jezerem (Iceberg, Delta), Článek 08 ukazuje, jak je Kafka + Apache Flink de facto standardem v roce 2026.

Kompletní série: Apache Kafka

  • článek 01 — Základy Apache Kafky
  • článek 02 — KRaft v Kafce 4.0
  • článek 03 — Pokročilý výrobce a spotřebitel
  • článek 04 — Sémantika přesně jednou u Kafky
  • článek 05 — Registr schémat: Avro a Protobuf
  • článek 06 (tento) — Kafka Streams: Stream Processing zabudovaný v Javě
  • článek 07 — Kafka Connect: Debezium CDC a integrace DB
  • článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase