Kafka Connect: Source/Sink Connectors, Debezium CDC, and Database Integration
With Kafka Connect you can connect Kafka to databases, cloud systems, and SaaS applications without writing code. The focus is on Debezium for Change Data Capture from PostgreSQL and MySQL, plus transformation patterns with SMTs.
Kafka Connect: code-free integration
Every messaging system has to move data to and from external systems: databases, object storage, Elasticsearch, Salesforce, legacy systems. Without Kafka Connect, each integration requires custom code: a producer that reads from the database, a consumer that writes to S3. Kafka Connect standardizes this with a configurable connector framework that handles scaling, fault tolerance, and delivery guarantees automatically.
In 2026 the catalog of available connectors exceeds 1,000, spanning open-source and commercial offerings (Confluent Hub). The most common use cases are: CDC (Change Data Capture) from relational databases via Debezium, replication to S3/GCS for the data lake, and synchronization with Elasticsearch for full-text search.
What you will learn
- Kafka Connect architecture: workers, connectors, tasks, offset management
- Deployment modes: standalone versus distributed (see the command sketch after this list)
- Debezium PostgreSQL source connector: setup, WAL configuration, snapshots
- Debezium MySQL source connector: binlog replication
- Sink connectors: S3, Elasticsearch, JDBC
- Single Message Transforms (SMT): filtering, renaming, enriching records
- Monitoring: JMX metrics and error handling in production
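The two deployment modes differ mainly in how a worker is started and where connector configurations live. A minimal sketch of both, assuming a local Apache Kafka installation under $KAFKA_HOME and the sample property files shipped with it:

# Standalone mode: a single worker; connector configs are local .properties files
$KAFKA_HOME/bin/connect-standalone.sh \
  $KAFKA_HOME/config/connect-standalone.properties \
  my-source-connector.properties   # hypothetical connector config file
# Distributed mode: each worker only receives the worker config;
# connectors are created afterwards via the REST API (port 8083 by default)
$KAFKA_HOME/bin/connect-distributed.sh \
  $KAFKA_HOME/config/connect-distributed.properties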
Architecture: workers, connectors, and tasks
# Kafka Connect components:
# WORKER: JVM process that runs the connectors
# Distributed mode: multiple workers form a cluster (recommended for production)
# Standalone mode: a single worker, useful for development/testing
# CONNECTOR: logical unit of integration (e.g. "connect the PostgreSQL orders table to Kafka")
# Each connector creates 1 or more TASKS
# Source connector: reads from an external system -> writes to Kafka
# Sink connector: reads from Kafka -> writes to an external system
# TASK: the actual unit of parallelism (thread)
# Each task is responsible for a subset of the data
# Example: a connector with 4 tasks for a PostgreSQL database with 4 tables = 1 task per table
# OFFSET: tracks the position in the source (WAL LSN for Postgres, binlog position for MySQL)
# Stored in an internal topic: __connect-offsets
# Guarantees that after a restart, replication resumes from where it left off
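Since offsets are ordinary records on an internal topic, they can be inspected directly with the console consumer. A quick check, assuming the default topic name __connect-offsets configured below; since Kafka 3.5 the same information is also exposed via GET /connectors/{name}/offsets:

# Each key identifies a connector and source partition, each value the last committed position
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic __connect-offsets \
  --from-beginning \
  --property print.key=true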
# Deploying the Kafka Connect cluster (fragment of docker-compose.yml, under "services:"):
kafka-connect:
  image: confluentinc/cp-kafka-connect:7.6.0
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
    CONNECT_REST_PORT: 8083
    # Hostname advertised to other workers, required by the cp-kafka-connect image
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
    CONNECT_GROUP_ID: "kafka-connect-cluster"
    CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
    CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
    CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
    # Worker-level default converters; individual connectors override these (e.g. with Avro)
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    # Plugin path: directories containing the connector JARs
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
  volumes:
    - ./connect-plugins:/usr/share/confluent-hub-components
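Once a worker is up, the REST interface on port 8083 confirms that the cluster is healthy and that the connector plugins were picked up from the plugin path. A minimal smoke test, assuming the service is reachable as kafka-connect:

# Worker version and the Kafka cluster it is connected to
curl -s http://kafka-connect:8083/ | jq .
# Connector plugins found on the plugin path (Debezium must appear here)
curl -s http://kafka-connect:8083/connector-plugins | jq '.[].class'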
Debezium: Change Data Capture from PostgreSQL
Debezium is the most widely used CDC connector for Kafka. Instead of polling the database (which impacts performance and misses deletes), Debezium reads directly from PostgreSQL's Write-Ahead Log (WAL): every INSERT, UPDATE, and DELETE generates a Kafka event containing the data before and after the change.
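Before the unwrap transform introduced later, every change arrives as a full Debezium envelope that carries the row state before and after the change. An illustrative (abridged) event for an UPDATE on the orders table, with field values invented for the example:

# Topic: cdc.public.orders (raw Debezium envelope, without ExtractNewRecordState)
# {
#   "before": {"id": 123, "user_id": "user-42", "amount": 89.99},
#   "after":  {"id": 123, "user_id": "user-42", "amount": 99.99},
#   "source": {"connector": "postgresql", "db": "ecommerce", "table": "orders", "lsn": 23456789},
#   "op": "u",               # c=create, u=update, d=delete, r=read (snapshot)
#   "ts_ms": 1710928200000   # when the connector processed the event
# }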
PostgreSQL configuration for CDC
-- PostgreSQL: enable logical replication for Debezium
-- 1. postgresql.conf: change wal_level (requires a restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10
-- 2. Create a dedicated user for Debezium with the minimum required privileges
CREATE ROLE debezium_user WITH
LOGIN
PASSWORD 'secure_password_here'
REPLICATION; -- Required to read the WAL
-- 3. Grant SELECT on the tables to replicate
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
-- 4. Schema usage permission, needed to read the publication (required by Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;
-- 5. Create a publication (Debezium 2.x uses pgoutput as the logical decoding plugin)
-- Debezium can create it automatically, but you can also create it manually:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;
-- 6. Verify that logical replication is enabled
SHOW wal_level; -- must return: logical
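Once the connector has been registered (next section), the PostgreSQL side can be verified as well: the publication should list the right tables and the replication slot should be active. A quick check with psql; the slot name matches slot.name in the connector config and only exists after the connector has started:

# Tables included in the publication
psql -U postgres -d ecommerce -c "SELECT * FROM pg_publication_tables WHERE pubname = 'debezium_pub';"
# Replication slot created by Debezium (active = true while the connector is running)
psql -U postgres -d ecommerce -c "SELECT slot_name, plugin, active FROM pg_replication_slots WHERE slot_name = 'debezium_slot';"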
Debezium PostgreSQL connector configuration
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "secure_password_here",
"database.dbname": "ecommerce",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.users,public.products",
"topic.prefix": "cdc",
"decimal.handling.mode": "double",
"binary.handling.mode": "base64",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"slot.drop.on.stop": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.retry.timeout": "60000",
"errors.retry.delay.max.ms": "5000",
"errors.tolerance": "all"
}
}
# Register the connector via the REST API
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Check the connector status
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
# "name": "postgres-source-connector",
# "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
# "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }
# Structure of the resulting CDC message (Debezium with Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (with the ExtractNewRecordState unwrap transform):
# {
# "id": 123,
# "user_id": "user-42",
# "amount": 99.99,
# "__deleted": "false", # "true" for DELETE events (with delete.handling.mode=rewrite)
# "__op": "u", # c=insert, u=update, d=delete, r=read (snapshot)
# "__source_ts_ms": 1710928200000
# }
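To inspect these events straight from the topic, the Avro console consumer shipped with the Schema Registry tooling can deserialize them against the registry. An illustrative invocation, assuming the hostnames used in the connector configuration:

# Read the CDC events for the orders table, printing the keys as well
kafka-avro-console-consumer \
  --bootstrap-server kafka-1:9092 \
  --topic cdc.public.orders \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081 \
  --property print.key=true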
Debezium MySQL: binlog configuration
-- MySQL: enable the binlog in ROW format
-- Add to /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- Create the Debezium MySQL user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
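Before registering the MySQL connector it is worth confirming that the binlog settings are actually in effect, because Debezium fails at startup otherwise. A quick check from the shell, assuming the debezium user created above:

# The binlog must be enabled, in ROW format, with FULL row image
mysql -h mysql-host -u debezium -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image');"
# Current binlog file and position (where streaming starts after the initial snapshot)
mysql -h mysql-host -u debezium -p -e "SHOW MASTER STATUS;"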
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.id": "1",
"topic.prefix": "cdc-mysql",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
"schema.history.internal.kafka.topic": "__debezium-schema-history",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
Single Message Transforms (SMT)
Single Message Transforms are lightweight transformations applied to each record in the Connect pipeline, without the need for Kafka Streams. They can be chained (SMT chains) and are very useful for simple cases such as renaming fields, adding timestamps, filtering records by value, or routing to different topics.
{
"transforms": "renameField,addTimestamp,filterDeleted,routeByTable",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "user_id:userId,order_id:orderId",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",
"transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterDeleted.predicate": "isDeleted",
"transforms.filterDeleted.negate": "true",
"transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
"transforms.routeByTable.replacement": "db.changes.$1",
"predicates": "isDeleted",
"predicates.isDeleted.type": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"predicates.isDeleted.name": "__deleted"
}
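SMT chains are easy to get subtly wrong (a typo in a class name only surfaces at runtime), so it pays to validate the merged connector configuration before creating it. A sketch against the Connect validation endpoint; it expects the flat config map including connector.class, and postgres-source-connector.json is the file from the earlier section:

# Validate the connector + SMT configuration without creating the connector
jq .config postgres-source-connector.json | curl -s -X PUT \
  -H "Content-Type: application/json" -d @- \
  http://kafka-connect:8083/connector-plugins/io.debezium.connector.postgresql.PostgresConnector/config/validate \
  | jq '.error_count'
# error_count must be 0 before POSTing the connector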
Sink connectors: from Kafka to S3
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "cdc.public.orders,cdc.public.users",
"s3.region": "eu-west-1",
"s3.bucket.name": "data-lake-raw",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.s3-sink-connector",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
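With this partitioner the connector writes Parquet files under a time-based prefix, one file per topic partition per flush. An illustrative listing of the resulting layout, assuming the default topics.dir of "topics" and the usual topic+partition+startOffset file naming:

aws s3 ls --recursive s3://data-lake-raw/topics/cdc.public.orders/year=2026/month=01/day=15/
# .../hour=10/cdc.public.orders+0+0000012000.snappy.parquet
# .../hour=10/cdc.public.orders+1+0000011800.snappy.parquet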
Monitoring and error handling
# Important JMX metrics for Kafka Connect
# kafka.connect:type=source-task-metrics,connector="{connector-name}",task="{task-id}"
# -> source-record-poll-total: records read from the source
# -> source-record-poll-rate: read rate (records/sec)
# -> source-record-write-total: records written to Kafka
# -> source-record-write-rate: write rate
# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % of successful offset commits
# -> batch-size-avg: average batch size
# kafka.connect:type=task-error-metrics
# -> total-record-errors: total number of record errors
# Prometheus scraping via the JMX Exporter
# jmx_prometheus_javaagent.jar configured with kafka-connect-rules.yml
# Check the replication lag for Debezium
# Compare the WAL timestamp with the event timestamp on Kafka
# If there is lag, check:
# - Replication slot lag on PostgreSQL:
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# If lag_bytes > 1GB: the connector cannot keep up
# List connectors and their status
curl http://kafka-connect:8083/connectors?expand=status | jq .
# Restart a connector (after a failure)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart
# Restart a specific task only
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart
# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume
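The same endpoints can be combined into a small watchdog that restarts failed tasks, a common stop-gap while the root cause is investigated. A minimal sketch using only the endpoints shown above; the connector name and worker URL are the ones from this article:

#!/usr/bin/env bash
# Restart every FAILED task of a connector; run it from cron or as a sidecar
CONNECT_URL="http://kafka-connect:8083"
CONNECTOR="postgres-source-connector"
failed_tasks=$(curl -s "$CONNECT_URL/connectors/$CONNECTOR/status" \
  | jq -r '.tasks[] | select(.state == "FAILED") | .id')
for task_id in $failed_tasks; do
  echo "Restarting task $task_id of $CONNECTOR"
  curl -s -X POST "$CONNECT_URL/connectors/$CONNECTOR/tasks/$task_id/restart"
done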
Warning: replication slots and disk space
A PostgreSQL replication slot retains all WAL segments until the connector has consumed them. If the connector stays offline for hours or days, the slot accumulates WAL indefinitely and can fill the disk of the PostgreSQL server, leading to a database crash. Always set up an alert on pg_replication_slots and consider configuring max_slot_wal_keep_size (PostgreSQL 13+) as a safeguard.
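A sketch of that safeguard with psql, assuming superuser access; the 50GB limit is an arbitrary example, size it to your disk budget:

# Cap the WAL a replication slot may retain (PostgreSQL 13+); a config reload is sufficient
psql -U postgres -d ecommerce -c "ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';"
psql -U postgres -d ecommerce -c "SELECT pg_reload_conf();"
# Slots that exceeded the limit show wal_status = 'lost' in pg_replication_slots
psql -U postgres -d ecommerce -c "SELECT slot_name, wal_status, safe_wal_size FROM pg_replication_slots;"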
Conclusions
Kafka Connect with Debezium is the standard stack for Change Data Capture in 2026: it captures every database change with millisecond latency, without impacting the performance of the production database, and with fault tolerance and strong delivery guarantees. Combined with SMTs, it enables lightweight transformations without adding separate components to the pipeline.
The complete series: Apache Kafka
- Article 01 – Apache Kafka Fundamentals
- Article 02 – KRaft in Kafka 4.0
- Article 03 – Advanced Producers and Consumers
- Article 04 – Exactly-Once Semantics in Kafka
- Article 05 – Schema Registry: Avro and Protobuf
- Article 06 – Kafka Streams: KTable and Windowing
- Article 07 (this article) – Kafka Connect: Source/Sink Connectors and Debezium CDC
- Article 08 – Kafka + Apache Flink: Real-Time Pipeline Analytics