Kafka Connect: integracja bez kodu

Każdy system przesyłania wiadomości musi przenosić dane do i z systemów zewnętrznych: bazy danych, pamięć obiektowa, Elasticsearch, Salesforce, starsze systemy. Bez Kafki Connect, każda integracja wymaga własnego kodu: producenta czytającego z bazy, konsumenta kto pisze na S3. Kafka Connect standaryzuje to za pomocą struktury łączników konfigurowalne, które automatycznie zarządzają skalowalnością, odpornością na awarie i dokładnie raz.

W roku 2026 katalog dostępnych złącz przekracza 1000 złączy pomiędzy oprogramowaniem open source a komercją (Confluent Hub). Najczęstsze przypadki użycia to: CDC (Change Data Capture) z relacyjnych baz danych poprzez Debezium, replikacja do S3/GCS dla jeziora danych, zsynchronizuj z Elasticsearch w celu wyszukiwania pełnotekstowego.

Czego się nauczysz

  • Architektura Kafka Connect: zarządzanie procesem roboczym, łącznikiem, zadaniem, offsetem
  • Tryb wdrożenia: samodzielny lub rozproszony
  • Debezium PostgreSQL Source Connector: konfiguracja, konfiguracja WAL, migawka
  • Łącznik źródłowy Debezium MySQL: replikacja binlogu
  • Złącza zlewowe: S3, Elasticsearch, JDBC
  • Transformacje pojedynczej wiadomości (SMT): filtruj, zmieniaj nazwę, wzbogacaj rekordy
  • Monitoring: metryki JMX i zarządzanie błędami w produkcji

Architektura: proces roboczy, łącznik i zadanie

# Componenti di Kafka Connect:

# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test

# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno

# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella

# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata

# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
#   image: confluentinc/cp-kafka-connect:7.6.0
#   environment:
#     CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
#     CONNECT_REST_PORT: 8083
#     CONNECT_GROUP_ID: "kafka-connect-cluster"
#     CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
#     CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
#     CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
#     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
#     # Plugin path: directory con i JAR dei connector
#     CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#   volumes:
#     - ./connect-plugins:/usr/share/confluent-hub-components

Debezium: Zmień przechwytywanie danych z PostgreSQL

Debezium Jest to najczęściej używane złącze CDC z Kafką. Zamiast robić odpytywanie bazy danych (co wpływa na wydajność i nie przechwytuje usunięć), Debezium czyta bezpośrednio Dziennik zapisu z wyprzedzeniem (WAL) PostgreSQL'a: każde INSERT, UPDATE i DELETE generuje zdarzenie Kafka z danymi przed i po modyfikacji.

Konfiguracja PostgreSQL dla CDC

-- PostgreSQL: abilitare logical replication per Debezium

-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10

-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
  LOGIN
  PASSWORD 'secure_password_here'
  REPLICATION;  -- Necessario per leggere il WAL

-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;

-- 6. Verifica che logical replication sia abilitata
SHOW wal_level;  -- deve essere: logical

Konfiguracja łącznika Debezium PostgreSQL

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password_here",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.users,public.products",

    "topic.prefix": "cdc",

    "decimal.handling.mode": "double",
    "binary.handling.mode": "base64",

    "snapshot.mode": "initial",

    "heartbeat.interval.ms": "10000",
    "slot.drop.on.stop": "false",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
#   "name": "postgres-source-connector",
#   "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
#   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }

# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
#   "id": 123,
#   "user_id": "user-42",
#   "amount": 99.99,
#   "__deleted": "false",     # "true" per DELETE events (con delete.handling.mode=rewrite)
#   "__op": "u",              # c=insert, u=update, d=delete, r=read (snapshot)
#   "__source_ts_ms": 1710928200000
# }

Debezium MySQL: Konfiguracja Binlog

-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id         = 1
-- log_bin           = mysql-bin
-- binlog_format     = ROW
-- binlog_row_image  = FULL
-- expire_logs_days  = 10

-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "1",
    "topic.prefix": "cdc-mysql",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
    "schema.history.internal.kafka.topic": "__debezium-schema-history",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Transformacje pojedynczej wiadomości (SMT)

Le Transformacja pojedynczej wiadomości są to stosowane transformacje świetlne do każdego rekordu w potoku Connect, bez potrzeby korzystania ze strumieni Kafka. Można je łączyć łańcuchowo (łańcuch SMT) i bardzo przydatny w prostych przypadkach, takich jak zmiana nazw pól, dodawanie znaczników czasu, filtruj rekordy według wartości lub kierowania do różnych tematów.

{
  "transforms": "renameField,addTimestamp,filterDeleted,routeByTable",

  "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameField.renames": "user_id:userId,order_id:orderId",

  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",

  "transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "transforms.filterDeleted.condition.name": "__deleted",
  "transforms.filterDeleted.negate": "true",

  "transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
  "transforms.routeByTable.replacement": "db.changes.$1"
}

Złącze zlewu: od Kafki do S3

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "cdc.public.orders,cdc.public.users",

    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",

    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "rotate.schedule.interval.ms": "3600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",

    "s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",

    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Monitorowanie i zarządzanie błędami

# Metriche JMX importanti per Kafka Connect

# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura

# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali

# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml

# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo

# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .

# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart

# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume

Uwaga: miejsce na replikację i miejsce na dysku

Szczelina replikacji PostgreSQL przechowuje wszystkie rekordy WAL tak długo, jak łącznik skonsumował je. Jeśli łącznik przejdzie w tryb offline na kilka godzin lub dni, miejsce będzie kumulowane WAL w nieskończoność i może zapełnić dysk serwera PostgreSQL, co prowadzi do awaria bazy danych. Zawsze ustawiaj alert na pg_replication_slots i rozważ konfigurację max_slot_wal_keep_size (PostgreSQL 13+) jak ochrona.

Wnioski

Kafka Connect z Debezium i standardowym stosem do przechwytywania danych zmian w 2026 r.: przechwytuje każdą zmianę z bazy danych z milisekundowym opóźnieniem, bez wpływu na bazę danych wydajność produkcyjnej bazy danych, odporność na błędy i semantykę „dokładnie raz”. Połączenie z SMT umożliwia transformację światła bez dodawania komponentów oddzielony od rurociągu.

Cała seria: Apache Kafka

  • Artykuł 01 — Podstawy Apache Kafki
  • Artykuł 02 — KRaft w Kafce 4.0
  • Artykuł 03 — Zaawansowany producent i konsument
  • Artykuł 04 — Semantyka dokładnie raz w Kafce
  • Artykuł 05 — Rejestr schematów: Avro i Protobuf
  • Artykuł 06 — Strumienie Kafki: KTable i Windowing
  • Artykuł 07 (ten) — Kafka Connect: złącze Source/Sink i Debezium CDC
  • Artykuł 08 — Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym