Rejestr schematów: Avro, Protobuf, zgodność i ewolucja schematu
Rejestr schematów jest kluczowym elementem zarządzania danymi w Kafce: scentralizowane zarządzanie schematami Avro i Protobuf, zasady kompatybilności strategie schematów wstecz/do przodu/pełny i ewolucyjny bez przestojów.
Problem zarządzania schematem
Wyobraź sobie system Kafka z 20 zespołami tworzącymi komunikaty na 50 różnych tematów.
Bez schematu rejestru każdy zespół samodzielnie decyduje o formacie wiadomości:
dzisiaj JSON z polem user_id, jutro zmienią się na userId.
Konsumenci czytający ten temat po cichu się załamują. Dane, które przychodzą
w jeziorze danych i niespójne. Debugowanie niespójności staje się dochodzeniem
kryminalistyki pomiędzy wersjami kodu z różnych zespołów.
Lo Rejestr schematów (Confluent, open source) rozwiązuje ten problem za pomocą a umowa formalna: producent rejestruje schemat, odbiorca zatwierdza przekaz otrzymany jest zgodny z kompatybilnym schematem. Jakakolwiek próba przesłania danych inkompatybilny kończy się natychmiastowym niepowodzeniem z wyraźnym błędem, zamiast powodować uszkodzenie dalszy system po cichu.
Czego się nauczysz
- Architektura rejestru schematu: jak współdziała z producentami i konsumentami
- Schemat Avro vs Protobuf vs JSON: kiedy używać każdego z nich
- Typy kompatybilności: wstecz, do przodu, pełna, przechodnia
- Ewolucja schematu: dodawaj/usuwaj pola bez przerywania zmian
- Konfiguracja Java z serializatorem Avro firmy Avro i Confluent
- Najlepsze praktyki: nazewnictwo podmiotów, wersjonowanie, konfiguracja globalna i dla poszczególnych tematów
Architektura: jak działa rejestr schematów
Rejestr schematu to odrębna usługa HTTP od Kafki, która udostępnia interfejs API REST.
Każdy schemat jest identyfikowany przez temat (zwykle nazwa
temat + przyrostek -value o -key) i wersję numeryczną.
Komunikacja odbywa się w następujący sposób:
# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
# "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
# Schema Registry valida la compatibilita con le versioni precedenti
# Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
# [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"
# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
# e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili
# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
# ^magic byte ^es: 0x0000002A = 42
# Avvia lo Schema Registry (via Docker)
docker run -d \
-p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
-e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
confluentinc/cp-schema-registry:7.6.0
Avro: Schemat i serializacja
Apache Avro Jest to najczęściej używany format serializacji w Kafce ze względu na jego zwartość i silne wsparcie w ekosystemie Confluent. Schemat Avro Jest zdefiniowany w JSON i zapisany w Rejestrze.
// Schema Avro per un ordine - orders-value v1
{
"type": "record",
"namespace": "dev.federicocalo.orders",
"name": "Order",
"doc": "Schema per gli ordini e-commerce",
"fields": [
{
"name": "order_id",
"type": "string",
"doc": "Identificatore univoco dell'ordine"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
},
{
"name": "currency",
"type": "string",
"default": "EUR"
},
{
"name": "created_at",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
},
"default": "PENDING"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unit_price", "type": "double"}
]
}
}
}
]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");
// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
new File("src/main/avro/Order.avsc")
);
// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));
List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);
producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();
Zasady zgodności
Reguła zgodności podmiotu określa, jakie zmiany w schemacie są dozwolone. To jest najważniejsza cecha Rejestru: błędne zrozumienie tego parametr może załamać konsumentów w produkcji.
# Tipi di compatibilita disponibili:
# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""
# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer
# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima
# NONE: nessun controllo di compatibilita (pericoloso in produzione)
# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima
# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/json" \
-d '{"compatibility": "FULL"}'
# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/json" \
-d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}
Schemat ewolucji: przykład praktyczny
// Schema v1 (attuale in produzione)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
]
}
// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"},
// NUOVO: campo con default (null per Avro union o valore stringa)
{"name": "discount_code", "type": ["null", "string"], "default": null}
]
}
// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
// discount_code rimosso: OK perche aveva default null
]
}
// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "string"}, // BREAKING: double -> string
{"name": "currency", "type": "string", "default": "EUR"}
]
}
Protobuf jako alternatywa dla Avro
Bufory protokołu (Protobuf) oraz wybór Google i wielu zespołów którzy wolą bardziej wyrazisty system typów i IDL z separacją danych. Obsługiwane przez rejestr schematów Confluent od wersji 5.5.
// orders.proto - Schema Protobuf per ordini
syntax = "proto3";
package dev.federicocalo.orders;
option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
string currency = 4;
int64 created_at_ms = 5; // timestamp in milliseconds
OrderStatus status = 6;
repeated OrderItem items = 7;
// Campo aggiunto in v2: backward-compatible in Protobuf
// (campi non presenti vengono ignorati)
string shipping_address = 8;
}
enum OrderStatus {
PENDING = 0;
CONFIRMED = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = Order.newBuilder()
.setOrderId(UUID.randomUUID().toString())
.setUserId("user-42")
.setAmount(99.99)
.setCurrency("EUR")
.setCreatedAtMs(Instant.now().toEpochMilli())
.setStatus(OrderStatus.CONFIRMED)
.addItems(OrderItem.newBuilder()
.setProductId("prod-789")
.setQuantity(2)
.setUnitPrice(49.99)
.build())
.build();
producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));
Schemat Avro vs Protobuf vs JSON: kiedy używać którego
Tabela porównawcza: Formaty serializacji
- Avro: Kompaktowy, wydajny dla schematu Hadoop/Spark ewolucja jest dobrze udokumentowana, świetna do inżynierii danych. Brak tyłu zgodność według typu: Zmiana typu wymaga strategii. Wybierz, jeśli używasz Platforma Confluent lub potok do jeziora danych (natywny Avro na Parquet).
- Protobuf: Doskonały do mikrousług gRPC, bardziej ekspresyjnych typów (oneof, mapa, natywny znacznik czasu), lepsza obsługa IDE. Numery pól gwarantują naturalna kompatybilność wsteczna (dodaj nowe pole = nowy numer). Wybierz jeśli masz już Protobuf w gRPC lub wolisz wpisać IDL.
- Schemat JSON: Interoperacyjny, czytelny dla człowieka, brak kompilacja. Większy ładunek. Wybierz dla zespołów z mniejszym doświadczeniem IaC lub dla interfejsów API, które muszą być czytelne bez użycia narzędzi.
Najlepsze praktyki w zakresie zarządzania schematami
# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
# "io.confluent.kafka.serializers.subject.RecordNameStrategy")
# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer
# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"
# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
-X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
-H "Content-Type: application/json" \
-d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")
if [ "$RESPONSE" != "200" ]; then
echo "ERRORE: Schema non compatibile con la versione in produzione"
exit 1
fi
echo "Schema compatibile: OK"
# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve
# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI
# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic _schemas \
--from-beginning \
> schemas-backup-$(date +%Y%m%d).json
Wnioski
Rejestr schematów przekształca Kafkę z systemu przesyłania wiadomości w prawdziwy platforma danych z zarządzaniem: formalne umowy między producentami a konsumentami, ewolucja kontrolowane wzorce, jawne błędy zamiast cichej korupcji. W organizacji składającej się z wielu zespołów jest to jeden z najważniejszych elementów należy poprawnie skonfigurować przed wejściem do produkcji.
Cała seria: Apache Kafka
- Artykuł 01 — Podstawy Apache Kafki
- Artykuł 02 — KRaft w Kafce 4.0
- Artykuł 03 — Zaawansowany producent i konsument
- Artykuł 04 — Semantyka dokładnie raz w Kafce
- Artykuł 05 (ten) — Rejestr schematów: Avro, Protobuf i Schema Evolution
- Artykuł 06 — Strumienie Kafki: KTable i Windowing
- Artykuł 07 — Kafka Connect: Integracja Debezium CDC i DB
- Artykuł 08 — Kafka + Apache Flink: Analiza rurociągów w czasie rzeczywistym







