2026년에 Kafka와 Flink를 합친 이유

2026년에는 Apache Kafka + Apache Flink 조합이 참조 스택이 되었습니다. 성숙한 기술 기업의 실시간 분석 파이프라인을 위한 것입니다. Kafka가 수집을 처리합니다. 내구성과 확장성을 보장하는 데이터 버퍼링; Flink 거래 정확히 한 번 의미론, 임시 창 및 복잡한 조인을 사용한 상태 저장 처리 Kafka Streams는 기본적으로 지원하지 않습니다.

2025년의 중요한 변화는 대규모 채택이었습니다. 플링크 SQL 분석 파이프라인의 기본 인터페이스로: DataStream API를 작성하는 대신 Java에서 데이터 엔지니어는 스트림과 테이블에 표준 SQL을 작성합니다. 컨플루언트(Confluent)가 출시되었습니다. 그의 관리형 플링크 (Apache Flink as a Service) 기본 통합 Confluent Schema Registry 및 Apache Iceberg에 추가되어 많은 팀이 스택을 제로옵스로 만들 수 있습니다.

무엇을 배울 것인가

  • Flink 아키텍처: JobManager, TaskManager, Kafka를 사용한 체크포인트
  • Flink SQL: DDL을 사용하여 Kafka 소스 및 싱크 테이블 만들기
  • 임시 작업: Flink SQL의 TUMBLE, HOP, SESSION 창
  • 스트리밍 조인: 조회 테이블을 사용한 스트림-스트림 조인 및 임시 조인
  • Apache Iceberg: 데이터 레이크의 테이블 형식, 객체 스토리지의 ACID
  • 엔드투엔드 파이프라인: Kafka -> Flink -> S3의 Iceberg
  • Confluent Managed Flink: 자체 호스팅과의 구성 및 차이점

Kafka를 사용한 Flink 아키텍처

Apache Flink는 분산 스트림 처리 프레임워크입니다. 그 강점은 Kafka Streams 및 관리 능력과 비교 당신은 정말 대단해요 (TB) RocksDB와 분산 스토리지(S3/GCS)의 체크포인트, 다중 스트림 및 CEP(복합 이벤트 처리).

# Architettura Flink:
# JobManager: coordina l'esecuzione del job, gestisce i checkpoint
#   - unico (o 2 con HA tramite ZooKeeper/Kubernetes leader election)
#   - scheduler, checkpoint coordinator, metastore

# TaskManager: esegue i task paralleli (equivalente dei worker)
#   - ogni TaskManager ha N "slots" (unita di parallelismo)
#   - slot = thread dedicato con stato RocksDB locale

# Checkpoint: snapshot periodico dello stato su S3/GCS
#   - se un TaskManager crasha, il job riprende dall'ultimo checkpoint
#   - integrazione nativa con Kafka: salva l'offset Kafka nel checkpoint
#   - garantisce esattamente-once end-to-end

# Deployment su Kubernetes (Flink Operator)
# helm repo add flink-operator-repo \
#   https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

# FlinkDeployment CRD:
cat <<'EOF' | kubectl apply -f -
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: analytics-pipeline
spec:
  image: apache/flink:1.19-scala_2.12-java17
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: "rocksdb"
    state.backend.incremental: "true"
    state.checkpoints.dir: "s3://my-flink-checkpoints/analytics-pipeline"
    execution.checkpointing.interval: "60000"    # checkpoint ogni 60s
    execution.checkpointing.mode: "EXACTLY_ONCE"
    # Kafka source: committed offset = checkpoint offset
    execution.checkpointing.timeout: "300000"
  serviceAccount: flink-sa
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 3
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: s3://my-flink-jars/analytics-pipeline-1.0.jar
    parallelism: 8
    upgradeMode: stateful
EOF

Flink SQL: SQL 표준을 사용한 스트림 처리

Flink SQL을 사용하면 Kafka 스트림을 SQL 테이블로 처리할 수 있습니다. 마술과 개념 의 이벤트 시간 (이벤트 시간): Flink는 데이터 자체의 타임스탬프를 사용합니다. (도착시간 아님) 창구를 마련해 늦게 도착하는 손님도 제대로 관리할 수 있도록 해준다.

DDL: Kafka 소스 테이블 정의

-- Flink SQL: crea una tabella che legge dal topic Kafka "orders"

CREATE TABLE kafka_orders (
    order_id    VARCHAR,
    user_id     VARCHAR,
    amount      DECIMAL(10, 2),
    currency    VARCHAR,
    status      VARCHAR,
    created_at  BIGINT,            -- timestamp in milliseconds (dal payload)
    -- Campo virtuale: converte il BIGINT in TIMESTAMP per le window
    event_time  AS TO_TIMESTAMP_LTZ(created_at, 3),
    -- Watermark: permette late arrivals fino a 5 secondi
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
    'properties.group.id' = 'flink-analytics-consumer',
    'scan.startup.mode' = 'latest-offset',  -- o 'earliest-offset' per backfill
    -- Schema Registry Avro
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);

-- Tabella di riferimento: catalogo prodotti (changelog topic da Debezium)
CREATE TABLE products_table (
    product_id    VARCHAR,
    product_name  VARCHAR,
    category      VARCHAR,
    base_price    DECIMAL(10, 2),
    -- PRIMARY KEY: indica che e una tabella di lookup (upsert semantics)
    PRIMARY KEY (product_id) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    'topic' = 'cdc.public.products',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081',
    -- upsert-kafka: gestisce insert/update/delete basandosi sulla chiave
    'connector' = 'upsert-kafka'
);

Flink SQL의 창 집계

-- Aggregazione: totale vendite per categoria ogni 10 minuti (TUMBLE window)
-- TUMBLE: finestre fisse non sovrapposte

SELECT
    window_start,
    window_end,
    p.category,
    COUNT(*)            AS order_count,
    SUM(o.amount)       AS total_revenue,
    AVG(o.amount)       AS avg_order_value,
    MAX(o.amount)       AS max_order_value
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
JOIN products_table AS p ON o.product_id = p.product_id
GROUP BY window_start, window_end, p.category;

-- HOP window: sliding window di 1 ora, avanza ogni 15 minuti
-- Utile per medie mobili
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_window,
    SUM(amount)  AS spend_in_window
FROM TABLE(
    HOP(TABLE kafka_orders, DESCRIPTOR(event_time),
        INTERVAL '15' MINUTE,   -- slide interval
        INTERVAL '1' HOUR       -- window size
    )
)
GROUP BY window_start, window_end, user_id;

-- SESSION window: raggruppa eventi dello stesso utente per sessione
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*)     AS orders_in_session,
    SUM(amount)  AS session_value
FROM TABLE(
    SESSION(TABLE kafka_orders, DESCRIPTOR(event_time),
        INTERVAL '30' MINUTE   -- gap: inattivita > 30 min = nuova sessione
    )
)
GROUP BY window_start, window_end, user_id;

스트림-스트림 조인

-- Join tra due stream entro una finestra temporale
-- Esempio: join ordini con eventi di pagamento (devono arrivare entro 5 minuti)

CREATE TABLE payment_events (
    payment_id   VARCHAR,
    order_id     VARCHAR,
    payment_time BIGINT,
    status       VARCHAR,
    event_time   AS TO_TIMESTAMP_LTZ(payment_time, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'payment-events',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'format' = 'json'
);

-- INTERVAL JOIN: join tra stream con condizione temporale
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    p.status         AS payment_status,
    p.payment_time   AS paid_at
FROM kafka_orders AS o
JOIN payment_events AS p
ON o.order_id = p.order_id
-- Condizione temporale: il pagamento deve arrivare entro 5 minuti dall'ordine
AND p.event_time BETWEEN o.event_time
    AND o.event_time + INTERVAL '5' MINUTE;

Apache Iceberg: ACID를 사용한 데이터 레이크

아파치 빙산 데이터 레이크를 위한 오픈 소스 테이블 형식 S3와 같은 객체 스토리지에 ACID(트랜잭션, 스냅샷 격리) 보장을 제공합니다. 그리고 GCS. 2026년에는 표준 데이터 레이크하우스 형식(Snowflake, Databricks에서 사용) (델타의 대안으로), AWS Glue 및 거의 모든 관리형 쿼리 엔진.

Flink와의 통합을 통해 Kafka 데이터를 Iceberg에 원활하게 쓸 수 있습니다. 지속적인 트랜잭션 보장: S3의 Parquet 파일은 스냅샷으로 구성됩니다. 진행 중인 쓰기에 영향을 주지 않고 SELECT CURRENT를 사용하여 변경할 수 없고 쿼리할 수 있습니다.

-- Crea un Iceberg catalog in Flink SQL
-- (usa Hive Metastore o REST Catalog di Iceberg)

CREATE CATALOG iceberg_catalog WITH (
    'type' = 'iceberg',
    'catalog-type' = 'rest',
    'uri' = 'http://iceberg-rest-catalog:8181',
    'warehouse' = 's3://data-lake/warehouse',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
);

USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS analytics;

-- Crea la tabella Iceberg di destinazione
CREATE TABLE IF NOT EXISTS analytics.orders_enriched (
    order_id        VARCHAR,
    user_id         VARCHAR,
    amount          DECIMAL(10, 2),
    currency        VARCHAR,
    category        VARCHAR,
    product_name    VARCHAR,
    payment_status  VARCHAR,
    order_hour      TIMESTAMP(3),     -- per partitioning
    created_date    DATE              -- per partitioning
)
PARTITIONED BY (created_date, category)
WITH (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy',
    -- Compaction automatica: merge dei file piccoli
    'write.target-file-size-bytes' = '134217728',  -- 128MB
    -- Snapshot expiration automatica dopo 7 giorni
    'history.expire.min-snapshots-to-keep' = '10',
    'history.expire.max-snapshot-age-ms' = '604800000'
);

완전한 파이프라인: Kafka에서 Iceberg까지

-- Pipeline end-to-end: leggi ordini da Kafka, arricchisci con prodotti,
-- aggrega per 10 min, scrivi su Iceberg in S3

-- Job Flink SQL continuo (eseguito con flink sql-client)
INSERT INTO analytics.orders_enriched
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    o.currency,
    COALESCE(p.category, 'unknown')       AS category,
    COALESCE(p.product_name, 'unknown')   AS product_name,
    pay.status                             AS payment_status,
    window_start                           AS order_hour,
    CAST(window_start AS DATE)             AS created_date
FROM TABLE(
    TUMBLE(TABLE kafka_orders, DESCRIPTOR(event_time), INTERVAL '10' MINUTE)
) AS o
LEFT JOIN products_table AS p ON o.product_id = p.product_id
LEFT JOIN payment_events AS pay
    ON o.order_id = pay.order_id
    AND pay.event_time BETWEEN o.event_time
        AND o.event_time + INTERVAL '5' MINUTE;

-- Query su Iceberg da Athena/Spark/Trino (dopo che Flink ha scritto):
-- SELECT
--     created_date,
--     category,
--     COUNT(*) as orders,
--     SUM(amount) as revenue
-- FROM analytics.orders_enriched
-- WHERE created_date >= CURRENT_DATE - INTERVAL '7' DAY
-- GROUP BY created_date, category
-- ORDER BY created_date DESC, revenue DESC;

Confluent 관리형 Flink

Confluent가 출시한 Apache Flink 관리 통합 Confluent Cloud 플랫폼에서. 주요 이점: 관리할 인프라가 전혀 없으며, 기본 자동 크기 조정, Confluent Schema Registry 및 Kafka 주제와의 직접 통합 수동 구성 없이.

# Confluent Cloud Flink: workflow

# 1. Accedi alla Confluent Cloud Console
# 2. Naviga in Flink -> Compute pools -> Create pool
# Scegli: Cloud (AWS/Azure/GCP), Region, Max CFUs

# 3. In Flink SQL shell (Confluent Cloud o CLI):
confluent flink shell

# 4. I topic Kafka esistenti sono automaticamente disponibili come tabelle
-- Visualizza i topic come tabelle
SHOW TABLES IN kafka_cluster;

-- Gli schemi Avro dal Schema Registry vengono convertiti automaticamente
-- Non serve configurare 'connector', 'format', 'bootstrap.servers'
-- Tutto e pre-configurato dal managed service
SELECT * FROM `orders` LIMIT 10;

-- 5. Crea un Flink job (continuous query):
INSERT INTO `orders-enriched`
SELECT
    order_id,
    user_id,
    amount,
    category
FROM `orders` o
JOIN `products-catalog` p ON o.product_id = p.product_id;

-- 6. Monitora i job dalla UI Confluent
-- Metriche disponibili: records/sec, checkpoint lag, watermark lag

Flink 모니터링 및 문제 해결

# Metriche Flink chiave da monitorare (via Prometheus)

# 1. Checkpoint latency e dimensione
# flink_jobmanager_job_lastCheckpointDuration (ms)
# flink_jobmanager_job_lastCheckpointSize (bytes)
# Se duration > 60s o size > 10GB: problemi di stato o throughput

# 2. Watermark lag (ritardo nel processing degli eventi)
# flink_taskmanager_job_task_operator_currentInputWatermark
# Confronta con System.currentTimeMillis()
# Lag > 30s: il consumer non riesce a tenere il passo con Kafka

# 3. Back pressure
# flink_taskmanager_job_task_isBackPressured (0 o 1)
# Flink UI mostra le frecce rosse per i task sotto pressure

# 4. Kafka consumer lag (via Flink metrics)
# flink_taskmanager_job_task_operator_KafkaSourceReader_kafkaConsumerMetrics_records_lag_max
# Se lag in crescita: aumenta il parallelismo del source operator

# Accedi alla Flink Web UI:
kubectl port-forward svc/analytics-pipeline-rest 8081:8081
# http://localhost:8081 -> Jobs, Task Managers, Checkpoints

# Trigger checkpoint manuale (per debug)
curl -X POST http://flink-jobmanager:8081/jobs/{job-id}/checkpoints

# Cancella un job in modo sicuro (con savepoint per resume)
curl -X POST "http://flink-jobmanager:8081/jobs/{job-id}/stop" \
  -H "Content-Type: application/json" \
  -d '{"targetDirectory": "s3://my-savepoints/analytics", "drain": true}'

Kafka Streams와 Flink: 언제 무엇을 선택해야 할까요?

  • Kafka Streams를 사용하여 다음을 수행합니다. Java 앱에 포함된 간단한 집계, 경험이 없는 소규모 팀인 KStream-KTable(스트림 + 조회 테이블)에 참여하세요. 분산 클러스터, 간단한 배포(및 클러스터가 아닌 라이브러리).
  • Flink를 사용하여 다음을 수행하십시오. 여러 스트림 간의 임시 조인, 상태가 매우 큼 (GB/TB), CEP(복합 이벤트 처리), Iceberg/Delta 데이터 레이크로의 파이프라인, 비Java 데이터 엔지니어를 위한 인터페이스인 Flink SQL, 확장에 구애받지 않음 애플리케이션 마이크로서비스, 정확히 한 번 보장되는 1초 미만의 대기 시간.

결론

Kafka + Flink + Iceberg 조합은 최신 데이터 스택을 나타냅니다. 2026년 실시간 Lakehouse: 이벤트 스트리밍 백본으로서의 Kafka, 트랜잭션 보장을 통한 상태 저장 처리, ACID 저장 형식인 Iceberg 객체 스토리지에 대해 Confluent의 관리형 Flink를 사용하면 이제 이 스택에 액세스할 수 있습니다. 분산 클러스터에 대한 운영 전문 지식이 없는 팀에게도 마찬가지입니다.

전체 시리즈: Apache Kafka

  • 제01조 — Apache Kafka 기본 사항: 주제, 파티션 및 소비자 그룹
  • 제02조 — Kafka 4.0의 KRaft: 안녕 ZooKeeper
  • 제03조 — 고급 생산자 및 소비자
  • 제04조 — Kafka의 정확히 한 번 의미론
  • 제05조 — 스키마 레지스트리: Avro 및 Protobuf
  • 제06조 — Kafka Streams: KTable 및 Windowing
  • 제07조 — Kafka Connect: Debezium CDC 및 DB 통합
  • 제08조(본) — Kafka + Apache Flink: 실시간 파이프라인 분석 및 Iceberg Sink
  • 제09조 — Kafka 모니터링: JMX 내보내기, Prometheus 및 Grafana
  • 제10조 — Kafka의 배달 못한 편지 대기열 및 오류 처리
  • 제11조 — 프로덕션 중인 Kafka: 크기 조정, 보존 및 재해 복구