Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym i Iceberg Sink
Kombinacja Kafka + Flink jest de facto standardem dla potoków analitycznych w czasie rzeczywistym w 2026 r. Dowiedz się, jak zbudować kompleksowy potok za pomocą łącznika Flink SQL Kafka, dynamiczny pochłaniacz Iceberg i zarządzał Flink w Confluent Cloud.
Dlaczego Kafka + Flink w 2026 roku
W 2026 roku stosem referencyjnym stała się kombinacja Apache Kafka + Apache Flink do analiz w czasie rzeczywistym w dojrzałych firmach technologicznych. Kafka obsługuje spożycie oraz buforowanie danych z gwarancją trwałości i skalowalności; Oferty Flink przetwarzania stanowego z semantyką „dokładnie raz”, oknami czasowymi i złożonymi połączeniami których Kafka Streams nie obsługuje natywnie.
Kluczową zmianą w 2025 r. było masowe przyjęcie Przesuń SQL jako główny interfejs potoków analitycznych: zamiast pisać DataStream API w Javie inżynierowie danych piszą standardowy SQL w strumieniach i tabelach. Confluent wystartował jego Zarządzany Flink (Apache Flink jako usługa) z natywną integracją do rejestru Confluent Schema Registry i Apache Iceberg, dzięki czemu dla wielu zespołów stos staje się zerowy.
Czego się nauczysz
- Architektura Flink: JobManager, TaskManager, punkty kontrolne za pomocą Kafki
- Flink SQL: Twórz tabele źródłowe i ujścia Kafki za pomocą DDL
- Operacje czasowe: okna TUMBLE, HOP, SESSION w Flink SQL
- Łączenie strumieniowe: łączenie strumień-strumień i łączenie tymczasowe z tabelą przeglądową
- Apache Iceberg: format tabeli dla jeziora danych, ACID w magazynie obiektów
- Kompleksowy rurociąg: Kafka -> Flink -> Iceberg na S3
- Confluent Managed Flink: konfiguracja i różnice w stosunku do samodzielnego hostowania
Architektura Flink z Kafką
Apache Flink to platforma do przetwarzania strumieni rozproszonych. Jego mocna strona w porównaniu do Kafka Streams i możliwość zarządzania jesteś bardzo wspaniały (TB) z RocksDB i punktami kontrolnymi w rozproszonej pamięci masowej (S3/GCS), złożone połączenia pomiędzy nimi wiele strumieni i CEP (Complex Event Processing).
# Architettura Flink:
# JobManager: coordina l'esecuzione del job, gestisce i checkpoint
# - unico (o 2 con HA tramite ZooKeeper/Kubernetes leader election)
# - scheduler, checkpoint coordinator, metastore
# TaskManager: esegue i task paralleli (equivalente dei worker)
# - ogni TaskManager ha N "slots" (unita di parallelismo)
# - slot = thread dedicato con stato RocksDB locale
# Checkpoint: snapshot periodico dello stato su S3/GCS
# - se un TaskManager crasha, il job riprende dall'ultimo checkpoint
# - integrazione nativa con Kafka: salva l'offset Kafka nel checkpoint
# - garantisce esattamente-once end-to-end
# Deployment su Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
# https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: analytics-pipeline
spec:
image: apache/flink:1.19-scala_2.12-java17
flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
execution.checkpointing.interval: "60000" # checkpoint ogni 60s
execution.checkpointing.mode: "EXACTLY_ONCE"
# Kafka source: committed offset = checkpoint offset
execution.checkpointing.timeout: "300000"
serviceAccount: flink-sa
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
replicas: 3
resource:
memory: "4096m"
cpu: 2
job:
jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
parallelism: 8
upgradeMode: stateful
EOF
Flink SQL: przetwarzanie strumieniowe w standardzie SQL
Flink SQL pozwala traktować strumienie Kafki jako tabele SQL. Magia i koncepcja z czas wydarzenia (czas zdarzenia): Flink używa znacznika czasu w samych danych (a nie czas przybycia) dla okien, umożliwiając prawidłowe zarządzanie spóźnieniami.
DDL: Zdefiniuj tabele źródłowe Kafki
-- Flink SQL: crea una tabella che legge dal topic Kafka "orders"
CREATE TABLE kafka_orders (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
status VARCHAR,
created_at BIGINT, -- timestamp in milliseconds (dal payload)
-- Campo virtuale: converte il BIGINT in TIMESTAMP per le window
event_time AS TO_TIMESTAMP_LTZ(created_at, 3),
-- Watermark: permette late arrivals fino a 5 secondi
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
'properties.group.id' = 'flink-analytics-consumer',
'scan.startup.mode' = 'latest-offset', -- o 'earliest-offset' per backfill
-- Schema Registry Avro
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081'
);
-- Tabella di riferimento: catalogo prodotti (changelog topic da Debezium)
CREATE TABLE products_table (
product_id VARCHAR,
product_name VARCHAR,
category VARCHAR,
base_price DECIMAL(10, 2),
-- PRIMARY KEY: indica che e una tabella di lookup (upsert semantics)
PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'topic' = 'cdc.public.products',
'properties.bootstrap.servers' = 'kafka-1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
-- upsert-kafka: gestisce insert/update/delete basandosi sulla chiave
'connector' = 'upsert-kafka'
);
Agregacje okien w Flink SQL
-- Aggregazione: totale vendite per categoria ogni 10 minuti (TUMBLE window)
-- TUMBLE: finestre fisse non sovrapposte
SELECT
window_start,
window_end,
p.category,
COUNT(*) AS order_count,
SUM(o.amount) AS total_revenue,
AVG(o.amount) AS avg_order_value,
MAX(o.amount) AS max_order_value
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;
-- HOP window: sliding window di 1 ora, avanza ogni 15 minuti
-- Utile per medie mobili
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_window,
SUM(amount) AS spend_in_window
FROM TABLE(
HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '15' MINUTE, -- slide interval
INTERVAL '1' HOUR -- window size
)
)
GROUP BY window_start, window_end, user_id;
-- SESSION window: raggruppa eventi dello stesso utente per sessione
SELECT
window_start,
window_end,
user_id,
COUNT(*) AS orders_in_session,
SUM(amount) AS session_value
FROM TABLE(
SESSION(TABLE kafka_orders, DESCRIPTOR(event_time),
INTERVAL '30' MINUTE -- gap: inattivita > 30 min = nuova sessione
)
)
GROUP BY window_start, window_end, user_id;
Dołącz do strumienia
-- Join tra due stream entro una finestra temporale
-- Esempio: join ordini con eventi di pagamento (devono arrivare entro 5 minuti)
CREATE TABLE payment_events (
payment_id VARCHAR,
order_id VARCHAR,
payment_time BIGINT,
status VARCHAR,
event_time AS TO_TIMESTAMP_LTZ(payment_time, 3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'payment-events',
'properties.bootstrap.servers' = 'kafka-1:9092',
'format' = 'json'
);
-- INTERVAL JOIN: join tra stream con condizione temporale
SELECT
o.order_id,
o.user_id,
o.amount,
p.status AS payment_status,
p.payment_time AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Condizione temporale: il pagamento deve arrivare entro 5 minuti dall'ordine
AND p.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
Apache Iceberg: jezioro danych z ACID
Góra lodowa Apache oraz format tabeli typu open source dla jeziora danych co zapewnia gwarancje ACID (transakcja, izolacja migawki) w pamięci obiektowej, takiej jak S3 i GCS. W 2026 r. oraz standardowy format data Lakehouse — używany przez Snowflake, Databricks (jako alternatywa dla Delta), AWS Glue i praktycznie każdy zarządzany silnik zapytań.
Integracja z Flink pozwala na bezproblemowy zapis danych Kafki do Iceberg ciągły z gwarancjami transakcyjnymi: pliki Parquet na S3 są zorganizowane w migawki niezmienne, z możliwością zapytania za pomocą SELECT CURRENT bez wpływu na trwające zapisy.
-- Crea un Iceberg catalog in Flink SQL
-- (usa Hive Metastore o REST Catalog di Iceberg)
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://iceberg-rest-catalog:8181',
'warehouse' = 's3://data-lake/warehouse',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);
USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;
-- Crea la tabella Iceberg di destinazione
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
order_id VARCHAR,
user_id VARCHAR,
amount DECIMAL(10, 2),
currency VARCHAR,
category VARCHAR,
product_name VARCHAR,
payment_status VARCHAR,
order_hour TIMESTAMP(3), -- per partitioning
created_date DATE -- per partitioning
)
PARTITIONED BY (created_date, category)
WITH (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy',
-- Compaction automatica: merge dei file piccoli
'write.target-file-size-bytes' = '134217728', -- 128MB
-- Snapshot expiration automatica dopo 7 giorni
'history.expire.min-snapshots-to-keep' = '10',
'history.expire.max-snapshot-age-ms' = '604800000'
);
Kompletny rurociąg: Kafka do góry lodowej
-- Pipeline end-to-end: leggi ordini da Kafka, arricchisci con prodotti,
-- aggrega per 10 min, scrivi su Iceberg in S3
-- Job Flink SQL continuo (eseguito con flink sql-client)
INSERT INTO analytics.orders_enriched
SELECT
o.order_id,
o.user_id,
o.amount,
o.currency,
COALESCE(p.category, 'unknown') AS category,
COALESCE(p.product_name, 'unknown') AS product_name,
pay.status AS payment_status,
window_start AS order_hour,
CAST(window_start AS DATE) AS created_date
FROM TABLE(
TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
ON o.order_id = pay.order_id
AND pay.event_time BETWEEN o.event_time
AND o.event_time + INTERVAL '5' MINUTE;
-- Query su Iceberg da Athena/Spark/Trino (dopo che Flink ha scritto):
-- SELECT
-- created_date,
-- category,
-- COUNT(*) as orders,
-- SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;
Confluent zarządzany Flink
Confluent uruchomił swoją Zarządzany Apache Flink zintegrowany na platformie Confluent Cloud. Główna zaleta: zero infrastruktury do zarządzania, natywne autoskalowanie, bezpośrednia integracja z Confluent Schema Registry i tematami Kafki bez ręcznej konfiguracji.
# Confluent Cloud Flink: workflow
# 1. Accedi alla Confluent Cloud Console
# 2. Naviga in Flink -> Compute pools -> Create pool
# Scegli: Cloud (AWS/Azure/GCP), Region, Max CFUs
# 3. In Flink SQL shell (Confluent Cloud o CLI):
confluent flink shell
# 4. I topic Kafka esistenti sono automaticamente disponibili come tabelle
-- Visualizza i topic come tabelle
SHOW TABLES IN kafka_cluster;
-- Gli schemi Avro dal Schema Registry vengono convertiti automaticamente
-- Non serve configurare 'connector', 'format', 'bootstrap.servers'
-- Tutto e pre-configurato dal managed service
SELECT * FROM `orders` LIMIT 10;
-- 5. Crea un Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
order_id,
user_id,
amount,
category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;
-- 6. Monitora i job dalla UI Confluent
-- Metriche disponibili: records/sec, checkpoint lag, watermark lag
Monitorowanie i rozwiązywanie problemów Flink
# Metriche Flink chiave da monitorare (via Prometheus)
# 1. Checkpoint latency e dimensione
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# Se duration > 60s o size > 10GB: problemi di stato o throughput
# 2. Watermark lag (ritardo nel processing degli eventi)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Confronta con System.currentTimeMillis()
# Lag > 30s: il consumer non riesce a tenere il passo con Kafka
# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 o 1)
# Flink UI mostra le frecce rosse per i task sotto pressure
# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# Se lag in crescita: aumenta il parallelismo del source operator
# Accedi alla Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints
# Trigger checkpoint manuale (per debug)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints
# Cancella un job in modo sicuro (con savepoint per resume)
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
-H "Content-Type: application/json" \
-d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": true}'
Strumienie Kafki a Flink: kiedy wybrać
- Użyj strumieni Kafka, aby: proste agregacje osadzone w aplikacji Java, dołącz do KStream-KTable (strumień + tabela przeglądowa), małe zespoły bez doświadczenia rozproszonych klastrów, proste wdrożenie (i biblioteka, a nie klaster).
- Użyj Flink, aby: tymczasowe połączenia między wieloma strumieniami, stan bardzo duży (GB/TB), CEP (Complex Event Processing), rurociąg do jeziora danych Iceberg/Delta, Flink SQL jako interfejs dla inżynierów danych innych niż Java, bez skalowania mikrousługa aplikacji, opóźnienia poniżej sekundy z gwarancją dokładnie jednorazową.
Wnioski
Kombinacja Kafka + Flink + Iceberg reprezentuje nowoczesny stos danych Lakehouse w czasie rzeczywistym w 2026 r.: Kafka jako szkielet transmisji wydarzeń, Flink dla Przetwarzanie stanowe z gwarancjami transakcyjnymi, Iceberg jako format przechowywania ACID na temat przechowywania obiektów. Dzięki zarządzanemu przez Confluent Flink ten stos jest teraz dostępny nawet zespołom nieposiadającym specjalistycznej wiedzy operacyjnej w zakresie rozproszonych klastrów.
Cała seria: Apache Kafka
- Artykuł 01 — Podstawy Apache Kafka: tematy, partycje i grupy konsumentów
- Artykuł 02 — KRaft w Kafce 4.0: Żegnaj ZooStrażniku
- Artykuł 03 — Zaawansowany producent i konsument
- Artykuł 04 — Semantyka dokładnie raz w Kafce
- Artykuł 05 — Rejestr schematów: Avro i Protobuf
- Artykuł 06 — Strumienie Kafki: KTable i Windowing
- Artykuł 07 — Kafka Connect: Integracja Debezium CDC i DB
- Artykuł 08 (ten) — Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym i Iceberg Sink
- Artykuł 09 — Monitorowanie Kafki: JMX Exporter, Prometheus i Grafana
- Artykuł 10 — Kolejka niedostarczonych listów i obsługa błędów w Kafce
- Artykuł 11 — Kafka w produkcji: rozmiar, przechowywanie i odzyskiwanie po awarii







