Kafka + Apache Flink: Pipeline Analytics Real-Time and Iceberg Sink
Combinația Kafka + Flink este standardul de facto pentru conductele de analiză în timp real în 2026. Aflați cum să construiți o conductă end-to-end cu Flink SQL, conectorul Kafka, chiuveta dinamică Iceberg și a gestionat Flink pe Confluent Cloud.
De ce Kafka + Flink în 2026
În 2026, combinația Apache Kafka + Apache Flink a devenit stiva de referință pentru conducte de analiză în timp real în companiile de tehnologie mature. Kafka se ocupă de ingerare și tamponarea datelor cu garanțiile sale de durabilitate și scalabilitate; Oferte Flink de procesare stateful cu semantică exact o dată, ferestre temporale și îmbinări complexe pe care Kafka Streams nu le suportă în mod nativ.
Schimbarea crucială în 2025 a fost adoptarea masivă a Flink SQL ca interfață principală pentru conductele de analiză: în loc să scrie DataStream API în Java, inginerii de date scriu SQL standard pe fluxuri și tabele. Confluent a fost lansat lui Flink gestionat (Apache Flink ca serviciu) cu integrare nativă la Confluent Schema Registry și Apache Iceberg, făcând stiva zero-ops pentru multe echipe.
Ce vei învăța
- Arhitectură Flink: JobManager, TaskManager, punct de control cu Kafka
- Flink SQL: Creați tabele sursă și absorbție Kafka cu DDL
- Operații temporale: ferestre TUMBLE, HOP, SESSION în Flink SQL
- Unire în flux: alăturare flux-stream și alăturare temporală cu tabelul de căutare
- Apache Iceberg: format de tabel pentru data lake, ACID pe stocarea obiectelor
- Conductă de la capăt la capăt: Kafka -> Flink -> Iceberg pe S3
- Confluent Managed Flink: configurație și diferențe față de auto-găzduit
Flink arhitectura cu Kafka
Apache Flink este un cadru de procesare a fluxurilor distribuite. Punctul său forte în comparație cu Kafka Streams și capacitatea de a gestiona esti foarte grozav (TB) cu RocksDB și punct de control pe stocarea distribuită (S3/GCS), îmbinări complexe între fluxuri multiple și CEP (Procesare complexă a evenimentelor).
# Architettura Flink:
# JobManager: coordina l'esecuzione del job, gestisce i checkpoint
# - unico (o 2 con HA tramite ZooKeeper/Kubernetes leader election)
# - scheduler, checkpoint coordinator, metastore
# TaskManager: esegue i task paralleli (equivalente dei worker)
# - ogni TaskManager ha N "slots" (unita di parallelismo)
# - slot = thread dedicato con stato RocksDB locale
# Checkpoint: snapshot periodico dello stato su S3/GCS
# - se un TaskManager crasha, il job riprende dall'ultimo checkpoint
# - integrazione nativa con Kafka: salva l'offset Kafka nel checkpoint
# - garantisce esattamente-once end-to-end
# Deployment su Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
# https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: analytics-pipeline
spec:
image: apache/flink:1.19-scala_2.12-java17
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
execution.checkpointing.interval: "60000" # checkpoint ogni 60s
execution.checkpointing.mode: "EXACTLY_ONCE"
# Kafka source: committed offset = checkpoint offset
execution.checkpointing.timeout: "300000"
serviceAccount: flink-sa
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
replicas: 3
resource:
memory: "4096m"
cpu: 2
job:
jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
parallelism: 8
upgradeMode: stateful
EOF
Flink SQL: Procesarea fluxului cu standardul SQL
Flink SQL vă permite să tratați fluxurile Kafka ca tabele SQL. Magia și conceptul de ora evenimentului (ora evenimentului): Flink folosește marcajul de timp în datele în sine (nu ora sosirii) pentru ferestre, permițând gestionarea corectă a sosirilor târzii.
DDL: Definiți tabelele sursă Kafka
-- Flink SQL: crea una tabella che legge dal topic Kafka "orders"
CREATE TABLE kafka_orders (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
status VARCHAR,
created_at BIGINT, -- timestamp in milliseconds (dal payload)
-- Campo virtuale: converte il BIGINT in TIMESTAMP per le window
event_time AS TO_TIMESTAMP_LTZ(created_at, 3),
-- Watermark: permette late arrivals fino a 5 secondi
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
'properties.group.id' = 'flink-analytics-consumer',
'scan.startup.mode' = 'latest-offset', -- o 'earliest-offset' per backfill
-- Schema Registry Avro
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081'
);
-- Tabella di riferimento: catalogo prodotti (changelog topic da Debezium)
CREATE TABLE products_table (
product_id VARCHAR,
product_name VARCHAR,
category VARCHAR,
base_price DECIMAL(10, 2),
-- PRIMARY KEY: indica che e una tabella di lookup (upsert semantics)
PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'cdc.public.products',
'properties.bootstrap.servers' = 'kafka-1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
-- upsert-kafka: gestisce insert/update/delete basandosi sulla chiave
'connector' = 'upsert-kafka'
);
Agregații de ferestre în Flink SQL
-- Aggregazione: totale vendite per categoria ogni 10 minuti (TUMBLE window)
-- TUMBLE: finestre fisse non sovrapposte
SELECT
window_start,
window_end,
p.category,
COUNT(*) AS order_count,
SUM(o.amount) AS total_revenue,
AVG(o.amount) AS avg_order_value,
MAX(o.amount) AS max_order_value
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;
-- HOP window: sliding window di 1 ora, avanza ogni 15 minuti
-- Utile per medie mobili
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_window,
SUM(amount) AS spend_in_window
FROM TABLE(
HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '15' MINUTE, -- slide interval
INTERVAL '1' HOUR -- window size
)
)
GROUP BY window_start, window_end, user_id;
-- SESSION window: raggruppa eventi dello stesso utente per sessione
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_session,
SUM(amount) AS session_value
FROM TABLE(
SESSION(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '30' MINUTE -- gap: inattivita > 30 min = nuova sessione
)
)
GROUP BY window_start, window_end, user_id;
Stream-Stream Join
-- Join tra due stream entro una finestra temporale
-- Esempio: join ordini con eventi di pagamento (devono arrivare entro 5 minuti)
CREATE TABLE payment_events (
payment_id VARCHAR,
order_id VARCHAR,
payment_time BIGINT,
status VARCHAR,
event_time AS TO_TIMESTAMP_LTZ(payment_time, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'payment-events',
'properties.bootstrap.servers' = 'kafka-1:9092',
'format' = 'json'
);
-- INTERVAL JOIN: join tra stream con condizione temporale
SELECT
o.order_id,
o.user_id,
o.amount,
p.status AS payment_status,
p.payment_time AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Condizione temporale: il pagamento deve arrivare entro 5 minuti dall'ordine
AND p.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
Apache Iceberg: Lacul de date cu ACID
Apache Iceberg și un format de tabel open-source pentru lacul de date care aduce ACID (tranzacție, izolare instantanee) garanții pentru stocarea obiectelor precum S3 și GCS. În 2026, și formatul standard de data lakehouse — folosit de Snowflake, Databricks (ca alternativă la Delta), AWS Glue și practic fiecare motor de interogare gestionat.
Integrarea sa cu Flink vă permite să scrieți fără probleme date Kafka pe Iceberg continuă cu garanții tranzacționale: fișierele parchet pe S3 sunt organizate în instantanee imuabil, interogabil cu SELECT CURRENT fără impact asupra scrierilor în curs.
-- Crea un Iceberg catalog in Flink SQL
-- (usa Hive Metastore o REST Catalog di Iceberg)
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://iceberg-rest-catalog:8181',
'warehouse' = 's3://data-lake/warehouse',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);
USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;
-- Crea la tabella Iceberg di destinazione
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
category VARCHAR,
product_name VARCHAR,
payment_status VARCHAR,
order_hour TIMESTAMP(3), -- per partitioning
created_date DATE -- per partitioning
)
PARTITIONED BY (created_date, category)
WITH (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
-- Compaction automatica: merge dei file piccoli
'write.target-file-size-bytes' = '134217728', -- 128MB
-- Snapshot expiration automatica dopo 7 giorni
'history.expire.min-snapshots-to-keep' = '10',
'history.expire.max-snapshot-age-ms' = '604800000'
);
Conductă completă: de la Kafka la Iceberg
-- Pipeline end-to-end: leggi ordini da Kafka, arricchisci con prodotti,
-- aggrega per 10 min, scrivi su Iceberg in S3
-- Job Flink SQL continuo (eseguito con flink sql-client)
INSERT INTO analytics.orders_enriched
SELECT
o.order_id,
o.user_id,
o.amount,
o.currency,
COALESCE(p.category, 'unknown') AS category,
COALESCE(p.product_name, 'unknown') AS product_name,
pay.status AS payment_status,
window_start AS order_hour,
CAST(window_start AS DATE) AS created_date
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
ON o.order_id = pay.order_id
AND pay.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
-- Query su Iceberg da Athena/Spark/Trino (dopo che Flink ha scritto):
-- SELECT
-- created_date,
-- category,
-- COUNT(*) as orders,
-- SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;
Confluent Managed Flink
Confluent și-a lansat Apache Flink gestionat integrat în platforma Confluent Cloud. Principalul avantaj: infrastructură zero de gestionat, autoscaling nativ, integrare directă cu Confluent Schema Registry și subiecte Kafka fără configurare manuală.
# Confluent Cloud Flink: workflow
# 1. Accedi alla Confluent Cloud Console
# 2. Naviga in Flink -> Compute pools -> Create pool
# Scegli: Cloud (AWS/Azure/GCP), Region, Max CFUs
# 3. In Flink SQL shell (Confluent Cloud o CLI):
confluent flink shell
# 4. I topic Kafka esistenti sono automaticamente disponibili come tabelle
-- Visualizza i topic come tabelle
SHOW TABLES IN kafka_cluster;
-- Gli schemi Avro dal Schema Registry vengono convertiti automaticamente
-- Non serve configurare 'connector', 'format', 'bootstrap.servers'
-- Tutto e pre-configurato dal managed service
SELECT * FROM `orders` LIMIT 10;
-- 5. Crea un Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
order_id,
user_id,
amount,
category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;
-- 6. Monitora i job dalla UI Confluent
-- Metriche disponibili: records/sec, checkpoint lag, watermark lag
Monitorizare și depanare Flink
# Metriche Flink chiave da monitorare (via Prometheus)
# 1. Checkpoint latency e dimensione
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# Se duration > 60s o size > 10GB: problemi di stato o throughput
# 2. Watermark lag (ritardo nel processing degli eventi)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Confronta con System.currentTimeMillis()
# Lag > 30s: il consumer non riesce a tenere il passo con Kafka
# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 o 1)
# Flink UI mostra le frecce rosse per i task sotto pressure
# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# Se lag in crescita: aumenta il parallelismo del source operator
# Accedi alla Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints
# Trigger checkpoint manuale (per debug)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints
# Cancella un job in modo sicuro (con savepoint per resume)
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
-H "Content-Type: application/json" \
-d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": true}'
Kafka Streams vs Flink: Când să alegi pe care
- Utilizați Kafka Streams pentru a: agregari simple încorporate în aplicația Java, alăturați-vă KStream-KTable (stream + tabel de căutare), echipe mici fără experiență de clustere distribuite, implementare simplă (și o bibliotecă, nu un cluster).
- Folosiți Flink pentru a: îmbinări temporale între fluxuri multiple, stare foarte mare (GB/TB), CEP (Procesare complexă a evenimentelor), conductă către lacul de date Iceberg/Delta, Flink SQL ca interfață pentru inginerii de date non-Java, scalare agnostică microserviciu de aplicație, latențe sub secunde cu garanții exact o dată.
Concluzii
Combinația Kafka + Flink + Iceberg reprezintă stiva modernă de date Lakehouse în timp real în 2026: Kafka ca coloană vertebrală a fluxului de evenimente, Flink pentru Procesare cu stat cu garanții tranzacționale, Iceberg ca format de stocare ACID privind depozitarea obiectelor. Cu Flink gestionat de Confluent, această stivă este acum accesibilă chiar și echipelor fără expertiză operațională pe clustere distribuite.
Seria completă: Apache Kafka
- Articolul 01 — Fundamentele Apache Kafka: subiecte, partiții și grupuri de consumatori
- Articolul 02 — KRaft în Kafka 4.0: La revedere ZooKeeper
- Articolul 03 — Producător și Consumator avansat
- Articolul 04 — Exact-O dată Semantică în Kafka
- Articolul 05 — Schema Registry: Avro și Protobuf
- Articolul 06 — Fluxuri Kafka: KTable și Windowing
- Articolul 07 — Kafka Connect: Debezium CDC și integrare DB
- Articolul 08 (acest) — Kafka + Apache Flink: Pipeline Analytics Real-Time and Iceberg Sink
- Articolul 09 — Monitorizarea Kafka: JMX Exporter, Prometheus și Grafana
- Articolul 10 — Coada de scrisori moarte și gestionarea erorilor în Kafka
- Articolul 11 — Kafka în producție: dimensionare, reținere și recuperare în caz de dezastru







