Kafka Akışları: Java, KTable ve Pencerelemeye Gömülü Akış İşleme
Kafka Streams, akışları doğrudan uygulamada işlemeye yönelik bir Java kitaplığıdır özel küme olmadan. RocksDB ile Streams DSL, KTables ve State Store'ları keşfedin ve zamansal toplamalar için pencere işlemleri.
Neden Kafka Flink veya Spark yerine Yayın Yapıyor?
Kafka ile akış işlemeden bahsederken doğal soru şudur: neden onu kullanalım? Apache Flink veya Spark Streaming yerine Kafka Akışı mı? Cevap bağlıdır bağlam. Kafka Streams'in benzersiz bir avantajı var: ve Java kütüphanesi (ayrı bir çerçeve değil) uygulamanızın içinde çalışır, gerek yok ayrı bir dağıtım olmadan, bir iş yöneticisi olmadan, özel bir kümenin.
Java/Kotlin uygulamalarına akış işlemeyi eklemek isteyen ekipler için Operasyonel karmaşıklık eklemeden Kafka Streams doğal seçimdir. Boru hattına göre çok akışlı zamansal birleştirmelerle karmaşık analitikler, veriler üzerinde karmaşık toplamalar dağıtılmış veya bağımsız ölçeklendirme gerektiren iş yükleri için Flink en iyi seçimdir (bkz. serinin 08. Maddesi).
Ne Öğreneceksiniz
- Kafka Akış mimarisi: topoloji, akış iş parçacıkları, görevler
- KStream ve KTable: kavramsal ve pratik fark
- DSL Akışları: filtre, harita, flatMap, groupBy, toplama
- Devlet Mağazası: RocksDB yerleşik, bellek içi, değişiklik günlükleri
- Pencere işlemleri: yuvarlanma, atlama, oturum pencereleri
- KStream-KTable birleşimi: Akışları referans verileriyle zenginleştirme
- Kafka Akışlarında hata işleme ve geçersiz mektup kuyrukları
Mimari: Topoloji, Görev ve Bölüm
Kafka Streams uygulaması bir tanımlar topoloji: bir DAG İşleme operatörlerinin (Yönlendirilmiş Döngüsel Grafik). Her düğüm bir işlemdir (filtre, harita, toplama), her kenar bir kayıt akışıdır. Kafka Akışları bu topolojiyi derleyin görevler, her bölüm için bir tane kaynak konunun.
// Dipendenze Maven
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KStream ve KTable: Temel Dualizm
Kafka Streams'in en önemli konsepti KStream ve KTable arasındaki ayrımdır. Bu farkı anlamak, doğru topolojileri tasarlamanın temelidir.
StreamsBuilder builder = new StreamsBuilder();
// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- evento diverso
// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED
// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");
DSL Akışları: Temel İşlemler
// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();
// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");
// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
.filter((key, value) -> value != null && value.contains("order_id"));
// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
.mapValues(value -> parseOrder(value));
// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
.flatMapValues(order -> order.getItems());
// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId());
// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
.peek((userId, order) ->
log.debug("Processing order {} for user {}", order.getOrderId(), userId));
// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
.split(Named.as("branch-"))
.branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
.branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
KStream<String, Order> highValueOrders = branches.get("branch-high-value");
// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
.mapValues(Order::toJson)
.to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
Devlet Mağazasıyla Toplamalar
Durum bilgisi olan toplamalar Kafka Akışlarının kalbidir. Her toplama korunur bir Devlet Mağazası değerleri koruyan yerel (varsayılan olarak RocksDB) ara ürünler. Devlet Mağazası'nın bir değişiklik günlüğü konusu muhabir Hata toleransı için Kafka: Bir örnek çökerse durum yeniden oluşturulabilir.
// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey() // Raggruppa per chiave corrente (user_id)
.count(Materialized.as("orders-count-store"));
// Il risultato e un KTable: user_id -> count
// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey(Grouped.with(Serdes.String(),
// Serve Serde custom per Order
new OrderSerde()))
.aggregate(
() -> 0.0, // Initializer: valore iniziale
(userId, order, total) -> // Aggregator: combina accumulatore e nuovo record
total + order.getAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
.toStream()
.mapValues(total -> String.format("{\"total\": %.2f}", total))
.to("user-spend-summary");
Pencere İşlemleri: Geçici Toplamalar
Pencere işlemleri, kayıtları zaman aralıklarına göre toplamanıza olanak tanır. Kafka Streams, her biri farklı anlamlara sahip dört pencere türünü destekler.
// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("orders-per-5min-store"));
ordersPerWindow.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(), // user_id
String.format(
"{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
windowedKey.key(),
windowedKey.window().start(),
count
)
))
.to("orders-per-window");
// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Dimensione finestra
Duration.ofMinutes(1) // Grace period per late arrivals
).advanceBy(Duration.ofMinutes(5)) // Avanza di 5 min (hopping)
)
.count();
// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(30),
Duration.ofMinutes(5)
))
.count(Materialized.as("session-store"));
// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream-KTable Katılın: Akış zenginleştirme
Kafka Akışlarındaki en yaygın kullanım durumlarından biri olay akışını zenginleştirmektir Statik veya yavaş değişen referans verileriyle: siparişleri zenginleştirin ürün ayrıntıları, kullanıcı adını oturum açma etkinliğine ekleme vb.
// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();
// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), new OrderSerde())
);
// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
"products-catalog",
Consumed.with(Serdes.String(), new ProductSerde())
);
// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
.selectKey((orderId, order) ->
order.getItems().get(0).getProductId() // Reparticiona per product_id
);
// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
products,
(order, product) -> {
if (product == null) {
// Prodotto non trovato: usa dati parziali
return EnrichedOrder.fromOrder(order);
}
return EnrichedOrder.fromOrderAndProduct(order, product);
}
);
enrichedOrders
.selectKey((productId, enriched) -> enriched.getOrderId())
.mapValues(EnrichedOrder::toJson)
.to("orders-enriched");
Kafka Akışlarında Hata İşleme
// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione
// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
private final KafkaProducer<byte[], byte[]> dlqProducer;
public DLQProductionExceptionHandler() {
Properties dlqProps = new Properties();
dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(dlqProps);
}
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
log.error("Errore produzione per record key={}", new String(record.key()), exception);
// Invia il record problematico alla DLQ con metadata dell'errore
Headers headers = new RecordHeaders();
headers.add("error-message", exception.getMessage().getBytes());
headers.add("error-type", exception.getClass().getSimpleName().getBytes());
headers.add("source-topic", record.topic().getBytes());
headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".dlq", // {topic-name}.dlq
null,
record.key(),
record.value(),
headers
);
dlqProducer.send(dlqRecord);
// CONTINUE: skippa il record e va avanti (non blocca lo stream)
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
Durum Deposu: Etkileşimli Sorgular Yoluyla Erişim
// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType(
"orders-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);
// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
KeyValue<String, Long> kv = allCounts.next();
System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close(); // Importante: chiudi sempre l'iterator
// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()
Sonuçlar ve Kafka Akışları Ne Zaman Kullanılmalı?
Kafka Akışları, akış işlemeyi eklemek istediğiniz senaryolarda parlıyor özel bir kümeye sahip olmayan mevcut bir Java/Kotlin uygulaması: toplamalar gerçek zamanlı, akışlar ve referans tabloları arasında birleştirmeler, dönüşümler ve zenginleştirme verilerden. Kütüphane hata toleransını, bölümlemeyi ve ölçeklendirmeyi yönetir şeffaf bir şekilde yatay.
Çapraz akışlı zamansal birleştirmeler gerektiren daha karmaşık analitik işlem hatları için, CEP (Karmaşık Olay İşleme) veya bir veri gölü (Buzdağı, Delta) ile entegrasyon, 08. Madde, Kafka + Apache Flink'in 2026'da fiili standart haline geldiğini gösteriyor.
Serinin Tamamı: Apache Kafka
- Madde 01 — Apache Kafka'nın Temelleri
- Madde 02 — Kafka 4.0'da KRaft
- Madde 03 — Gelişmiş Üretici ve Tüketici
- Madde 04 — Kafka'da Tam Bir Kez Semantik
- Madde 05 — Şema Kaydı: Avro ve Protobuf
- Madde 06 (bu) — Kafka Akışları: Java'ya Gömülü Akış İşleme
- Madde 07 — Kafka Connect: Debezium CDC ve Veritabanı Entegrasyonu
- Madde 08 — Kafka + Apache Flink: Gerçek Zamanlı İşlem Hattı Analitiği







