Kafka Connect: Integrace bez kódu

Každý systém zasílání zpráv potřebuje přesouvat data do az externích systémů: databáze, objektové úložiště, Elasticsearch, Salesforce, starší systémy. Bez Kafka Connect, každá integrace vyžaduje vlastní kód: výrobce, který čte z databáze, spotřebitel kdo píše na S3. Kafka Connect to standardizuje pomocí konektorového rámce konfigurovatelné, které automaticky řídí škálovatelnost, odolnost proti chybám a to přesně jednou.

V roce 2026 katalog dostupných konektorů překračuje 1000 konektorů mezi open-source a komerčním (Confluent Hub). Nejběžnější případy použití jsou: CDC (Change Data Capture) z relačních databází přes Debezium, replikace směrem S3/GCS pro datové jezero, synchronizace s Elasticsearch pro fulltextové vyhledávání.

Co se naučíte

  • Architektura Kafka Connect: worker, konektor, úkol, offset management
  • Režim nasazení: samostatný vs distribuovaný
  • Debezium PostgreSQL Source Connector: nastavení, konfigurace WAL, snímek
  • Debezium MySQL Source Connector: replikace binlogu
  • Dřezové konektory: S3, Elasticsearch, JDBC
  • Single Message Transforms (SMT): filtrování, přejmenování, obohacení záznamů
  • Monitorování: JMX metriky a správa chyb ve výrobě

Architektura: Worker, Connector a Task

# Componenti di Kafka Connect:

# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test

# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno

# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella

# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata

# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
#   image: confluentinc/cp-kafka-connect:7.6.0
#   environment:
#     CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
#     CONNECT_REST_PORT: 8083
#     CONNECT_GROUP_ID: "kafka-connect-cluster"
#     CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
#     CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
#     CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
#     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
#     # Plugin path: directory con i JAR dei connector
#     CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#   volumes:
#     - ./connect-plugins:/usr/share/confluent-hub-components

Debezium: Změna Data Capture z PostgreSQL

Debezium Je to nejpoužívanější CDC konektor u Kafky. Místo dělat dotazování databáze (které má vliv na výkon a nezachycuje smazání), Debezium čte přímo Protokol zápisu dopředu (WAL) PostgreSQL: each INSERT, UPDATE and DELETE generates a Kafka event with the data before and after the modification.

Konfigurace PostgreSQL pro CDC

-- PostgreSQL: abilitare logical replication per Debezium

-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10

-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
  LOGIN
  PASSWORD 'secure_password_here'
  REPLICATION;  -- Necessario per leggere il WAL

-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;

-- 6. Verifica che logical replication sia abilitata
SHOW wal_level;  -- deve essere: logical

Konfigurace konektoru Debezium PostgreSQL

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password_here",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.users,public.products",

    "topic.prefix": "cdc",

    "decimal.handling.mode": "double",
    "binary.handling.mode": "base64",

    "snapshot.mode": "initial",

    "heartbeat.interval.ms": "10000",
    "slot.drop.on.stop": "false",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
#   "name": "postgres-source-connector",
#   "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
#   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }

# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
#   "id": 123,
#   "user_id": "user-42",
#   "amount": 99.99,
#   "__deleted": "false",     # "true" per DELETE events (con delete.handling.mode=rewrite)
#   "__op": "u",              # c=insert, u=update, d=delete, r=read (snapshot)
#   "__source_ts_ms": 1710928200000
# }

Debezium MySQL: Konfigurace Binlog

-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id         = 1
-- log_bin           = mysql-bin
-- binlog_format     = ROW
-- binlog_row_image  = FULL
-- expire_logs_days  = 10

-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "1",
    "topic.prefix": "cdc-mysql",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
    "schema.history.internal.kafka.topic": "__debezium-schema-history",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Transformace jedné zprávy (SMT)

Le Transformace jedné zprávy jsou to použitelné světelné transformace ke každému záznamu v potrubí Connect, aniž byste potřebovali Kafka Streams. Jsou řetězitelné (řetězec SMT) a velmi užitečné pro jednoduché případy, jako je přejmenování polí, přidání časových razítek, filtrovat záznamy podle hodnoty nebo směrování k různým tématům.

{
  "transforms": "renameField,addTimestamp,filterDeleted,routeByTable",

  "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameField.renames": "user_id:userId,order_id:orderId",

  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",

  "transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "transforms.filterDeleted.condition.name": "__deleted",
  "transforms.filterDeleted.negate": "true",

  "transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
  "transforms.routeByTable.replacement": "db.changes.$1"
}

Dřezový konektor: Od Kafky po S3

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "cdc.public.orders,cdc.public.users",

    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",

    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "rotate.schedule.interval.ms": "3600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",

    "s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",

    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Monitorování a správa chyb

# Metriche JMX importanti per Kafka Connect

# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura

# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali

# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml

# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo

# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .

# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart

# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume

Pozor: Replikační slot a místo na disku

Replikační slot PostgreSQL obsahuje všechny záznamy WAL tak dlouho jako konektor konzumoval je. Pokud konektor na hodiny nebo dny přejde do režimu offline, nahromadí se slot WAL neomezeně a může zaplnit disk PostgreSQL serveru, což vede k havárie databáze. Vždy si nastavte upozornění pg_replication_slots a zvážit nastavení max_slot_wal_keep_size (PostgreSQL 13+) jak ochranu.

Závěry

Kafka Connect s Debezium a standardním stackem pro Change Data Capture v roce 2026: zachycuje každou změnu z databáze s milisekundovou latencí, aniž by to ovlivnilo databázi výkon produkční databáze s odolností proti chybám a sémantikou přesně jednou. Kombinace s SMT umožňuje světelné transformace bez přidávání komponent oddělené od potrubí.

Kompletní série: Apache Kafka

  • článek 01 — Základy Apache Kafky
  • článek 02 — KRaft v Kafce 4.0
  • článek 03 — Pokročilý výrobce a spotřebitel
  • článek 04 — Sémantika přesně jednou u Kafky
  • článek 05 — Registr schémat: Avro a Protobuf
  • článek 06 — Kafka Streams: KTable a Windowing
  • Článek 07 (tento) — Kafka Connect: Source/Sink Connector a Debezium CDC
  • článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase