Kafka Connect: 소스/싱크 커넥터, Debezium CDC 및 DB 통합
Kafka Connect를 사용하면 Kafka를 데이터베이스, 클라우드 시스템 및 SaaS에 연결하지 않고도 연결할 수 있습니다. 코드를 작성하세요. PostgreSQL 및 MySQL의 변경 데이터 캡처를 위해 Debezium에 중점을 두고 있습니다. SMT 변환 패턴을 사용합니다.
Kafka Connect: 코드 없는 통합
모든 메시징 시스템은 외부 시스템과 데이터를 주고 받아야 합니다. 데이터베이스, 객체 스토리지, Elasticsearch, Salesforce, 레거시 시스템. Kafka Connect가 없으면 각 통합에는 사용자 정의 코드가 필요합니다. 즉, 데이터베이스에서 읽는 생산자, 소비자 S3에 글을 쓰는 사람. Kafka Connect는 커넥터 프레임워크를 사용하여 이를 표준화합니다. 확장성, 내결함성 및 정확히 한 번만 자동으로 관리하도록 구성 가능합니다.
2026년에는 사용 가능한 커넥터 카탈로그가 다음을 초과합니다. 커넥터 1000개 오픈 소스와 상용 사이(Confluent Hub) 가장 일반적인 사용 사례는 다음과 같습니다. Debezium을 통한 관계형 데이터베이스의 CDC(Change Data Capture), 복제 데이터 레이크용 S3/GCS, 전체 텍스트 검색을 위해 Elasticsearch와 동기화합니다.
무엇을 배울 것인가
- Kafka Connect 아키텍처: 작업자, 커넥터, 작업, 오프셋 관리
- 배포 모드: 독립형 대 분산형
- Debezium PostgreSQL 소스 커넥터: 설정, WAL 구성, 스냅샷
- Debezium MySQL 소스 커넥터: binlog 복제
- 싱크 커넥터: S3, Elasticsearch, JDBC
- 단일 메시지 변환(SMT): 레코드 필터링, 이름 바꾸기, 강화
- 모니터링: 프로덕션에서의 JMX 메트릭 및 오류 관리
아키텍처: 작업자, 커넥터 및 작업
# Componenti di Kafka Connect:
# WORKER: processo JVM che esegue i connector
# Distributed mode: piu worker formano un cluster (consigliato per produzione)
# Standalone mode: singolo worker, utile per sviluppo/test
# CONNECTOR: unita logica di integrazione (es: "connetti PostgreSQL orders a Kafka")
# Ogni connector crea 1 o piu TASK
# Source connector: legge da un sistema esterno -> scrive su Kafka
# Sink connector: legge da Kafka -> scrive su un sistema esterno
# TASK: unita di parallelismo effettiva (thread)
# Ogni task e responsabile di un sottoinsieme dei dati
# Esempio: connector con 4 task per PostgreSQL con 4 tabelle = 1 task per tabella
# OFFSET: traccia la posizione nel source (WAL LSN per Postgres, binlog position per MySQL)
# Salvato su topic interno: __connect-offsets
# Garantisce che dopo un restart, la replica riprende da dove si era fermata
# Deploy del Kafka Connect cluster (Docker Compose):
# kafka-connect:
# image: confluentinc/cp-kafka-connect:7.6.0
# environment:
# CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"
# CONNECT_REST_PORT: 8083
# CONNECT_GROUP_ID: "kafka-connect-cluster"
# CONNECT_CONFIG_STORAGE_TOPIC: "__connect-configs"
# CONNECT_OFFSET_STORAGE_TOPIC: "__connect-offsets"
# CONNECT_STATUS_STORAGE_TOPIC: "__connect-status"
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
# # Plugin path: directory con i JAR dei connector
# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
# volumes:
# - ./connect-plugins:/usr/share/confluent-hub-components
Debezium: PostgreSQL에서 변경 데이터 캡처
데베지움 Kafka와 함께 가장 많이 사용되는 CDC 커넥터입니다. 하는 대신 데이터베이스 폴링(성능에 영향을 미치고 삭제를 캡처하지 않음) Debezium은 직접 읽습니다. 미리 쓰기 로그(WAL) PostgreSQL의: 각 INSERT, UPDATE 및 DELETE는 수정 전후의 데이터로 Kafka 이벤트를 생성합니다.
CDC용 PostgreSQL 구성
-- PostgreSQL: abilitare logical replication per Debezium
-- 1. postgresql.conf: cambia wal_level (richiede restart)
-- wal_level = logical
-- max_wal_senders = 10
-- max_replication_slots = 10
-- 2. Crea un utente dedicato per Debezium con i permessi minimi necessari
CREATE ROLE debezium_user WITH
LOGIN
PASSWORD 'secure_password_here'
REPLICATION; -- Necessario per leggere il WAL
-- 3. Assegna permessi di SELECT sulle tabelle da replicare
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
-- 4. Permesso per leggere i pg_publication (richiesto da Debezium 2.x)
GRANT USAGE ON SCHEMA public TO debezium_user;
-- 5. Crea una publication (Debezium 2.x usa pgoutput come plugin)
-- Debezium la crea automaticamente, ma puoi crearla manualmente:
CREATE PUBLICATION debezium_pub FOR TABLE orders, users, products;
-- 6. Verifica che logical replication sia abilitata
SHOW wal_level; -- deve essere: logical
Debezium PostgreSQL 커넥터 구성
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "secure_password_here",
"database.dbname": "ecommerce",
"database.server.name": "ecommerce",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub",
"slot.name": "debezium_slot",
"table.include.list": "public.orders,public.users,public.products",
"topic.prefix": "cdc",
"decimal.handling.mode": "double",
"binary.handling.mode": "base64",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"slot.drop.on.stop": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.retry.timeout": "60000",
"errors.retry.delay.max.ms": "5000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.postgres-source-connector",
"errors.deadletterqueue.topic.replication.factor": "3"
}
}
# Registra il connector via REST API
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @postgres-source-connector.json
# Verifica lo stato del connector
curl http://kafka-connect:8083/connectors/postgres-source-connector/status
# {
# "name": "postgres-source-connector",
# "connector": {"state": "RUNNING", "worker_id": "kafka-connect-1:8083"},
# "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect-1:8083"}]
# }
# Struttura del messaggio CDC in output (con Debezium Avro):
# Topic: cdc.public.orders
# Key: {"id": 123}
# Value (con ExtractNewRecordState unwrap):
# {
# "id": 123,
# "user_id": "user-42",
# "amount": 99.99,
# "__deleted": "false", # "true" per DELETE events (con delete.handling.mode=rewrite)
# "__op": "u", # c=insert, u=update, d=delete, r=read (snapshot)
# "__source_ts_ms": 1710928200000
# }
Debezium MySQL: Binlog 구성
-- MySQL: abilitare binlog in formato ROW
-- Aggiungi in /etc/mysql/mysql.conf.d/mysqld.cnf:
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- Crea utente Debezium MySQL
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz_password",
"database.server.id": "1",
"topic.prefix": "cdc-mysql",
"database.include.list": "shop",
"table.include.list": "shop.orders,shop.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka-1:9092",
"schema.history.internal.kafka.topic": "__debezium-schema-history",
"snapshot.mode": "initial",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
단일 메시지 변환(SMT)
Le 단일 메시지 변환 적용 가능한 빛 변환입니다. Kafka Streams가 필요 없이 Connect 파이프라인의 모든 레코드에 연결됩니다. 그들은 연결 가능하다 (SMT 체인) 필드 이름 바꾸기, 타임스탬프 추가와 같은 간단한 경우에 매우 유용합니다. 값별로 레코드를 필터링하거나 다른 주제로 라우팅합니다.
{
"transforms": "renameField,addTimestamp,filterDeleted,routeByTable",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "user_id:userId,order_id:orderId",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "kafka_ingestion_time",
"transforms.filterDeleted.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterDeleted.condition": "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"transforms.filterDeleted.condition.name": "__deleted",
"transforms.filterDeleted.negate": "true",
"transforms.routeByTable.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByTable.regex": "cdc\\.public\\.(.*)",
"transforms.routeByTable.replacement": "db.changes.$1"
}
싱크 커넥터: Kafka에서 S3까지
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "4",
"topics": "cdc.public.orders,cdc.public.users",
"s3.region": "eu-west-1",
"s3.bucket.name": "data-lake-raw",
"s3.part.size": "5242880",
"flush.size": "10000",
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "3600000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "created_at",
"s3.credentials.provider.class": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
모니터링 및 오류 관리
# Metriche JMX importanti per Kafka Connect
# kafka.connect:type=connector-metrics,connector="{connector-name}"
# -> source-record-poll-total: record letti dalla sorgente
# -> source-record-poll-rate: rate di lettura (records/sec)
# -> source-record-write-total: record scritti su Kafka
# -> source-record-write-rate: rate di scrittura
# kafka.connect:type=connector-task-metrics
# -> offset-commit-success-percentage: % di commit offset riusciti
# -> batch-size-avg: dimensione media dei batch
# -> task-error-total: numero di errori totali
# Prometheus scraping via JMX Exporter
# jmx_prometheus_javaagent.jar configurato con kafka-connect-rules.yml
# Verifica ritardo di replica (lag) per Debezium
# Confronta il timestamp del WAL con il timestamp dell'evento su Kafka
# In caso di lag, controlla:
# - Slot replication lag su PostgreSQL:
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
# Se lag_bytes > 1GB: il connector non riesce a tenere il passo
# Lista connector e loro stato
curl http://kafka-connect:8083/connectors?expand=status | jq .
# Riavvia un connector (in caso di errore)
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/restart
# Riavvia solo un task specifico
curl -X POST http://kafka-connect:8083/connectors/postgres-source-connector/tasks/0/restart
# Pause/Resume
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/pause
curl -X PUT http://kafka-connect:8083/connectors/postgres-source-connector/resume
주의: 복제 슬롯 및 디스크 공간
PostgreSQL 복제 슬롯은 커넥터가 있는 한 모든 WAL 레코드를 보유합니다.
그는 그것을 소비했습니다. 커넥터가 몇 시간 또는 며칠 동안 오프라인 상태가 되면 슬롯이 누적됩니다.
WAL은 무기한으로 PostgreSQL 서버의 디스크를 채울 수 있으므로
데이터베이스 충돌. 항상 알림을 설정하세요. pg_replication_slots
그리고 설정을 고려해보세요 max_slot_wal_keep_size (PostgreSQL 13+) 어떻게
보호.
결론
Kafka Connect with Debezium 및 2026년 변경 데이터 캡처용 표준 스택: 데이터베이스에 영향을 주지 않고 밀리초의 대기 시간으로 데이터베이스의 모든 변경 사항을 캡처합니다. 내결함성과 정확히 한 번 의미 체계를 갖춘 프로덕션 데이터베이스 성능. SMT와의 결합으로 부품 추가 없이 가벼운 변형 가능 파이프라인으로 분리됩니다.







