Kafka Connect: ソース/シンク コネクタ、Debezium CDC、DB の統合
Kafka Connect を使用すると、Kafka をデータベース、クラウド システム、SaaS に接続する必要がなくなります。 コードを書きます。 PostgreSQL および MySQL からの変更データ キャプチャのための Debezium に焦点を当てる SMT 変換パターンを使用します。
Kafka Connect: コード不要の統合
すべてのメッセージング システムは、外部システムとの間でデータを移動する必要があります。 データベース、オブジェクト ストレージ、Elasticsearch、Salesforce、レガシー システム。 Kafka Connect がなければ、 各統合にはカスタム コード (データベースから読み取るプロデューサー、コンシューマー) が必要です。 S3に書いている人。 Kafka Connect はコネクタ フレームワークを使用してこれを標準化します。 スケーラビリティ、フォールト トレランス、および 1 回だけを自動的に管理する構成が可能です。
2026 年には、利用可能なコネクタのカタログが 1000 コネクタ オープンソースと商用 (Confluent Hub) の間。最も一般的な使用例は次のとおりです。 Debezium 経由のリレーショナル データベースからの CDC (変更データ キャプチャ)、レプリケーション データレイクには S3/GCS、全文検索には Elasticsearch に同期します。
何を学ぶか
- Kafka Connect アーキテクチャ: ワーカー、コネクタ、タスク、オフセット管理
- 導入モード: スタンドアロンと分散
- Debezium PostgreSQL ソース コネクタ: セットアップ、WAL 構成、スナップショット
- Debezium MySQL ソース コネクタ: バイナリログ レプリケーション
- シンクコネクタ: S3、Elasticsearch、JDBC
- シングル メッセージ変換 (SMT): レコードのフィルター、名前変更、強化
- モニタリング: 本番環境での JMX メトリクスとエラー管理
アーキテクチャ: ワーカー、コネクタ、タスク
# Componenti di Kafka Connect:
# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test
# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno
# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella
# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata
# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
# image: confluentinc/cp-kafka-connect:7.6.0
# environment:
# CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
# CONNECT_REST_PORT: 8083
# CONNECT_GROUP_ID: "kafka-connect-cluster"
# CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
# CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
# CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
# # Plugin path: directory con i JAR dei connector
# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# volumes:
# - ./connect-plugins:/usr/share/confluent-hub-components
Debezium: PostgreSQL からの変更データ キャプチャ
デベジウム これは、Kafka で最も使用される CDC コネクタです。する代わりに データベースのポーリング (パフォーマンスに影響し、削除はキャプチャされません)、 Debezium は直接読み取ります 先行書き込みログ (WAL) PostgreSQLの: INSERT、UPDATE、DELETE ごとに、変更の前後のデータを含む Kafka イベントが生成されます。
CDC 用の PostgreSQL 構成
-- PostgreSQL: abilitare logical replication per Debezium
-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10
-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
LOGIN
PASSWORD 'secure_password_here'
REPLICATION; -- Necessario per leggere il WAL
-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;
-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;
-- 6. Verifica che logical replication sia abilitata
SHOW wal_level; -- deve essere: logical
Debezium PostgreSQL コネクタの構成
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "secure_password_here",
"database.dbname": "ecommerce",
"database.server.name": "ecommerce",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.users,public.products",
"topic.prefix": "cdc",
"decimal.handling.mode": "double",
"binary.handling.mode": "base64",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"slot.drop.on.stop": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.retry.timeout": "60000",
"errors.retry.delay.max.ms": "5000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
# "name": "postgres-source-connector",
# "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
# "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }
# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
# "id": 123,
# "user_id": "user-42",
# "amount": 99.99,
# "__deleted": "false", # "true" per DELETE events (con delete.handling.mode=rewrite)
# "__op": "u", # c=insert, u=update, d=delete, r=read (snapshot)
# "__source_ts_ms": 1710928200000
# }
Debezium MySQL: バイナリログ設定
-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.id": "1",
"topic.prefix": "cdc-mysql",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
"schema.history.internal.kafka.topic": "__debezium-schema-history",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
単一メッセージ変換 (SMT)
Le 単一メッセージの変換 それらは適用可能な光変換です Kafka ストリームを必要とせずに、Connect パイプライン内のすべてのレコードにアクセスできます。それらは連鎖可能です (SMT チェーン) で、フィールド名の変更、タイムスタンプの追加などの単純な場合に非常に役立ちます。 レコードを値でフィルタリングするか、別のトピックにルーティングします。
{
"transforms": "renameField,addTimestamp,filterDeleted,routeByTable",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "user_id:userId,order_id:orderId",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",
"transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"transforms.filterDeleted.condition.name": "__deleted",
"transforms.filterDeleted.negate": "true",
"transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
"transforms.routeByTable.replacement": "db.changes.$1"
}
シンクコネクタ: Kafka から S3 へ
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "cdc.public.orders,cdc.public.users",
"s3.region": "eu-west-1",
"s3.bucket.name": "data-lake-raw",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
監視とエラー管理
# Metriche JMX importanti per Kafka Connect
# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura
# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali
# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml
# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo
# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .
# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart
# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart
# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume
注意: レプリケーションスロットとディスク容量
PostgreSQL レプリケーション スロットは、コネクタが接続されている限り、すべての WAL レコードを保持します。
彼はそれらを消費した。コネクタが数時間または数日間オフラインになると、スロットが蓄積されます
WAL は無期限に実行されるため、PostgreSQL サーバーのディスクがいっぱいになり、次のような問題が発生する可能性があります。
データベースのクラッシュ。常にアラートを設定する pg_replication_slots
セットアップを検討してください max_slot_wal_keep_size (PostgreSQL 13+) 方法
保護。
結論
Kafka Connect と Debezium および 2026 年の変更データ キャプチャの標準スタック: データベースに影響を与えることなく、ミリ秒の遅延でデータベースからすべての変更をキャプチャします。 耐障害性と 1 回限りのセマンティクスを備えた運用データベースのパフォーマンス。 SMTとの組み合わせにより、コンポーネントを追加せずに光の変換が可能 パイプラインに分離されます。







