Het Scheme Governance-probleem

Stel je een Kafka-systeem voor met twintig teams die berichten produceren over vijftig verschillende onderwerpen. Zonder registerschema bepaalt elk team onafhankelijk het berichtformaat: vandaag JSON met veld user_id, morgen veranderen ze naar userId. Consumenten die dat onderwerp lezen, gaan stilletjes kapot. De gegevens die binnenkomen op het datameer en inconsistent. Het debuggen van een inconsistentie wordt een onderzoek forensisch onderzoek tussen codeversies van verschillende teams.

Lo Schemaregister (Confluent, open-source) lost dit op met a formeel contract: de producent registreert het schema, de consument valideert het bericht ontvangen voldoet aan een compatibel schema. Elke poging om gegevens te verzenden incompatibel mislukt onmiddellijk met een expliciete fout, in plaats van te corrumperen het stroomafwaartse systeem stil.

Wat je gaat leren

  • Schema Registry-architectuur: hoe deze samenwerkt met producenten en consumenten
  • Avro versus Protobuf versus JSON-schema: wanneer elk te gebruiken
  • Compatibiliteitstypen: achteruit, vooruit, volledig, transitief
  • Schema-evolutie: velden toevoegen/verwijderen zonder de wijziging te verbreken
  • Java-installatie met de Avro-serializer van Avro en Confluent
  • Best practices: naamgeving van onderwerpen, versiebeheer, globale versus configuratie per onderwerp

Architectuur: hoe het schemaregister werkt

Het Schema Registry is een aparte HTTP-service van Kafka die een REST API beschikbaar stelt. Elk schema wordt geïdentificeerd door een onderwerp (normaal gesproken de naam van de onderwerp + achtervoegsel -value o -key) en een numerieke versie. Communicatie gebeurt als volgt:

# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
#    "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
#    Schema Registry valida la compatibilita con le versioni precedenti
#    Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
#    [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"

# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
#    e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili

# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
#   ^magic byte     ^es: 0x0000002A = 42

# Avvia lo Schema Registry (via Docker)
docker run -d \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
  -e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
  confluentinc/cp-schema-registry:7.6.0

Avro: Schema en serialisatie

Apache Avro Het is het meest gebruikte serialisatieformaat bij Kafka vanwege zijn compactheid en sterke ondersteuning in het Confluent-ecosysteem. Het Avro-schema Het wordt gedefinieerd in JSON en wordt opgeslagen in het register.

// Schema Avro per un ordine - orders-value v1
{
  "type": "record",
  "namespace": "dev.federicocalo.orders",
  "name": "Order",
  "doc": "Schema per gli ordini e-commerce",
  "fields": [
    {
      "name": "order_id",
      "type": "string",
      "doc": "Identificatore univoco dell'ordine"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "currency",
      "type": "string",
      "default": "EUR"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
      },
      "default": "PENDING"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "product_id", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unit_price", "type": "double"}
          ]
        }
      }
    }
  ]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer");

// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");

// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
    new File("src/main/avro/Order.avsc")
);

// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));

List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);

producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();

Compatibiliteitsregels

De compatibiliteitsregel van een onderwerp bepaalt welke wijzigingen in het schema zijn toegestaan. Dit is het meest cruciale kenmerk van het register: dit verkeerd doen parameter kan consumenten in de productie breken.

# Tipi di compatibilita disponibili:

# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""

# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer

# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima

# NONE: nessun controllo di compatibilita (pericoloso in produzione)

# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima

# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "FULL"}'

# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}

Evolutieschema: praktisch voorbeeld

// Schema v1 (attuale in produzione)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"},
    // NUOVO: campo con default (null per Avro union o valore stringa)
    {"name": "discount_code", "type": ["null", "string"], "default": null}
  ]
}

// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
    // discount_code rimosso: OK perche aveva default null
  ]
}

// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "string"},  // BREAKING: double -> string
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

Protobuf als alternatief voor Avro

Protocolbuffers (Protobuf) en de keuze van Google en veel teams die de voorkeur geven aan een expressiever systeem en een door gegevens gescheiden IDL. Ondersteund door Confluent's Schema Registry sinds versie 5.5.

// orders.proto - Schema Protobuf per ordini
syntax = "proto3";

package dev.federicocalo.orders;

option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at_ms = 5;  // timestamp in milliseconds
  OrderStatus status = 6;
  repeated OrderItem items = 7;

  // Campo aggiunto in v2: backward-compatible in Protobuf
  // (campi non presenti vengono ignorati)
  string shipping_address = 8;
}

enum OrderStatus {
  PENDING = 0;
  CONFIRMED = 1;
  SHIPPED = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

Order order = Order.newBuilder()
    .setOrderId(UUID.randomUUID().toString())
    .setUserId("user-42")
    .setAmount(99.99)
    .setCurrency("EUR")
    .setCreatedAtMs(Instant.now().toEpochMilli())
    .setStatus(OrderStatus.CONFIRMED)
    .addItems(OrderItem.newBuilder()
        .setProductId("prod-789")
        .setQuantity(2)
        .setUnitPrice(49.99)
        .build())
    .build();

producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));

Avro versus Protobuf versus JSON-schema: wanneer moet u welke gebruiken?

Vergelijkingstabel: serialisatieformaten

  • Avro: Compact, efficiënt voor Hadoop/Spark-schema evolutie goed gedocumenteerd, geweldig voor data-engineering. Gebrek aan achteruit compatibiliteit per type: Het veranderen van een type vereist strategie. Kies of u gebruikt Confluent Platform of pijplijn naar data lake (native Avro op Parquet).
  • Protobuf: Uitstekend geschikt voor gRPC-microservices, meer expressieve typen (oneof, map, native timestamp), betere IDE-ondersteuning. De veldnummers garanderen natuurlijke achterwaartse compatibiliteit (een nieuw veld toevoegen = nieuw nummer). Kies als u Protobuf al in gRPC heeft of de voorkeur geeft aan getypt IDL.
  • JSON-schema: Interoperabel, leesbaar voor mensen, geen compilatie. Groter laadvermogen. Kies voor teams met minder IaC-ervaring of voor API's die zonder tools leesbaar moeten zijn.

Beste praktijken voor schemabeheer

# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
#     "io.confluent.kafka.serializers.subject.RecordNameStrategy")

# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer

# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"

# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
  -X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
  -H "Content-Type: application/json" \
  -d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")

if [ "$RESPONSE" != "200" ]; then
  echo "ERRORE: Schema non compatibile con la versione in produzione"
  exit 1
fi

echo "Schema compatibile: OK"

# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve

# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI

# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic _schemas \
  --from-beginning \
  > schemas-backup-$(date +%Y%m%d).json

Conclusies

De Schema Registry transformeert Kafka van een berichtensysteem in een echt systeem dataplatform met governance: formele contracten tussen producenten en consumenten, evolutie gecontroleerde patronen, expliciete fouten in plaats van stille corruptie. In een organisatie met meerdere teams is het een van de meest kritische componenten correct worden geconfigureerd voordat deze in productie gaat.

De complete serie: Apache Kafka

  • Artikel 01 – Apache Kafka-grondbeginselen
  • Artikel 02 — KRaft in Kafka 4.0
  • Artikel 03 - Geavanceerde producent en consument
  • Artikel 04 - Exactly-Once Semantiek in Kafka
  • Artikel 05 (dit) — Schemaregister: Avro, Protobuf en Schema-evolutie
  • Artikel 06 — Kafka-streams: KTable en Windowing
  • Artikel 07 — Kafka Connect: Debezium CDC- en DB-integratie
  • Artikel 08 — Kafka + Apache Flink: realtime pijplijnanalyse