Kafka Connect: コード不要の統合

すべてのメッセージング システムは、外部システムとの間でデータを移動する必要があります。 データベース、オブジェクト ストレージ、Elasticsearch、Salesforce、レガシー システム。 Kafka Connect がなければ、 各統合にはカスタム コード (データベースから読み取るプロデューサー、コンシューマー) が必要です。 S3に書いている人。 Kafka Connect はコネクタ フレームワークを使用してこれを標準化します。 スケーラビリティ、フォールト トレランス、および 1 回だけを自動的に管理する構成が可能です。

2026 年には、利用可能なコネクタのカタログが 1000 コネクタ オープンソースと商用 (Confluent Hub) の間。最も一般的な使用例は次のとおりです。 Debezium 経由のリレーショナル データベースからの CDC (変更データ キャプチャ)、レプリケーション データレイクには S3/GCS、全文検索には Elasticsearch に同期します。

何を学ぶか

  • Kafka Connect アーキテクチャ: ワーカー、コネクタ、タスク、オフセット管理
  • 導入モード: スタンドアロンと分散
  • Debezium PostgreSQL ソース コネクタ: セットアップ、WAL 構成、スナップショット
  • Debezium MySQL ソース コネクタ: バイナリログ レプリケーション
  • シンクコネクタ: S3、Elasticsearch、JDBC
  • シングル メッセージ変換 (SMT): レコードのフィルター、名前変更、強化
  • モニタリング: 本番環境での JMX メトリクスとエラー管理

アーキテクチャ: ワーカー、コネクタ、タスク

# Componenti di Kafka Connect:

# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test

# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno

# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella

# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata

# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
#   image: confluentinc/cp-kafka-connect:7.6.0
#   environment:
#     CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
#     CONNECT_REST_PORT: 8083
#     CONNECT_GROUP_ID: "kafka-connect-cluster"
#     CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
#     CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
#     CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
#     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
#     # Plugin path: directory con i JAR dei connector
#     CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#   volumes:
#     - ./connect-plugins:/usr/share/confluent-hub-components

Debezium: PostgreSQL からの変更データ キャプチャ

デベジウム これは、Kafka で最も使用される CDC コネクタです。する代わりに データベースのポーリング (パフォーマンスに影響し、削除はキャプチャされません)、 Debezium は直接読み取ります 先行書き込みログ (WAL) PostgreSQLの: INSERT、UPDATE、DELETE ごとに、変更の前後のデータを含む Kafka イベントが生成されます。

CDC 用の PostgreSQL 構成

-- PostgreSQL: abilitare logical replication per Debezium

-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10

-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
  LOGIN
  PASSWORD 'secure_password_here'
  REPLICATION;  -- Necessario per leggere il WAL

-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;

-- 6. Verifica che logical replication sia abilitata
SHOW wal_level;  -- deve essere: logical

Debezium PostgreSQL コネクタの構成

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password_here",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.users,public.products",

    "topic.prefix": "cdc",

    "decimal.handling.mode": "double",
    "binary.handling.mode": "base64",

    "snapshot.mode": "initial",

    "heartbeat.interval.ms": "10000",
    "slot.drop.on.stop": "false",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
#   "name": "postgres-source-connector",
#   "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
#   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }

# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
#   "id": 123,
#   "user_id": "user-42",
#   "amount": 99.99,
#   "__deleted": "false",     # "true" per DELETE events (con delete.handling.mode=rewrite)
#   "__op": "u",              # c=insert, u=update, d=delete, r=read (snapshot)
#   "__source_ts_ms": 1710928200000
# }

Debezium MySQL: バイナリログ設定

-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id         = 1
-- log_bin           = mysql-bin
-- binlog_format     = ROW
-- binlog_row_image  = FULL
-- expire_logs_days  = 10

-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "1",
    "topic.prefix": "cdc-mysql",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
    "schema.history.internal.kafka.topic": "__debezium-schema-history",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

単一メッセージ変換 (SMT)

Le 単一メッセージの変換 それらは適用可能な光変換です Kafka ストリームを必要とせずに、Connect パイプライン内のすべてのレコードにアクセスできます。それらは連鎖可能です (SMT チェーン) で、フィールド名の変更、タイムスタンプの追加などの単純な場合に非常に役立ちます。 レコードを値でフィルタリングするか、別のトピックにルーティングします。

{
  "transforms": "renameField,addTimestamp,filterDeleted,routeByTable",

  "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameField.renames": "user_id:userId,order_id:orderId",

  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",

  "transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "transforms.filterDeleted.condition.name": "__deleted",
  "transforms.filterDeleted.negate": "true",

  "transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
  "transforms.routeByTable.replacement": "db.changes.$1"
}

シンクコネクタ: Kafka から S3 へ

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "cdc.public.orders,cdc.public.users",

    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",

    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "rotate.schedule.interval.ms": "3600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",

    "s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",

    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

監視とエラー管理

# Metriche JMX importanti per Kafka Connect

# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura

# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali

# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml

# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo

# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .

# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart

# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume

注意: レプリケーションスロットとディスク容量

PostgreSQL レプリケーション スロットは、コネクタが接続されている限り、すべての WAL レコードを保持します。 彼はそれらを消費した。コネクタが数時間または数日間オフラインになると、スロットが蓄積されます WAL は無期限に実行されるため、PostgreSQL サーバーのディスクがいっぱいになり、次のような問題が発生する可能性があります。 データベースのクラッシュ。常にアラートを設定する pg_replication_slots セットアップを検討してください max_slot_wal_keep_size (PostgreSQL 13+) 方法 保護。

結論

Kafka Connect と Debezium および 2026 年の変更データ キャプチャの標準スタック: データベースに影響を与えることなく、ミリ秒の遅延でデータベースからすべての変更をキャプチャします。 耐障害性と 1 回限りのセマンティクスを備えた運用データベースのパフォーマンス。 SMTとの組み合わせにより、コンポーネントを追加せずに光の変換が可能 パイプラインに分離されます。

完全なシリーズ: Apache Kafka

  • 第01条 — Apache Kafka の基礎
  • 第02条 — Kafka 4.0 の KRaft
  • 第03条 — 先進的なプロデューサーとコンシューマー
  • 第04条 — Kafka の 1 回限りのセマンティクス
  • 第05条 — スキーマ レジストリ: Avro および Protobuf
  • 第06条 — Kafka ストリーム: KTable とウィンドウ処理
  • 第07条(本) — Kafka Connect: ソース/シンク コネクタと Debezium CDC
  • 第08条 — Kafka + Apache Flink: リアルタイムのパイプライン分析