Kafka + Apache Flink: Gerçek Zamanlı Boru Hattı Analitiği ve Buzdağı Havuzu
Kafka + Flink kombinasyonu, gerçek zamanlı analiz hatları için fiili standarttır 2026'da. Bağlayıcı Flink SQL ile uçtan uca bir işlem hattının nasıl oluşturulacağını öğrenin Kafka, Iceberg'in dinamik havuzu ve Confluent Cloud'da Flink'i yönetti.
Neden 2026'da Kafka + Flink
2026'da Apache Kafka + Apache Flink kombinasyonu referans yığını haline geldi olgun teknoloji şirketlerindeki gerçek zamanlı analiz hatları için. Kafka alımı yönetir ve dayanıklılık ve ölçeklenebilirlik garantileriyle veri ara belleğe alma; Flink fırsatları tam olarak bir kez semantik, zamansal pencereler ve karmaşık birleştirmelerle durum bilgisi olan işleme Kafka Streams'in yerel olarak desteklemediği.
2025'teki en önemli değişiklik, Flink SQL analitik işlem hatları için ana arayüz olarak: DataStream API yazmak yerine Java'da veri mühendisleri akışlara ve tablolara standart SQL yazar. Confluent hizmete girdi onun Yönetilen Flink (Apache Flink as a Service) yerel entegrasyonla Confluent Schema Registry ve Apache Iceberg'e geçerek birçok ekip için yığını sıfır işlem haline getiriyor.
Ne Öğreneceksiniz
- Flink mimarisi: JobManager, TaskManager, Kafka ile kontrol noktası oluşturma
- Flink SQL: DDL ile Kafka kaynak ve havuz tabloları oluşturun
- Geçici işlemler: Flink SQL'de TUMBLE, HOP, SESSION pencereleri
- Akış birleştirme: akış akışı birleştirme ve arama tablosuyla geçici birleştirme
- Apache Iceberg: veri gölü için tablo formatı, nesne depolamada ACID
- Uçtan uca ardışık düzen: Kafka -> Flink -> S3'te Buzdağı
- Confluent Managed Flink: yapılandırma ve kendi kendine barındırılanlardan farklar
Kafka ile Flink mimarisi
Apache Flink dağıtılmış bir akış işleme çerçevesidir. Güçlü noktası Kafka Akışları ve yönetme yeteneği ile karşılaştırıldığında sen çok harikasın (TB) RocksDB ile ve dağıtılmış depolamada (S3/GCS) kontrol noktası oluşturma, arasında karmaşık birleşimler çoklu akışlar ve CEP (Karmaşık Olay İşleme).
# Architettura Flink:
# JobManager: coordina l'esecuzione del job, gestisce i checkpoint
# - unico (o 2 con HA tramite ZooKeeper/Kubernetes leader election)
# - scheduler, checkpoint coordinator, metastore
# TaskManager: esegue i task paralleli (equivalente dei worker)
# - ogni TaskManager ha N "slots" (unita di parallelismo)
# - slot = thread dedicato con stato RocksDB locale
# Checkpoint: snapshot periodico dello stato su S3/GCS
# - se un TaskManager crasha, il job riprende dall'ultimo checkpoint
# - integrazione nativa con Kafka: salva l'offset Kafka nel checkpoint
# - garantisce esattamente-once end-to-end
# Deployment su Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
# https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: analytics-pipeline
spec:
image: apache/flink:1.19-scala_2.12-java17
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
execution.checkpointing.interval: "60000" # checkpoint ogni 60s
execution.checkpointing.mode: "EXACTLY_ONCE"
# Kafka source: committed offset = checkpoint offset
execution.checkpointing.timeout: "300000"
serviceAccount: flink-sa
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
replicas: 3
resource:
memory: "4096m"
cpu: 2
job:
jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
parallelism: 8
upgradeMode: stateful
EOF
Flink SQL: SQL Standardıyla Akış İşleme
Flink SQL, Kafka akışlarını SQL tabloları olarak değerlendirmenize olanak tanır. Büyü ve konsept arasında olay zamanı (olay zamanı): Flink, verinin kendisindeki zaman damgasını kullanır (varış saati değil) pencereler için, geç varışların doğru şekilde yönetilmesine olanak sağlar.
DDL: Kafka Kaynak Tablolarını Tanımlayın
-- Flink SQL: crea una tabella che legge dal topic Kafka "orders"
CREATE TABLE kafka_orders (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
status VARCHAR,
created_at BIGINT, -- timestamp in milliseconds (dal payload)
-- Campo virtuale: converte il BIGINT in TIMESTAMP per le window
event_time AS TO_TIMESTAMP_LTZ(created_at, 3),
-- Watermark: permette late arrivals fino a 5 secondi
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
'properties.group.id' = 'flink-analytics-consumer',
'scan.startup.mode' = 'latest-offset', -- o 'earliest-offset' per backfill
-- Schema Registry Avro
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081'
);
-- Tabella di riferimento: catalogo prodotti (changelog topic da Debezium)
CREATE TABLE products_table (
product_id VARCHAR,
product_name VARCHAR,
category VARCHAR,
base_price DECIMAL(10, 2),
-- PRIMARY KEY: indica che e una tabella di lookup (upsert semantics)
PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'cdc.public.products',
'properties.bootstrap.servers' = 'kafka-1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
-- upsert-kafka: gestisce insert/update/delete basandosi sulla chiave
'connector' = 'upsert-kafka'
);
Flink SQL'de Pencere Toplamaları
-- Aggregazione: totale vendite per categoria ogni 10 minuti (TUMBLE window)
-- TUMBLE: finestre fisse non sovrapposte
SELECT
window_start,
window_end,
p.category,
COUNT(*) AS order_count,
SUM(o.amount) AS total_revenue,
AVG(o.amount) AS avg_order_value,
MAX(o.amount) AS max_order_value
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;
-- HOP window: sliding window di 1 ora, avanza ogni 15 minuti
-- Utile per medie mobili
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_window,
SUM(amount) AS spend_in_window
FROM TABLE(
HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '15' MINUTE, -- slide interval
INTERVAL '1' HOUR -- window size
)
)
GROUP BY window_start, window_end, user_id;
-- SESSION window: raggruppa eventi dello stesso utente per sessione
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_session,
SUM(amount) AS session_value
FROM TABLE(
SESSION(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '30' MINUTE -- gap: inattivita > 30 min = nuova sessione
)
)
GROUP BY window_start, window_end, user_id;
Akış-Yayın Birleştirme
-- Join tra due stream entro una finestra temporale
-- Esempio: join ordini con eventi di pagamento (devono arrivare entro 5 minuti)
CREATE TABLE payment_events (
payment_id VARCHAR,
order_id VARCHAR,
payment_time BIGINT,
status VARCHAR,
event_time AS TO_TIMESTAMP_LTZ(payment_time, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'payment-events',
'properties.bootstrap.servers' = 'kafka-1:9092',
'format' = 'json'
);
-- INTERVAL JOIN: join tra stream con condizione temporale
SELECT
o.order_id,
o.user_id,
o.amount,
p.status AS payment_status,
p.payment_time AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Condizione temporale: il pagamento deve arrivare entro 5 minuti dall'ordine
AND p.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
Apache Buzdağı: ACID'li Veri Gölü
Apaçi Buzdağı ve veri gölü için açık kaynaklı bir tablo formatı S3 gibi nesne depolamaya ACID (işlem, anlık görüntü izolasyonu) garantileri getiren ve GCS. 2026'da ve Snowflake, Databricks tarafından kullanılan standart veri göl evi formatı (Delta'ya alternatif olarak), AWS Glue ve neredeyse tüm yönetilen sorgu motorları.
Flink ile entegrasyonu, Kafka verilerini Iceberg'e sorunsuz bir şekilde yazmanıza olanak tanır işlem garantileriyle sürekli: S3'teki parke dosyaları anlık görüntüler halinde düzenlenir Devam eden yazma işlemlerini etkilemeden SELECT CURRENT ile değiştirilemez, sorgulanabilir.
-- Crea un Iceberg catalog in Flink SQL
-- (usa Hive Metastore o REST Catalog di Iceberg)
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://iceberg-rest-catalog:8181',
'warehouse' = 's3://data-lake/warehouse',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);
USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;
-- Crea la tabella Iceberg di destinazione
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
category VARCHAR,
product_name VARCHAR,
payment_status VARCHAR,
order_hour TIMESTAMP(3), -- per partitioning
created_date DATE -- per partitioning
)
PARTITIONED BY (created_date, category)
WITH (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
-- Compaction automatica: merge dei file piccoli
'write.target-file-size-bytes' = '134217728', -- 128MB
-- Snapshot expiration automatica dopo 7 giorni
'history.expire.min-snapshots-to-keep' = '10',
'history.expire.max-snapshot-age-ms' = '604800000'
);
Komple Boru Hattı: Kafka'dan Buzdağına
-- Pipeline end-to-end: leggi ordini da Kafka, arricchisci con prodotti,
-- aggrega per 10 min, scrivi su Iceberg in S3
-- Job Flink SQL continuo (eseguito con flink sql-client)
INSERT INTO analytics.orders_enriched
SELECT
o.order_id,
o.user_id,
o.amount,
o.currency,
COALESCE(p.category, 'unknown') AS category,
COALESCE(p.product_name, 'unknown') AS product_name,
pay.status AS payment_status,
window_start AS order_hour,
CAST(window_start AS DATE) AS created_date
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
ON o.order_id = pay.order_id
AND pay.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
-- Query su Iceberg da Athena/Spark/Trino (dopo che Flink ha scritto):
-- SELECT
-- created_date,
-- category,
-- COUNT(*) as orders,
-- SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;
Birleşik Yönetilen Flink
Confluent faaliyete geçti Apache Flink yönetildi entegre Confluent Bulut platformunda. Ana avantaj: yönetilecek sıfır altyapı, yerel otomatik ölçeklendirme, Confluent Schema Registry ve Kafka konularıyla doğrudan entegrasyon manuel konfigürasyon olmadan.
# Confluent Cloud Flink: workflow
# 1. Accedi alla Confluent Cloud Console
# 2. Naviga in Flink -> Compute pools -> Create pool
# Scegli: Cloud (AWS/Azure/GCP), Region, Max CFUs
# 3. In Flink SQL shell (Confluent Cloud o CLI):
confluent flink shell
# 4. I topic Kafka esistenti sono automaticamente disponibili come tabelle
-- Visualizza i topic come tabelle
SHOW TABLES IN kafka_cluster;
-- Gli schemi Avro dal Schema Registry vengono convertiti automaticamente
-- Non serve configurare 'connector', 'format', 'bootstrap.servers'
-- Tutto e pre-configurato dal managed service
SELECT * FROM `orders` LIMIT 10;
-- 5. Crea un Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
order_id,
user_id,
amount,
category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;
-- 6. Monitora i job dalla UI Confluent
-- Metriche disponibili: records/sec, checkpoint lag, watermark lag
Flink'i İzleme ve Sorun Giderme
# Metriche Flink chiave da monitorare (via Prometheus)
# 1. Checkpoint latency e dimensione
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# Se duration > 60s o size > 10GB: problemi di stato o throughput
# 2. Watermark lag (ritardo nel processing degli eventi)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Confronta con System.currentTimeMillis()
# Lag > 30s: il consumer non riesce a tenere il passo con Kafka
# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 o 1)
# Flink UI mostra le frecce rosse per i task sotto pressure
# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# Se lag in crescita: aumenta il parallelismo del source operator
# Accedi alla Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints
# Trigger checkpoint manuale (per debug)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints
# Cancella un job in modo sicuro (con savepoint per resume)
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
-H "Content-Type: application/json" \
-d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": true}'
Kafka Streams vs Flink: Hangisini Ne Zaman Seçmeli
- Kafka Akışlarını şu amaçlarla kullanın: Java uygulamasına gömülü basit toplamalar, KStream-KTable'a (akış + arama tablosu) katılın, deneyimi olmayan küçük ekipler dağıtılmış kümeler, basit dağıtım (ve bir küme değil, bir kitaplık).
- Flink'i kullanarak şunları yapın: birden fazla akış arasındaki zamansal birleşmeler, durum çok büyük (GB/TB), CEP (Karmaşık Olay İşleme), Iceberg/Delta veri gölüne giden boru hattı, Java dışı veri mühendisleri için agnostik ölçeklendirme sağlayan bir arayüz olarak Flink SQL uygulama mikro hizmeti, tam olarak bir kez garantili saniyeden kısa gecikmeler.
Sonuçlar
Kafka + Flink + Iceberg kombinasyonu modern veri yığınını temsil eder 2026'da göl evi gerçek zamanlı: Olay akışının omurgası olarak Kafka, Flink İşlem garantili durum bilgisi işleme, ACID depolama formatı olarak Iceberg nesne depolama konusunda. Confluent'in yönetilen Flink'i ile bu yığına artık erişilebiliyor dağıtılmış kümeler konusunda operasyonel uzmanlığı olmayan ekiplere bile.
Serinin Tamamı: Apache Kafka
- Madde 01 — Apache Kafka Temelleri: Konular, Bölümler ve Tüketici Grupları
- Madde 02 — Kafka 4.0'da KRaft: Elveda ZooKeeper
- Madde 03 — Gelişmiş Üretici ve Tüketici
- Madde 04 — Kafka'da Tam Bir Kez Semantik
- Madde 05 — Şema Kaydı: Avro ve Protobuf
- Madde 06 — Kafka Akışları: KTable ve Pencereleme
- Madde 07 — Kafka Connect: Debezium CDC ve Veritabanı Entegrasyonu
- Madde 08 (bu) — Kafka + Apache Flink: Gerçek Zamanlı Boru Hattı Analitiği ve Buzdağı Havuzu
- Madde 09 — Kafka'yı İzlemek: JMX Exporter, Prometheus ve Grafana
- Madde 10 — Kafka'da Teslim Edilmeyen Mektup Sırası ve Hata İşleme
- Madde 11 — Üretimde Kafka: Boyutlandırma, Saklama ve Olağanüstü Durum Kurtarma







