Kafka Connect: Kodsuz Entegrasyon

Her mesajlaşma sisteminin verileri harici sistemlere ve sistemlerden taşıması gerekir: veritabanları, nesne depolama, Elasticsearch, Salesforce, eski sistemler. Kafka Connect olmadan, her entegrasyon özel kod gerektirir: veritabanından okuyan bir üretici, bir tüketici S3'te kim yazıyor? Kafka Connect bunu bir bağlayıcı çerçevesiyle standartlaştırıyor ölçeklenebilirliği, hata toleransını ve tam olarak bir kez otomatik olarak yöneten yapılandırılabilir.

2026'da mevcut konnektörlerin kataloğu şunu aşıyor: 1000 konnektör açık kaynak ve ticari (Confluent Hub) arasında. En yaygın kullanım durumları şunlardır: Debezium aracılığıyla ilişkisel veritabanlarından CDC (Veri Yakalamayı Değiştir), çoğaltma Veri gölü için S3/GCS, tam metin araması için Elasticsearch ile senkronize edin.

Ne Öğreneceksiniz

  • Kafka Connect mimarisi: çalışan, bağlayıcı, görev, ofset yönetimi
  • Dağıtım modu: bağımsız ve dağıtılmış
  • Debezium PostgreSQL Kaynak Bağlayıcısı: kurulum, WAL yapılandırması, anlık görüntü
  • Debezium MySQL Kaynak Bağlayıcısı: binlog çoğaltması
  • Lavabo Konektörleri: S3, Elasticsearch, JDBC
  • Tek Mesaj Dönüşümleri (SMT): kayıtları filtreleyin, yeniden adlandırın, zenginleştirin
  • İzleme: Üretimde JMX ölçümleri ve hata yönetimi

Mimari: Çalışan, Bağlayıcı ve Görev

# Componenti di Kafka Connect:

# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test

# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno

# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella

# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata

# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
#   image: confluentinc/cp-kafka-connect:7.6.0
#   environment:
#     CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
#     CONNECT_REST_PORT: 8083
#     CONNECT_GROUP_ID: "kafka-connect-cluster"
#     CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
#     CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
#     CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
#     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
#     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
#     # Plugin path: directory con i JAR dei connector
#     CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#   volumes:
#     - ./connect-plugins:/usr/share/confluent-hub-components

Debezium: PostgreSQL'den Veri Yakalamayı Değiştirin

Debezyum Kafka ile en çok kullanılan CDC konektörüdür. Yapmak yerine veritabanı yoklaması (performansı etkiler ve silme işlemlerini yakalamaz), Debezium doğrudan okur Yazma Öncesi Günlüğü (WAL) PostgreSQL'in: her INSERT, UPDATE ve DELETE, değişiklikten önceki ve sonraki verileri içeren bir Kafka olayı oluşturur.

CDC için PostgreSQL yapılandırması

-- PostgreSQL: abilitare logical replication per Debezium

-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10

-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
  LOGIN
  PASSWORD 'secure_password_here'
  REPLICATION;  -- Necessario per leggere il WAL

-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;

-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;

-- 6. Verifica che logical replication sia abilitata
SHOW wal_level;  -- deve essere: logical

Debezium PostgreSQL Bağlayıcı yapılandırması

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password_here",
    "database.dbname": "ecommerce",
    "database.server.name": "ecommerce",

    "plugin.name": "pgoutput",
    "publication.name": "debezium_pub",
    "slot.name": "debezium_slot",

    "table.include.list": "public.orders,public.users,public.products",

    "topic.prefix": "cdc",

    "decimal.handling.mode": "double",
    "binary.handling.mode": "base64",

    "snapshot.mode": "initial",

    "heartbeat.interval.ms": "10000",
    "slot.drop.on.stop": "false",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.retry.timeout": "60000",
    "errors.retry.delay.max.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-source-connector.json

# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
#   "name": "postgres-source-connector",
#   "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
#   "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }

# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
#   "id": 123,
#   "user_id": "user-42",
#   "amount": 99.99,
#   "__deleted": "false",     # "true" per DELETE events (con delete.handling.mode=rewrite)
#   "__op": "u",              # c=insert, u=update, d=delete, r=read (snapshot)
#   "__source_ts_ms": 1710928200000
# }

Debezium MySQL: Binlog Yapılandırması

-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id         = 1
-- log_bin           = mysql-bin
-- binlog_format     = ROW
-- binlog_row_image  = FULL
-- expire_logs_days  = 10

-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz_password",
    "database.server.id": "1",
    "topic.prefix": "cdc-mysql",
    "database.include.list": "shop",
    "table.include.list": "shop.orders,shop.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
    "schema.history.internal.kafka.topic": "__debezium-schema-history",
    "snapshot.mode": "initial",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Tek Mesaj Dönüşümleri (SMT)

Le Tek Mesaj Dönüşümleri bunlar uygulanabilir ışık dönüşümleridir Kafka Akışlarına ihtiyaç duymadan Connect işlem hattındaki her kayda. Zincirlenebilirler (SMT zinciri) ve alanları yeniden adlandırma, zaman damgaları ekleme gibi basit durumlar için çok kullanışlıdır. Kayıtları değere veya farklı konulara yönlendirmeye göre filtreleyin.

{
  "transforms": "renameField,addTimestamp,filterDeleted,routeByTable",

  "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameField.renames": "user_id:userId,order_id:orderId",

  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",

  "transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
  "transforms.filterDeleted.condition.name": "__deleted",
  "transforms.filterDeleted.negate": "true",

  "transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
  "transforms.routeByTable.replacement": "db.changes.$1"
}

Lavabo Konektörü: Kafka'dan S3'e

{
  "name": "s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "cdc.public.orders,cdc.public.users",

    "s3.region": "eu-west-1",
    "s3.bucket.name": "data-lake-raw",
    "s3.part.size": "5242880",

    "flush.size": "10000",
    "rotate.interval.ms": "600000",
    "rotate.schedule.interval.ms": "3600000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "schema.compatibility": "FULL",

    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "created_at",

    "s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",

    "value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

İzleme ve Hata Yönetimi

# Metriche JMX importanti per Kafka Connect

# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura

# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali

# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml

# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo

# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .

# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart

# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart

# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume

Dikkat: Çoğaltma Yuvası ve Disk Alanı

PostgreSQL çoğaltma yuvası, bağlayıcı sürece tüm WAL kayıtlarını tutar. onları tüketti. Konektör saatlerce veya günlerce çevrimdışı kalırsa yuvada birikme olur WAL süresiz olarak kullanılabilir ve PostgreSQL sunucusunun diskini doldurabilir. bir veritabanı çökmesi. Her zaman şu tarihte bir uyarı ayarla: pg_replication_slots ve kurmayı düşünün max_slot_wal_keep_size (PostgreSQL 13+) nasıl koruma.

Sonuçlar

Debezium ile Kafka Connect ve 2026'da Veri Yakalama Değişimi için standart yığın: veritabanındaki her değişikliği, veritabanını etkilemeden milisaniyelik bir gecikmeyle yakalar Hata toleransı ve tam olarak bir kez semantiği ile üretim veritabanı performansı. SMT ile kombinasyon, bileşen eklemeden hafif dönüşümlere olanak tanır boru hattına ayrılmıştır.

Serinin Tamamı: Apache Kafka

  • Madde 01 — Apache Kafka'nın Temelleri
  • Madde 02 — Kafka 4.0'da KRaft
  • Madde 03 — Gelişmiş Üretici ve Tüketici
  • Madde 04 — Kafka'da Tam Bir Kez Semantik
  • Madde 05 — Şema Kaydı: Avro ve Protobuf
  • Madde 06 — Kafka Akışları: KTable ve Pencereleme
  • Madde 07 (bu) — Kafka Connect: Kaynak/Hava Konektörü ve Debezium CDC
  • Madde 08 — Kafka + Apache Flink: Gerçek Zamanlı İşlem Hattı Analitiği