Kafka Connect: złącze Source/Sink, integracja Debezium CDC i DB
Kafka Connect umożliwia połączenie Kafki z bazami danych, systemami chmurowymi i SaaS bez napisz kod. Skoncentruj się na Debezium do przechwytywania danych zmian z PostgreSQL i MySQL, ze wzorem transformacji SMT.
Kafka Connect: integracja bez kodu
Każdy system przesyłania wiadomości musi przenosić dane do i z systemów zewnętrznych: bazy danych, pamięć obiektowa, Elasticsearch, Salesforce, starsze systemy. Bez Kafki Connect, każda integracja wymaga własnego kodu: producenta czytającego z bazy, konsumenta kto pisze na S3. Kafka Connect standaryzuje to za pomocą struktury łączników konfigurowalne, które automatycznie zarządzają skalowalnością, odpornością na awarie i dokładnie raz.
W roku 2026 katalog dostępnych złącz przekracza 1000 złączy pomiędzy oprogramowaniem open source a komercją (Confluent Hub). Najczęstsze przypadki użycia to: CDC (Change Data Capture) z relacyjnych baz danych poprzez Debezium, replikacja do S3/GCS dla jeziora danych, zsynchronizuj z Elasticsearch w celu wyszukiwania pełnotekstowego.
Czego się nauczysz
- Architektura Kafka Connect: zarządzanie procesem roboczym, łącznikiem, zadaniem, offsetem
- Tryb wdrożenia: samodzielny lub rozproszony
- Debezium PostgreSQL Source Connector: konfiguracja, konfiguracja WAL, migawka
- Łącznik źródłowy Debezium MySQL: replikacja binlogu
- Złącza zlewowe: S3, Elasticsearch, JDBC
- Transformacje pojedynczej wiadomości (SMT): filtruj, zmieniaj nazwę, wzbogacaj rekordy
- Monitoring: metryki JMX i zarządzanie błędami w produkcji
Architektura: proces roboczy, łącznik i zadanie
# Componenti di Kafka Connect:
# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test
# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno
# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella
# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata
# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
# image: confluentinc/cp-kafka-connect:7.6.0
# environment:
# CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
# CONNECT_REST_PORT: 8083
# CONNECT_GROUP_ID: "kafka-connect-cluster"
# CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
# CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
# CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
# # Plugin path: directory con i JAR dei connector
# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# volumes:
# - ./connect-plugins:/usr/share/confluent-hub-components
Debezium: Zmień przechwytywanie danych z PostgreSQL
Debezium Jest to najczęściej używane złącze CDC z Kafką. Zamiast robić odpytywanie bazy danych (co wpływa na wydajność i nie przechwytuje usunięć), Debezium czyta bezpośrednio Dziennik zapisu z wyprzedzeniem (WAL) PostgreSQL'a: każde INSERT, UPDATE i DELETE generuje zdarzenie Kafka z danymi przed i po modyfikacji.
Konfiguracja PostgreSQL dla CDC
-- PostgreSQL: abilitare logical replication per Debezium
-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10
-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
LOGIN
PASSWORD 'secure_password_here'
REPLICATION; -- Necessario per leggere il WAL
-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;
-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;
-- 6. Verifica che logical replication sia abilitata
SHOW wal_level; -- deve essere: logical
Konfiguracja łącznika Debezium PostgreSQL
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "secure_password_here",
"database.dbname": "ecommerce",
"database.server.name": "ecommerce",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.users,public.products",
"topic.prefix": "cdc",
"decimal.handling.mode": "double",
"binary.handling.mode": "base64",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"slot.drop.on.stop": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.retry.timeout": "60000",
"errors.retry.delay.max.ms": "5000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
# "name": "postgres-source-connector",
# "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
# "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }
# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
# "id": 123,
# "user_id": "user-42",
# "amount": 99.99,
# "__deleted": "false", # "true" per DELETE events (con delete.handling.mode=rewrite)
# "__op": "u", # c=insert, u=update, d=delete, r=read (snapshot)
# "__source_ts_ms": 1710928200000
# }
Debezium MySQL: Konfiguracja Binlog
-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.id": "1",
"topic.prefix": "cdc-mysql",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
"schema.history.internal.kafka.topic": "__debezium-schema-history",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
Transformacje pojedynczej wiadomości (SMT)
Le Transformacja pojedynczej wiadomości są to stosowane transformacje świetlne do każdego rekordu w potoku Connect, bez potrzeby korzystania ze strumieni Kafka. Można je łączyć łańcuchowo (łańcuch SMT) i bardzo przydatny w prostych przypadkach, takich jak zmiana nazw pól, dodawanie znaczników czasu, filtruj rekordy według wartości lub kierowania do różnych tematów.
{
"transforms": "renameField,addTimestamp,filterDeleted,routeByTable",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "user_id:userId,order_id:orderId",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",
"transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"transforms.filterDeleted.condition.name": "__deleted",
"transforms.filterDeleted.negate": "true",
"transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
"transforms.routeByTable.replacement": "db.changes.$1"
}
Złącze zlewu: od Kafki do S3
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "cdc.public.orders,cdc.public.users",
"s3.region": "eu-west-1",
"s3.bucket.name": "data-lake-raw",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Monitorowanie i zarządzanie błędami
# Metriche JMX importanti per Kafka Connect
# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura
# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali
# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml
# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo
# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .
# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart
# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart
# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume
Uwaga: miejsce na replikację i miejsce na dysku
Szczelina replikacji PostgreSQL przechowuje wszystkie rekordy WAL tak długo, jak łącznik
skonsumował je. Jeśli łącznik przejdzie w tryb offline na kilka godzin lub dni, miejsce będzie kumulowane
WAL w nieskończoność i może zapełnić dysk serwera PostgreSQL, co prowadzi do
awaria bazy danych. Zawsze ustawiaj alert na pg_replication_slots
i rozważ konfigurację max_slot_wal_keep_size (PostgreSQL 13+) jak
ochrona.
Wnioski
Kafka Connect z Debezium i standardowym stosem do przechwytywania danych zmian w 2026 r.: przechwytuje każdą zmianę z bazy danych z milisekundowym opóźnieniem, bez wpływu na bazę danych wydajność produkcyjnej bazy danych, odporność na błędy i semantykę „dokładnie raz”. Połączenie z SMT umożliwia transformację światła bez dodawania komponentów oddzielony od rurociągu.
Cała seria: Apache Kafka
- Artykuł 01 — Podstawy Apache Kafki
- Artykuł 02 — KRaft w Kafce 4.0
- Artykuł 03 — Zaawansowany producent i konsument
- Artykuł 04 — Semantyka dokładnie raz w Kafce
- Artykuł 05 — Rejestr schematów: Avro i Protobuf
- Artykuł 06 — Strumienie Kafki: KTable i Windowing
- Artykuł 07 (ten) — Kafka Connect: złącze Source/Sink i Debezium CDC
- Artykuł 08 — Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym







