Kafka Streams: Zpracování streamu vložené do Java, KTable a Windowing
Kafka Streams je Java knihovna pro zpracování streamů přímo v aplikaci bez vyhrazeného clusteru. Objevte streamy DSL, KTables, státní obchody s RocksDB a operace oken pro časové agregace.
Proč Kafka Streams místo Flink nebo Spark
Když mluvíme o streamovém zpracování s Kafkou, přirozená otázka zní: proč to používat Kafka Streams místo Apache Flink nebo Spark Streaming? Odpověď závisí na kontextu. Kafka Streams má jedinečnou výhodu: a Java knihovna (nikoli samostatný rámec), který běží uvnitř vaší aplikace, není potřeba vyhrazeného clusteru, bez správce úloh, bez samostatného nasazení.
Pro týmy, které chtějí do své aplikace Java/Kotlin přidat zpracování datových proudů bez zvýšení provozní složitosti je Kafka Streams přirozenou volbou. Potrubí komplexní analytika s víceproudými dočasnými spojeními, komplexní agregace dat distribuované nebo pracovní zátěže, které vyžadují nezávislé škálování, je Flink tou nejlepší volbou (viz článek 08 řady).
Co se naučíte
- Architektura Kafka Streams: topologie, streamová vlákna, úlohy
- KSream vs KTable: koncepční a praktický rozdíl
- DSL streamy: filtr, mapa, plochá mapa, groupBy, agregace
- State Store: embedded RocksDB, v paměti, changelogy
- Operace s okny: klopení, přeskakování, okna relace
- Spojení KStream-KTable: Obohacení streamů o referenční data
- Zpracování chyb a fronty nedoručených dopisů v Kafka Streams
Architektura: Topologie, úloha a oddíl
Aplikace Kafka Streams definuje a topologie: DAG (Směrovaný acyklický graf) zpracovatelských operátorů. Každý uzel je operace (filtr, mapa, agregát), každá hrana je proud záznamů. Kafkovy proudy zkompilovat tuto topologii úkoly, jeden pro každý oddíl zdrojového tématu.
// Dipendenze Maven
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KSream vs KTable: Základní dualismus
Nejdůležitějším konceptem Kafka Streams je rozlišení mezi KStream a KTable. Pochopení tohoto rozdílu je zásadní pro navrhování správných topologií.
StreamsBuilder builder = new StreamsBuilder();
// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- evento diverso
// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED
// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");
DSL streamy: základní operace
// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();
// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");
// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
.filter((key, value) -> value != null && value.contains("order_id"));
// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
.mapValues(value -> parseOrder(value));
// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
.flatMapValues(order -> order.getItems());
// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId());
// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
.peek((userId, order) ->
log.debug("Processing order {} for user {}", order.getOrderId(), userId));
// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
.split(Named.as("branch-"))
.branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
.branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
KStream<String, Order> highValueOrders = branches.get("branch-high-value");
// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
.mapValues(Order::toJson)
.to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
Agregace se státním obchodem
Stavové agregace jsou srdcem Kafka Streams. Každá agregace udržuje jeden Státní obchod local (standardně RocksDB), který zachovává hodnoty meziprodukty. Státní obchod má a téma changelogu korespondent na Kafka za odolnost proti chybám: pokud dojde k havárii instance, lze stav obnovit.
// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey() // Raggruppa per chiave corrente (user_id)
.count(Materialized.as("orders-count-store"));
// Il risultato e un KTable: user_id -> count
// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey(Grouped.with(Serdes.String(),
// Serve Serde custom per Order
new OrderSerde()))
.aggregate(
() -> 0.0, // Initializer: valore iniziale
(userId, order, total) -> // Aggregator: combina accumulatore e nuovo record
total + order.getAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
.toStream()
.mapValues(total -> String.format("{\"total\": %.2f}", total))
.to("user-spend-summary");
Operace s okny: Časové agregace
Operace s okny umožňují agregovat záznamy na základě časových intervalů. Kafka Streams podporuje čtyři typy oken, každý s odlišnou sémantikou.
// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("orders-per-5min-store"));
ordersPerWindow.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(), // user_id
String.format(
"{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
windowedKey.key(),
windowedKey.window().start(),
count
)
))
.to("orders-per-window");
// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Dimensione finestra
Duration.ofMinutes(1) // Grace period per late arrivals
).advanceBy(Duration.ofMinutes(5)) // Avanza di 5 min (hopping)
)
.count();
// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(30),
Duration.ofMinutes(5)
))
.count(Materialized.as("session-store"));
// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream-KTable Join: Obohacení streamu
Jedním z nejčastějších případů použití v Kafka Streams je obohacení streamu událostí se statickými nebo pomalu se měnícími referenčními údaji: obohaťte objednávky o podrobnosti o produktu, přidat uživatelské jméno do události přihlášení atd.
// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();
// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), new OrderSerde())
);
// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
"products-catalog",
Consumed.with(Serdes.String(), new ProductSerde())
);
// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
.selectKey((orderId, order) ->
order.getItems().get(0).getProductId() // Reparticiona per product_id
);
// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
products,
(order, product) -> {
if (product == null) {
// Prodotto non trovato: usa dati parziali
return EnrichedOrder.fromOrder(order);
}
return EnrichedOrder.fromOrderAndProduct(order, product);
}
);
enrichedOrders
.selectKey((productId, enriched) -> enriched.getOrderId())
.mapValues(EnrichedOrder::toJson)
.to("orders-enriched");
Zpracování chyb ve streamech Kafka
// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione
// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
private final KafkaProducer<byte[], byte[]> dlqProducer;
public DLQProductionExceptionHandler() {
Properties dlqProps = new Properties();
dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(dlqProps);
}
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
log.error("Errore produzione per record key={}", new String(record.key()), exception);
// Invia il record problematico alla DLQ con metadata dell'errore
Headers headers = new RecordHeaders();
headers.add("error-message", exception.getMessage().getBytes());
headers.add("error-type", exception.getClass().getSimpleName().getBytes());
headers.add("source-topic", record.topic().getBytes());
headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".dlq", // {topic-name}.dlq
null,
record.key(),
record.value(),
headers
);
dlqProducer.send(dlqRecord);
// CONTINUE: skippa il record e va avanti (non blocca lo stream)
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
State Store: Přístup prostřednictvím interaktivních dotazů
// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType(
"orders-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);
// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
KeyValue<String, Long> kv = allCounts.next();
System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close(); // Importante: chiudi sempre l'iterator
// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()
Závěry a kdy použít Kafka Streams
Kafka Streams září ve scénářích, ke kterým chcete přidat zpracování streamu existující Java/Kotlin aplikace bez vyhrazeného clusteru: agregace v reálném čase, spojení mezi proudy a referenčními tabulkami, transformace a obohacení dat. Knihovna spravuje odolnost proti chybám, rozdělování a škálování vodorovně průhledným způsobem.
U složitějších analytických kanálů, které vyžadují dočasná spojení mezi proudy, CEP (Complex Event Processing) nebo integrace s datovým jezerem (Iceberg, Delta), Článek 08 ukazuje, jak je Kafka + Apache Flink de facto standardem v roce 2026.
Kompletní série: Apache Kafka
- článek 01 — Základy Apache Kafky
- článek 02 — KRaft v Kafce 4.0
- článek 03 — Pokročilý výrobce a spotřebitel
- článek 04 — Sémantika přesně jednou u Kafky
- článek 05 — Registr schémat: Avro a Protobuf
- článek 06 (tento) — Kafka Streams: Stream Processing zabudovaný v Javě
- článek 07 — Kafka Connect: Debezium CDC a integrace DB
- článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase







