Schema Registry: Avro, Protobuf, Compatibility and Schema Evolution
Schema Registry este componenta critică pentru guvernarea datelor în Kafka: management centralizat al schemelor Avro și Protobuf, reguli de compatibilitate înapoi/înainte/complet și strategii de schemă de evoluție fără timpi de nefuncționare.
Problema guvernării schemei
Imaginați-vă un sistem Kafka cu 20 de echipe care produc mesaje pe 50 de subiecte diferite.
Fără o schemă de registru, fiecare echipă decide independent formatul mesajului:
astăzi JSON cu câmp user_id, mâine se schimbă în userId.
Consumatorii care citesc acel subiect în tăcere se descompun. Datele care ajung
pe lacul de date și inconsecvente. Depanarea unei inconsecvențe devine o investigație
criminalistică între versiunile de cod de la diferite echipe.
Lo Registrul Schemei (Confluent, open-source) rezolvă acest lucru cu a contract formal: producătorul înregistrează schema, consumatorul validează mesajul primite se conformează unei scheme compatibile. Orice încercare de a trimite date incompatibil eșuează imediat cu o eroare explicită, în loc să corupă sistemul din aval în tăcere.
Ce vei învăța
- Arhitectura Schema Registry: cum interacționează cu producătorii și consumatorii
- Avro vs Protobuf vs Schema JSON: când să folosiți fiecare
- Tipuri de compatibilitate: înapoi, înainte, complet, tranzitiv
- Evoluția schemei: adăugați/eliminați câmpuri fără a întrerupe modificarea
- Configurare Java cu serializatorul Avro de la Avro și Confluent
- Cele mai bune practici: denumirea subiectului, versiunea, configurația globală vs pe subiect
Arhitectură: Cum funcționează Schema Registry
Schema Registry este un serviciu HTTP separat de Kafka, care expune un API REST.
Fiecare schemă este identificată prin a subiect (de obicei, numele lui
subiect + sufix -value o -key) și o versiune numerică.
Comunicarea se întâmplă astfel:
# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
# "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
# Schema Registry valida la compatibilita con le versioni precedenti
# Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
# [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"
# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
# e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili
# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
# ^magic byte ^es: 0x0000002A = 42
# Avvia lo Schema Registry (via Docker)
docker run -d \
-p 8081:8081 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
-e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
confluentinc/cp-schema-registry:7.6.0
Avro: Schemă și Serializare
Apache Avro Este cel mai folosit format de serializare cu Kafka pentru compactitatea și sprijinul puternic în ecosistemul Confluent. Schema Avro Este definit în JSON și este salvat în Registry.
// Schema Avro per un ordine - orders-value v1
{
"type": "record",
"namespace": "dev.federicocalo.orders",
"name": "Order",
"doc": "Schema per gli ordini e-commerce",
"fields": [
{
"name": "order_id",
"type": "string",
"doc": "Identificatore univoco dell'ordine"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
},
{
"name": "currency",
"type": "string",
"default": "EUR"
},
{
"name": "created_at",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "status",
"type": {
"type": "enum",
"name": "OrderStatus",
"symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
},
"default": "PENDING"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "product_id", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unit_price", "type": "double"}
]
}
}
}
]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroSerializer");
// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");
// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
new File("src/main/avro/Order.avsc")
);
// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));
List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);
producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();
Reguli de compatibilitate
Regula de compatibilitate a unui subiect determină ce modificări ale schemei sunt permise. Aceasta este cea mai critică caracteristică a Registrului: greșit parametrul poate rupe consumatorii în producție.
# Tipi di compatibilita disponibili:
# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""
# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer
# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima
# NONE: nessun controllo di compatibilita (pericoloso in produzione)
# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima
# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
-H "Content-Type: application/json" \
-d '{"compatibility": "FULL"}'
# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
-H "Content-Type: application/json" \
-d '{"compatibility": "BACKWARD"}'
# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/json" \
-d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}
Schema de evoluție: Exemplu practic
// Schema v1 (attuale in produzione)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
]
}
// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"},
// NUOVO: campo con default (null per Avro union o valore stringa)
{"name": "discount_code", "type": ["null", "string"], "default": null}
]
}
// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"}
// discount_code rimosso: OK perche aveva default null
]
}
// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "string"}, // BREAKING: double -> string
{"name": "currency", "type": "string", "default": "EUR"}
]
}
Protobuf ca alternativă la Avro
Protocol tampon (Protobuf) și alegerea Google și a multor echipe care preferă un sistem de tip mai expresiv și un IDL separat de date. Sprijinit de Confluent's Schema Registry începând cu versiunea 5.5.
// orders.proto - Schema Protobuf per ordini
syntax = "proto3";
package dev.federicocalo.orders;
option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
string currency = 4;
int64 created_at_ms = 5; // timestamp in milliseconds
OrderStatus status = 6;
repeated OrderItem items = 7;
// Campo aggiunto in v2: backward-compatible in Protobuf
// (campi non presenti vengono ignorati)
string shipping_address = 8;
}
enum OrderStatus {
PENDING = 0;
CONFIRMED = 1;
SHIPPED = 2;
DELIVERED = 3;
CANCELLED = 4;
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = Order.newBuilder()
.setOrderId(UUID.randomUUID().toString())
.setUserId("user-42")
.setAmount(99.99)
.setCurrency("EUR")
.setCreatedAtMs(Instant.now().toEpochMilli())
.setStatus(OrderStatus.CONFIRMED)
.addItems(OrderItem.newBuilder()
.setProductId("prod-789")
.setQuantity(2)
.setUnitPrice(49.99)
.build())
.build();
producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));
Avro vs Protobuf vs Schema JSON: Când să folosiți care
Tabel de comparație: formate de serializare
- Avro: Compact, eficient pentru schema Hadoop/Spark evoluție bine documentată, excelentă pentru ingineria datelor. Lipsa înapoi compatibilitate după tip: Schimbarea unui tip necesită strategie. Alege dacă folosești Platformă confluentă sau are conductă către lacul de date (Avro nativ pe parchet).
- Protobuf: Excelent pentru microservicii gRPC, tipuri mai expresive (una dintre, hartă, timestamp nativ), suport IDE mai bun. Numerele câmpurilor garantează compatibilitate inversă naturală (adăugați un câmp nou = număr nou). Alege dacă aveți deja Protobuf în gRPC sau preferați IDL tastat.
- Schema JSON: Interoperabil, citibil de om, niciunul compilare. Sarcină utilă mai mare. Alegeți pentru echipe cu mai puțină experiență IaC sau pentru API-uri care trebuie să fie citite fără instrumente.
Cele mai bune practici pentru guvernarea schemelor
# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
# "io.confluent.kafka.serializers.subject.RecordNameStrategy")
# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer
# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"
# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
-X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
-H "Content-Type: application/json" \
-d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")
if [ "$RESPONSE" != "200" ]; then
echo "ERRORE: Schema non compatibile con la versione in produzione"
exit 1
fi
echo "Schema compatibile: OK"
# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve
# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI
# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic _schemas \
--from-beginning \
> schemas-backup-$(date +%Y%m%d).json
Concluzii
Schema Registry transformă Kafka dintr-un sistem de mesagerie într-unul real platformă de date cu guvernare: contracte formale între producători și consumatori, evoluție modele controlate, erori explicite în loc de corupție tăcută. Într-o organizație cu mai multe echipe, este una dintre cele mai critice componente să fie configurat corect înainte de a intra în producție.
Seria completă: Apache Kafka
- Articolul 01 — Fundamentele Apache Kafka
- Articolul 02 — KRaft în Kafka 4.0
- Articolul 03 — Producător și Consumator avansat
- Articolul 04 — Exact-O dată Semantică în Kafka
- Articolul 05 (acest) — Schema Registry: Avro, Protobuf și Schema Evolution
- Articolul 06 — Fluxuri Kafka: KTable și Windowing
- Articolul 07 — Kafka Connect: Debezium CDC și integrare DB
- Articolul 08 — Kafka + Apache Flink: Pipeline Analytics în timp real







