Şema Kaydı: Avro, Protobuf, Uyumluluk ve Şema Gelişimi
Şema Kaydı, Kafka'da veri yönetiminin kritik bileşenidir: Avro ve Protobuf şemalarının merkezi yönetimi, uyumluluk kuralları kesinti olmadan geri/ileri/tam ve evrim şeması stratejileri.
Program Yönetişim Sorunu
20 ekibin 50 farklı konuda mesaj ürettiği bir Kafka sistemi hayal edin.
Kayıt şeması olmadan her ekip mesaj biçimine bağımsız olarak karar verir:
bugün alanlı JSON user_id, yarın şu şekilde değişecekler: userId.
Bu konuyu sessizce okuyan tüketiciler yıkılıyor. Gelen veriler
veri gölünde ve tutarsız. Bir tutarsızlıkta hata ayıklamak bir soruşturmaya dönüşür
farklı ekiplerin kod sürümleri arasındaki adli tıp.
Lo Şema Kaydı (Birleşik, açık kaynak) bunu bir çözümle çözer Resmi sözleşme: Üretici şemayı kaydeder, tüketici mesajı doğrular Alınan uyumlu bir şemaya uygundur. Herhangi bir veri gönderme girişimi uyumsuzluk, bozulmak yerine açık bir hatayla hemen başarısız oluyor aşağı akış sistemi sessizce.
Ne Öğreneceksiniz
- Schema Registry mimarisi: üreticiler ve tüketicilerle nasıl etkileşime giriyor?
- Avro vs Protobuf vs JSON Şeması: her biri ne zaman kullanılmalı
- Uyumluluk türleri: geriye, ileriye, tam, geçişli
- Şema gelişimi: değişikliği bozmadan alan ekleme/kaldırma
- Avro ve Confluent'in Avro serileştiricisi ile Java kurulumu
- En iyi uygulamalar: konu adlandırma, sürüm oluşturma, genel ve konuya özel yapılandırma
Mimari: Şema Kayıt Defteri Nasıl Çalışır?
Şema Kaydı, REST API'sini kullanıma sunan Kafka'dan ayrı bir HTTP hizmetidir.
Her şema bir kodla tanımlanır ders (normalde adı
konu + sonek -value o -key) ve sayısal bir versiyon.
İletişim şu şekilde gerçekleşir:
# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
# "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
# Schema Registry valida la compatibilita con le versioni precedenti
# Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
# [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"
# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
# e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili
# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
# ^magic byte ^es: 0x0000002A = 42
# Avvia lo Schema Registry (via Docker)
docker run -d \
-p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
-e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
confluentinc/cp-schema-registry:7.6.0
Avro: Şema ve Serileştirme
Apaçi Avro Kafka ile en çok kullanılan serileştirme formatıdır Confluent ekosistemindeki kompaktlığı ve güçlü desteği nedeniyle. Avro şeması JSON'da tanımlanır ve Kayıt Defterine kaydedilir.
// Schema Avro per un ordine - orders-value v1
{
"type": "record",
"namespace": "dev.federicocalo.orders",
"name": "Order",
"doc": "Schema per gli ordini e-commerce",
"fields": [
{
"name": "order_id",
"type": "string",
"doc": "Identificatore univoco dell'ordine"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
},
{
"name": "currency",
"type": "string",
"default": "EUR"
},
{
"name": "created_at",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
},
"default": "PENDING"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unit_price", "type": "double"}
]
}
}
}
]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");
// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
new File("src/main/avro/Order.avsc")
);
// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));
List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);
producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();
Uyumluluk Kuralları
Bir konunun uyumluluk kuralı şemada hangi değişikliklerin yapılacağını belirler izin verilir. Bu, Kayıt Defterinin en kritik özelliğidir: bunu yanlış anlamak parametresi üretimde tüketiciyi kırabilir.
# Tipi di compatibilita disponibili:
# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""
# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer
# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima
# NONE: nessun controllo di compatibilita (pericoloso in produzione)
# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima
# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/json" \
-d '{"compatibility": "FULL"}'
# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/json" \
-d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}
Evrim Şeması: Pratik Örnek
// Schema v1 (attuale in produzione)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
]
}
// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"},
// NUOVO: campo con default (null per Avro union o valore stringa)
{"name": "discount_code", "type": ["null", "string"], "default": null}
]
}
// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
// discount_code rimosso: OK perche aveva default null
]
}
// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "string"}, // BREAKING: double -> string
{"name": "currency", "type": "string", "default": "EUR"}
]
}
Avro'ya Alternatif Olarak Protobuf
Protokol Tamponları (Protobuf) Google'ın ve birçok ekibin tercihi daha anlamlı bir sistem ve veriyle ayrılmış bir IDL'yi tercih edenler. Confluent'in Şema Kaydı tarafından 5.5 sürümünden beri desteklenmektedir.
// orders.proto - Schema Protobuf per ordini
syntax = "proto3";
package dev.federicocalo.orders;
option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
string currency = 4;
int64 created_at_ms = 5; // timestamp in milliseconds
OrderStatus status = 6;
repeated OrderItem items = 7;
// Campo aggiunto in v2: backward-compatible in Protobuf
// (campi non presenti vengono ignorati)
string shipping_address = 8;
}
enum OrderStatus {
PENDING = 0;
CONFIRMED = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = Order.newBuilder()
.setOrderId(UUID.randomUUID().toString())
.setUserId("user-42")
.setAmount(99.99)
.setCurrency("EUR")
.setCreatedAtMs(Instant.now().toEpochMilli())
.setStatus(OrderStatus.CONFIRMED)
.addItems(OrderItem.newBuilder()
.setProductId("prod-789")
.setQuantity(2)
.setUnitPrice(49.99)
.build())
.build();
producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));
Avro vs Protobuf vs JSON Şeması: Hangisi Ne Zaman Kullanılmalı
Karşılaştırma Tablosu: Serileştirme Formatları
- Avro: Hadoop/Spark şeması için kompakt ve verimli evrim iyi belgelenmiştir, veri mühendisliği için mükemmeldir. Geriye dönük eksiklik türe göre uyumluluk: Bir türün değiştirilmesi strateji gerektirir. Kullanıyorsanız seçin Birleşik Platform veya veri gölüne giden boru hattı var (Parquet'te yerel Avro).
- Protobuf: GRPC mikro hizmetleri ve daha etkileyici türler için mükemmel (biri, harita, yerel zaman damgası), daha iyi IDE desteği. Alan numaraları garanti doğal geriye dönük uyumluluk (yeni bir alan ekle = yeni sayı). Seç gRPC'de zaten Protobuf'unuz varsa veya IDL yazmayı tercih ediyorsanız.
- JSON Şeması: Birlikte çalışabilir, insanlar tarafından okunabilir, hiçbiri derleme. Daha büyük yük. Daha az IaC deneyimi olan takımları seçin veya Araçlar olmadan okunabilmesi gereken API'ler için.
Program Yönetişimine İlişkin En İyi Uygulamalar
# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
# "io.confluent.kafka.serializers.subject.RecordNameStrategy")
# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer
# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"
# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
-X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
-H "Content-Type: application/json" \
-d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")
if [ "$RESPONSE" != "200" ]; then
echo "ERRORE: Schema non compatibile con la versione in produzione"
exit 1
fi
echo "Schema compatibile: OK"
# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve
# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI
# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic _schemas \
--from-beginning \
> schemas-backup-$(date +%Y%m%d).json
Sonuçlar
Şema Kaydı, Kafka'yı bir mesajlaşma sisteminden gerçek bir sisteme dönüştürüyor yönetişimli veri platformu: üreticiler ve tüketiciler arasındaki resmi sözleşmeler, evrim kontrollü modeller, sessiz bozulma yerine açık hatalar. Birden fazla ekibin bulunduğu bir organizasyonda en kritik bileşenlerden biridir. üretime geçmeden önce doğru şekilde yapılandırılması gerekir.
Serinin Tamamı: Apache Kafka
- Madde 01 — Apache Kafka'nın Temelleri
- Madde 02 — Kafka 4.0'da KRaft
- Madde 03 — Gelişmiş Üretici ve Tüketici
- Madde 04 — Kafka'da Tam Bir Kez Semantik
- Madde 05 (bu) — Şema Kaydı: Avro, Protobuf ve Schema Evolution
- Madde 06 — Kafka Akışları: KTable ve Pencereleme
- Madde 07 — Kafka Connect: Debezium CDC ve Veritabanı Entegrasyonu
- Madde 08 — Kafka + Apache Flink: Gerçek Zamanlı İşlem Hattı Analitiği







