Kafka Streams: Streamverwerking ingebed in Java, KTable en Windowing
Kafka Streams is een Java-bibliotheek voor het rechtstreeks verwerken van streams in de applicatie zonder speciaal cluster. Ontdek Streams DSL, KTables, State Stores met RocksDB en vensterbewerkingen voor tijdelijke aggregaties.
Waarom Kafka Streams in plaats van Flink of Spark
Als je het met Kafka over streamverwerking hebt, is de natuurlijke vraag: waarom zou je het gebruiken? Kafka Streams in plaats van Apache Flink of Spark Streaming? Het antwoord hangt ervan af context. Kafka Streams heeft een uniek voordeel: en a Java-bibliotheek (geen apart raamwerk) dat binnen uw applicatie draait, geen noodzaak van een dedicated cluster, zonder jobmanager, zonder aparte inzet.
Voor teams die streamverwerking willen toevoegen aan hun Java/Kotlin-applicatie zonder operationele complexiteit toe te voegen, is Kafka Streams de logische keuze. Via pijpleiding complexe analyses met tijdelijke joins van meerdere stromen, complexe aggregaties van gegevens gedistribueerd of voor workloads die onafhankelijke schaalbaarheid vereisen, is Flink de beste keuze (zie artikel 08 van de serie).
Wat je gaat leren
- Kafka Streams-architectuur: topologie, streamthreads, taken
- KStream versus KTable: het conceptuele en praktische verschil
- DSL-streams: filter, kaart, flatMap, groupBy, aggregaat
- State Store: RocksDB ingebed, in het geheugen, changelogs
- Vensterbewerkingen: tuimelen, springen, sessievensters
- KStream-KTable join: Streams verrijken met referentiegegevens
- Foutafhandeling en wachtrijen voor dode letters in Kafka Streams
Architectuur: topologie, taak en partitie
Een Kafka Streams-applicatie definieert a topologie: een DAG (Directed Acyclic Graph) van verwerkers. Elk knooppunt is een bewerking (filter, kaart, aggregatie), elke rand is een stroom records. Kafka-stromen compileer deze topologie in taken, één voor elke partitie van het brononderwerp.
// Dipendenze Maven
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KStream versus KTable: fundamenteel dualisme
Het belangrijkste concept van Kafka Streams is het onderscheid tussen KStream en KTable. Het begrijpen van dit verschil is van fundamenteel belang voor het ontwerpen van correcte topologieën.
StreamsBuilder builder = new StreamsBuilder();
// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- evento diverso
// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED
// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");
DSL-streams: fundamentele operaties
// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();
// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");
// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
.filter((key, value) -> value != null && value.contains("order_id"));
// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
.mapValues(value -> parseOrder(value));
// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
.flatMapValues(order -> order.getItems());
// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId());
// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
.peek((userId, order) ->
log.debug("Processing order {} for user {}", order.getOrderId(), userId));
// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
.split(Named.as("branch-"))
.branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
.branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
KStream<String, Order> highValueOrders = branches.get("branch-high-value");
// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
.mapValues(Order::toJson)
.to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
Aggregaties met State Store
Stateful aggregaties vormen het hart van Kafka Streams. Elke aggregatie blijft behouden één Staatswinkel local (standaard RocksDB) die de waarden behoudt tussenproducten. De Staatswinkel heeft een changelog-onderwerp correspondent op Kafka voor fouttolerantie: als een instance crasht, kan de status opnieuw worden samengesteld.
// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey() // Raggruppa per chiave corrente (user_id)
.count(Materialized.as("orders-count-store"));
// Il risultato e un KTable: user_id -> count
// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey(Grouped.with(Serdes.String(),
// Serve Serde custom per Order
new OrderSerde()))
.aggregate(
() -> 0.0, // Initializer: valore iniziale
(userId, order, total) -> // Aggregator: combina accumulatore e nuovo record
total + order.getAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
.toStream()
.mapValues(total -> String.format("{\"total\": %.2f}", total))
.to("user-spend-summary");
Vensterbewerkingen: tijdelijke aggregaties
Met vensterbewerkingen kunt u records aggregeren op basis van tijdsintervallen. Kafka Streams ondersteunt vier venstertypen, elk met een verschillende semantiek.
// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("orders-per-5min-store"));
ordersPerWindow.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(), // user_id
String.format(
"{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
windowedKey.key(),
windowedKey.window().start(),
count
)
))
.to("orders-per-window");
// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Dimensione finestra
Duration.ofMinutes(1) // Grace period per late arrivals
).advanceBy(Duration.ofMinutes(5)) // Avanza di 5 min (hopping)
)
.count();
// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(30),
Duration.ofMinutes(5)
))
.count(Materialized.as("session-store"));
// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream-KTable Join: Streamverrijking
Een van de meest voorkomende gebruiksscenario's in Kafka Streams is het verrijken van een evenementenstream met statische of langzaam veranderende referentiegegevens: verrijk orders met de productgegevens, voeg de gebruikersnaam toe aan de inloggebeurtenis, enz.
// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();
// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), new OrderSerde())
);
// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
"products-catalog",
Consumed.with(Serdes.String(), new ProductSerde())
);
// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
.selectKey((orderId, order) ->
order.getItems().get(0).getProductId() // Reparticiona per product_id
);
// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
products,
(order, product) -> {
if (product == null) {
// Prodotto non trovato: usa dati parziali
return EnrichedOrder.fromOrder(order);
}
return EnrichedOrder.fromOrderAndProduct(order, product);
}
);
enrichedOrders
.selectKey((productId, enriched) -> enriched.getOrderId())
.mapValues(EnrichedOrder::toJson)
.to("orders-enriched");
Foutafhandeling in Kafka-streams
// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione
// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
private final KafkaProducer<byte[], byte[]> dlqProducer;
public DLQProductionExceptionHandler() {
Properties dlqProps = new Properties();
dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(dlqProps);
}
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
log.error("Errore produzione per record key={}", new String(record.key()), exception);
// Invia il record problematico alla DLQ con metadata dell'errore
Headers headers = new RecordHeaders();
headers.add("error-message", exception.getMessage().getBytes());
headers.add("error-type", exception.getClass().getSimpleName().getBytes());
headers.add("source-topic", record.topic().getBytes());
headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".dlq", // {topic-name}.dlq
null,
record.key(),
record.value(),
headers
);
dlqProducer.send(dlqRecord);
// CONTINUE: skippa il record e va avanti (non blocca lo stream)
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
State Store: toegang via interactieve zoekopdrachten
// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType(
"orders-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);
// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
KeyValue<String, Long> kv = allCounts.next();
System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close(); // Importante: chiudi sempre l'iterator
// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()
Conclusies en wanneer Kafka-streams moeten worden gebruikt
Kafka Streams schittert in scenario's waaraan u streamverwerking wilt toevoegen een bestaande Java/Kotlin-applicatie zonder een dedicated cluster: aggregaties real-time, verbindingen tussen streams en referentietabellen, transformaties en verrijking van de gegevens. De bibliotheek beheert fouttolerantie, partitionering en schaling horizontaal op een transparante manier.
Voor complexere analysepijplijnen die cross-stream tijdelijke joins vereisen, CEP (Complex Event Processing), of integratie met een datameer (Iceberg, Delta), Artikel 08 laat zien hoe Kafka + Apache Flink de de facto standaard is in 2026.
De complete serie: Apache Kafka
- Artikel 01 – Apache Kafka-grondbeginselen
- Artikel 02 — KRaft in Kafka 4.0
- Artikel 03 - Geavanceerde producent en consument
- Artikel 04 - Exactly-Once Semantiek in Kafka
- Artikel 05 — Schemaregister: Avro en Protobuf
- Artikel 06 (dit) — Kafka Streams: streamverwerking ingebed in Java
- Artikel 07 — Kafka Connect: Debezium CDC- en DB-integratie
- Artikel 08 — Kafka + Apache Flink: realtime pijplijnanalyse







