Kafka Streams: Java, KTable 및 Windowing에 내장된 스트림 처리
Kafka Streams는 애플리케이션에서 직접 스트림을 처리하기 위한 Java 라이브러리입니다. 전용 클러스터가 없습니다. RocksDB를 통해 Streams DSL, KTables, State Store를 찾아보세요. 시간적 집계를 위한 창 작업.
Flink나 Spark 대신 Kafka Streams를 사용하는 이유
Kafka를 사용한 스트림 처리에 관해 이야기할 때 자연스럽게 떠오르는 질문은 바로 Kafka를 사용하는 이유입니다. Apache Flink 또는 Spark Streaming 대신 Kafka Streams를 사용하시겠습니까? 대답은 다음에 따라 달라집니다. 맥락. Kafka Streams에는 고유한 장점이 있습니다. 자바 라이브러리 (별도의 프레임워크가 아님) 애플리케이션 내부에서 실행되므로 필요하지 않습니다. 작업 관리자 없이, 별도의 배포 없이 전용 클러스터를 구축합니다.
Java/Kotlin 애플리케이션에 스트림 처리를 추가하려는 팀의 경우 운영상의 복잡성을 추가하지 않으면 Kafka Streams가 자연스러운 선택입니다. 파이프라인별 멀티 스트림 임시 조인을 통한 복잡한 분석, 데이터에 대한 복잡한 집계 분산된 워크로드 또는 독립적인 확장이 필요한 워크로드에는 Flink가 최선의 선택입니다. (시리즈의 기사 08 참조).
무엇을 배울 것인가
- Kafka Streams 아키텍처: 토폴로지, 스트림 스레드, 작업
- KStream과 KTable: 개념적, 실제적 차이
- DSL 스트림: 필터, 맵, flatMap, groupBy, 집계
- 상태 저장소: RocksDB 내장, 메모리 내, 변경 로그
- 창 작업: 텀블링, 호핑, 세션 창
- KStream-KTable 조인: 참조 데이터로 스트림 강화
- Kafka Streams의 오류 처리 및 배달 못한 편지 대기열
아키텍처: 토폴로지, 작업 및 파티션
Kafka Streams 애플리케이션은 다음을 정의합니다. 토폴로지: DAG (방향성 비순환 그래프) 처리 연산자. 각 노드는 작업입니다. (필터, 맵, 집계), 각 가장자리는 레코드 스트림입니다. 카프카 스트림 이 토폴로지를 다음에서 컴파일합니다. 작업, 각 파티션마다 하나씩 소스 주제의
// Dipendenze Maven
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-streams</artifactId>
// <version>3.7.0</version>
// </dependency>
// Configurazione base di Kafka Streams
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
// Serializer/Deserializer di default per chiavi e valori
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Semantica exactly-once (vedi Articolo 04)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Numero di thread per istanza (parallelismo locale)
// Ogni thread gestisce uno o piu task (partizioni)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Intervallo di commit degli offset
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
// Directory per i dati dello State Store (RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
KStream 대 KTable: 기본 이원론
Kafka Streams의 가장 중요한 개념은 KStream과 KTable의 구분입니다. 이러한 차이점을 이해하는 것은 올바른 토폴로지를 설계하는 데 필수적입니다.
StreamsBuilder builder = new StreamsBuilder();
// KStream: rappresenta un flusso di eventi immutabili
// Ogni record e un evento indipendente con la stessa chiave
// Semantica: "E successa questa cosa in questo momento"
// Esempio: ogni ordine e un nuovo evento, anche se ha lo stesso order_id
KStream<String, String> ordersStream = builder.stream("orders");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- evento diverso
// KTable: rappresenta una tabella mutabile (changelog)
// Ogni record con la stessa chiave AGGIORNA il valore precedente
// Semantica: "Lo stato attuale di questa entita e questo"
// Esempio: lo stato corrente di ogni ordine
KTable<String, String> ordersTable = builder.table("orders-state");
// Record: [key="order-123", value="{status:PENDING}"]
// Record: [key="order-123", value="{status:CONFIRMED}"] <-- AGGIORNA il precedente
// Lo stato finale: order-123 = CONFIRMED
// GlobalKTable: come KTable ma replicato su tutti i thread/istanze
// Utile per dati di riferimento (prodotti, utenti) che non cambiano spesso
// Non partizionato: ogni istanza ha l'intera tabella
KTable<String, String> productsTable = builder.globalTable("products-catalog");
DSL 스트림: 기본 작업
// Pipeline completa di analisi ordini
StreamsBuilder builder = new StreamsBuilder();
// Leggi lo stream degli ordini (JSON come String)
KStream<String, String> rawOrders = builder.stream("orders-raw");
// 1. FILTER: filtra record nulli o malformati
KStream<String, String> validOrders = rawOrders
.filter((key, value) -> value != null && value.contains("order_id"));
// 2. MAP: trasforma il record (key e value)
KStream<String, Order> parsedOrders = validOrders
.mapValues(value -> parseOrder(value));
// 3. FLATMAP: produce 0 o N record da 1 record in input
// Utile per espandere arrays o filtrare con output multiplo
KStream<String, OrderItem> orderItems = parsedOrders
.flatMapValues(order -> order.getItems());
// 4. SELECTKEY: cambia la chiave del record
// ATTENZIONE: reparticiona i dati (shuffles attraverso Kafka)
KStream<String, Order> ordersByUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId());
// 5. PEEK: side effect senza modifica (logging, metrics)
KStream<String, Order> loggedOrders = ordersByUser
.peek((userId, order) ->
log.debug("Processing order {} for user {}", order.getOrderId(), userId));
// 6. BRANCH: divide lo stream in piu stream basandosi su condizioni
// Deprecato in 2.8+: usa split() che e piu flessibile
Map<String, KStream<String, Order>> branches = ordersByUser
.split(Named.as("branch-"))
.branch((key, order) -> order.getAmount() > 1000.0, Branched.as("high-value"))
.branch((key, order) -> order.getAmount() > 100.0, Branched.as("medium-value"))
.defaultBranch(Branched.as("low-value"));
KStream<String, Order> highValueOrders = branches.get("branch-high-value");
// 7. TO: scrivi lo stream su un topic Kafka
parsedOrders
.mapValues(Order::toJson)
.to("orders-processed", Produced.with(Serdes.String(), Serdes.String()));
State Store를 사용한 집계
상태 저장 집계는 Kafka Streams의 핵심입니다. 각 집계는 다음을 유지합니다. 하나 스테이트 스토어 값을 유지하는 로컬(기본적으로 RocksDB) 중간체. 스테이트 스토어에는 변경 로그 주제 특파원 내결함성을 위한 Kafka: 인스턴스가 충돌하면 상태를 재구성할 수 있습니다.
// Aggregazione: conta ordini per utente
KTable<String, Long> ordersPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey() // Raggruppa per chiave corrente (user_id)
.count(Materialized.as("orders-count-store"));
// Il risultato e un KTable: user_id -> count
// Aggregazione: calcola totale spesa per utente
KTable<String, Double> totalSpendPerUser = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey(Grouped.with(Serdes.String(),
// Serve Serde custom per Order
new OrderSerde()))
.aggregate(
() -> 0.0, // Initializer: valore iniziale
(userId, order, total) -> // Aggregator: combina accumulatore e nuovo record
total + order.getAmount(),
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("spend-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Double())
);
// Scrivi il risultato su un topic per monitoring o downstream
totalSpendPerUser
.toStream()
.mapValues(total -> String.format("{\"total\": %.2f}", total))
.to("user-spend-summary");
창 작업: 시간 집계
창 작업을 사용하면 시간 간격을 기준으로 레코드를 집계할 수 있습니다. Kafka Streams는 각각 고유한 의미를 지닌 네 가지 창 유형을 지원합니다.
// 1. TUMBLING WINDOW: finestre fisse non sovrapposte
// Esempio: conta ordini ogni 5 minuti (00:00-00:05, 00:05-00:10, ...)
KTable<Windowed<String>, Long> ordersPerWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("orders-per-5min-store"));
ordersPerWindow.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(), // user_id
String.format(
"{\"user\": \"%s\", \"window_start\": %d, \"count\": %d}",
windowedKey.key(),
windowedKey.window().start(),
count
)
))
.to("orders-per-window");
// 2. HOPPING WINDOW: finestre di dimensione fissa che si sovrappongono
// Esempio: media mobile degli ordini negli ultimi 10 minuti, aggiornata ogni 5 min
// (00:00-00:10, 00:05-00:15, 00:10-00:20, ...)
KTable<Windowed<String>, Long> slidingCount = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(
Duration.ofMinutes(10), // Dimensione finestra
Duration.ofMinutes(1) // Grace period per late arrivals
).advanceBy(Duration.ofMinutes(5)) // Avanza di 5 min (hopping)
)
.count();
// 3. SESSION WINDOW: finestre basate sull'attivita dell'utente
// Raggruppa eventi dello stesso utente se arrivano entro 30 minuti l'uno dall'altro
// Le sessioni si chiudono dopo 30 minuti di inattivita
KTable<Windowed<String>, Long> sessionCounts = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofMinutes(30),
Duration.ofMinutes(5)
))
.count(Materialized.as("session-store"));
// 4. SLIDING WINDOW: finestre che si spostano su ogni nuovo evento
// Ogni evento definisce una finestra di [event_time - window_size, event_time]
KTable<Windowed<String>, Long> slidingWindow = parsedOrders
.selectKey((orderId, order) -> order.getUserId())
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
.count();
KStream-KTable Join: 스트림 강화
Kafka Streams의 가장 일반적인 사용 사례 중 하나는 이벤트 스트림을 강화하는 것입니다. 정적이거나 느리게 변화하는 참조 데이터: 다음을 통해 주문 강화 제품 세부정보, 로그인 이벤트에 사용자 이름 추가 등
// Join tra KStream (ordini) e KTable (catalogo prodotti)
StreamsBuilder builder = new StreamsBuilder();
// Topic con gli ordini (stream di eventi)
KStream<String, Order> orders = builder.stream(
"orders",
Consumed.with(Serdes.String(), new OrderSerde())
);
// Topic con il catalogo prodotti (tabella di stato)
// Quando un prodotto viene aggiornato, il topic riceve un nuovo record
KTable<String, Product> products = builder.table(
"products-catalog",
Consumed.with(Serdes.String(), new ProductSerde())
);
// Per fare il join, la chiave dell'ordine deve essere il product_id
// Usiamo il primo item dell'ordine come esempio
KStream<String, Order> ordersByProduct = orders
.selectKey((orderId, order) ->
order.getItems().get(0).getProductId() // Reparticiona per product_id
);
// LEFT JOIN: mantiene tutti gli ordini anche se il prodotto non esiste nel catalog
KStream<String, EnrichedOrder> enrichedOrders = ordersByProduct.leftJoin(
products,
(order, product) -> {
if (product == null) {
// Prodotto non trovato: usa dati parziali
return EnrichedOrder.fromOrder(order);
}
return EnrichedOrder.fromOrderAndProduct(order, product);
}
);
enrichedOrders
.selectKey((productId, enriched) -> enriched.getOrderId())
.mapValues(EnrichedOrder::toJson)
.to("orders-enriched");
Kafka Streams의 오류 처리
// Handler per errori di deserializzazione
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
// Alternativa: LogAndFailExceptionHandler (ferma l'app) - piu sicura in produzione
// Handler personalizzato per errori di produzione
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Implementazione handler custom con Dead Letter Queue
public class DLQProductionExceptionHandler implements ProductionExceptionHandler {
private final KafkaProducer<byte[], byte[]> dlqProducer;
public DLQProductionExceptionHandler() {
Properties dlqProps = new Properties();
dlqProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
dlqProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
dlqProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
this.dlqProducer = new KafkaProducer<>(dlqProps);
}
@Override
public ProductionExceptionHandlerResponse handle(
ProducerRecord<byte[], byte[]> record,
Exception exception) {
log.error("Errore produzione per record key={}", new String(record.key()), exception);
// Invia il record problematico alla DLQ con metadata dell'errore
Headers headers = new RecordHeaders();
headers.add("error-message", exception.getMessage().getBytes());
headers.add("error-type", exception.getClass().getSimpleName().getBytes());
headers.add("source-topic", record.topic().getBytes());
headers.add("timestamp", String.valueOf(System.currentTimeMillis()).getBytes());
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".dlq", // {topic-name}.dlq
null,
record.key(),
record.value(),
headers
);
dlqProducer.send(dlqRecord);
// CONTINUE: skippa il record e va avanti (non blocca lo stream)
return ProductionExceptionHandlerResponse.CONTINUE;
}
}
상태 저장소: 대화형 쿼리를 통한 액세스
// Interactive Queries: esponi lo stato locale via HTTP
// Utile per dashboard real-time, lookup diretti
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Leggi dal Key-Value Store locale
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType(
"orders-count-store",
QueryableStoreTypes.keyValueStore()
)
);
// Lookup singolo: conta ordini per user "user-42"
Long userOrderCount = keyValueStore.get("user-42");
System.out.println("Ordini utente: " + userOrderCount);
// Scan di tutti i valori (attenzione: puo essere costoso)
KeyValueIterator<String, Long> allCounts = keyValueStore.all();
while (allCounts.hasNext()) {
KeyValue<String, Long> kv = allCounts.next();
System.out.printf("User: %s, Count: %d%n", kv.key, kv.value);
}
allCounts.close(); // Importante: chiudi sempre l'iterator
// Range query: utenti da "user-100" a "user-200"
KeyValueIterator<String, Long> rangeIter =
keyValueStore.range("user-100", "user-200");
// usa rangeIter..., poi close()
결론 및 Kafka Streams 사용 시기
Kafka Streams는 스트림 처리를 추가하려는 시나리오에서 빛을 발합니다. 전용 클러스터가 없는 기존 Java/Kotlin 애플리케이션: 집계 실시간, 스트림과 참조 테이블 간 조인, 변환 및 강화 데이터의. 라이브러리는 내결함성, 파티셔닝 및 확장을 관리합니다. 수평으로 투명하게.
크로스 스트림 임시 조인이 필요한 보다 복잡한 분석 파이프라인의 경우 CEP(복잡한 이벤트 처리) 또는 데이터 레이크(Iceberg, Delta)와의 통합, 기사 08은 Kafka + Apache Flink가 2026년의 사실상 표준이 되는 방법을 보여줍니다.







