Problém správy systému

Představte si systém Kafka s 20 týmy produkujícími zprávy na 50 různých témat. Bez schématu registru každý tým rozhoduje o formátu zprávy nezávisle: dnes JSON s polem user_id, zítra se změní na userId. Spotřebitelé, kteří toto téma čtou, se tiše zhroutí. Data, která přicházejí na datovém jezeře a nekonzistentní. Ladění nekonzistence se stává vyšetřováním forenzní mezi verzemi kódu z různých týmů.

Lo Registr schémat (Confluent, open-source) to řeší pomocí a formální smlouva: výrobce zaregistruje schéma, spotřebitel potvrdí zprávu přijaté odpovídá kompatibilnímu schématu. Jakýkoli pokus o odeslání dat nekompatibilní selže okamžitě s explicitní chybou, místo aby došlo k poškození následný systém tiše.

Co se naučíte

  • Architektura registru schémat: jak interaguje s výrobci a spotřebiteli
  • Schéma Avro vs Protobuf vs JSON: kdy je použít
  • Typy kompatibility: zpětná, vpřed, plná, tranzitivní
  • Vývoj schématu: přidat/odebrat pole bez porušení změny
  • Nastavení Java pomocí Avro a Confluent's Avro serializer
  • Osvědčené postupy: pojmenování předmětů, verzování, globální vs konfigurace podle předmětu

Architektura: Jak funguje registr schémat

Registr schémat je samostatná služba HTTP od Kafky, která zpřístupňuje REST API. Každé schéma je označeno a podrobit (obvykle název téma + přípona -value o -key) a číselnou verzi. Komunikace probíhá takto:

# Flusso producer:
# 1. Producer crea un ProducerRecord con un oggetto Avro/Protobuf
# 2. L'Avro Serializer fa una chiamata HTTP GET al Registry:
#    "Esiste lo schema X per il subject 'orders-value'?"
# 3. Se non esiste (o e cambiato), POST per registrarlo:
#    Schema Registry valida la compatibilita con le versioni precedenti
#    Se compatibile: OK, assegna schema ID intero (es: 42)
# 4. Il serializer scrive il messaggio come:
#    [0x00] [schema_id: 4 bytes] [payload Avro serializzato]
# 5. I primi 5 byte identificano il formato "magic byte + schema ID"

# Flusso consumer:
# 1. Consumer riceve i byte del messaggio
# 2. L'Avro Deserializer legge i primi 5 byte: magic byte + schema ID
# 3. Chiama il Registry: GET /schemas/ids/42
# 4. Registry risponde con lo schema (cachato localmente dopo la prima chiamata)
# 5. Deserializza il payload usando lo schema writer (come e stato scritto)
#    e lo schema reader (come il consumer si aspetta di leggerlo)
# 6. Avro fa la conversione automatica se gli schemi sono compatibili

# Struttura del payload serializzato:
# | 0x00 | schema_id (4 bytes BE) | avro binary payload |
#   ^magic byte     ^es: 0x0000002A = 42

# Avvia lo Schema Registry (via Docker)
docker run -d \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092" \
  -e SCHEMA_REGISTRY_KAFKASTORE_TOPIC="_schemas" \
  confluentinc/cp-schema-registry:7.6.0

Avro: Schéma a serializace

Apache Avro Je to nejpoužívanější serializační formát s Kafkou pro svou kompaktnost a silnou podporu v ekosystému Confluent. Schéma Avro Je definován v JSON a je uložen v registru.

// Schema Avro per un ordine - orders-value v1
{
  "type": "record",
  "namespace": "dev.federicocalo.orders",
  "name": "Order",
  "doc": "Schema per gli ordini e-commerce",
  "fields": [
    {
      "name": "order_id",
      "type": "string",
      "doc": "Identificatore univoco dell'ordine"
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "currency",
      "type": "string",
      "default": "EUR"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["PENDING", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
      },
      "default": "PENDING"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "product_id", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unit_price", "type": "double"}
          ]
        }
      }
    }
  ]
}
// Producer con Avro serializer e Schema Registry (Maven: io.confluent:kafka-avro-serializer)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro Value Serializer (registra automaticamente lo schema nel Registry)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer");

// URL dello Schema Registry
props.put("schema.registry.url", "http://schema-registry:8081");

// Opzionale: autenticazione al Registry (se con Confluent Cloud)
// props.put("basic.auth.credentials.source", "USER_INFO");
// props.put("basic.auth.user.info", "api-key:api-secret");

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Carica lo schema da file .avsc
Schema schema = new Schema.Parser().parse(
    new File("src/main/avro/Order.avsc")
);

// Crea un record Avro generico
GenericRecord order = new GenericData.Record(schema);
order.put("order_id", UUID.randomUUID().toString());
order.put("user_id", "user-42");
order.put("amount", new BigDecimal("99.99"));
order.put("currency", "EUR");
order.put("created_at", Instant.now().toEpochMilli());
order.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), "CONFIRMED"));

List<GenericRecord> items = new ArrayList<>();
GenericRecord item = new GenericData.Record(schema.getField("items").schema().getElementType());
item.put("product_id", "prod-789");
item.put("quantity", 2);
item.put("unit_price", 49.99);
items.add(item);
order.put("items", items);

producer.send(new ProducerRecord<>("orders", order.get("order_id").toString(), order));
producer.flush();

Pravidla kompatibility

Pravidlo kompatibility subjektu určuje, které změny schématu jsou povoleny. Toto je nejkritičtější funkce registru: špatně parametr může zlomit spotřebitele ve výrobě.

# Tipi di compatibilita disponibili:

# BACKWARD (default): le nuove versioni dello schema possono leggere
# i dati scritti con la versione precedente.
# Operazioni consentite: aggiungere campi CON default, rimuovere campi senza default
# Use case: consumer viene aggiornato PRIMA del producer
# Esempio: aggiungi campo "shipping_address" con default ""

# FORWARD: le versioni precedenti dello schema possono leggere
# i dati scritti con la nuova versione.
# Operazioni consentite: aggiungere campi senza default, rimuovere campi CON default
# Use case: producer viene aggiornato PRIMA del consumer

# FULL: sia backward che forward. La piu restrittiva.
# Operazioni consentite: SOLO aggiungere/rimuovere campi CON default
# Use case: non sai quale viene aggiornato prima

# NONE: nessun controllo di compatibilita (pericoloso in produzione)

# BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE:
# Come le versioni non-transitive ma la compatibilita e verificata
# rispetto a TUTTE le versioni precedenti, non solo l'ultima

# Configurazione globale e per-subject via REST API:
# Configurazione globale (default per tutti gli subject nuovi):
curl -X PUT http://schema-registry:8081/config \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "FULL"}'

# Configurazione per subject specifico (override del globale):
curl -X PUT http://schema-registry:8081/config/orders-value \
  -H "Content-Type: application/json" \
  -d '{"compatibility": "BACKWARD"}'

# Verifica compatibilita prima di registrare (dry-run):
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"Order\", ...}"}'
# Response: {"is_compatible": true}

Evoluční schéma: Praktický příklad

// Schema v1 (attuale in produzione)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

// Schema v2 - AGGIUNTA backward-compatible:
// Aggiungi campo con default -> OK con BACKWARD e FULL
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"},
    // NUOVO: campo con default (null per Avro union o valore stringa)
    {"name": "discount_code", "type": ["null", "string"], "default": null}
  ]
}

// Schema v3 - RIMOZIONE backward-compatible:
// Rimuovi campo che aveva default -> OK con BACKWARD
// (Consumer con schema v2 riceveranno il default per discount_code quando leggono v3)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"}
    // discount_code rimosso: OK perche aveva default null
  ]
}

// Schema v4 - CAMBIAMENTO di tipo NON COMPATIBILE:
// Cambiare "amount" da double a string ROMPE backward e forward
// -> Rifiutato da Schema Registry con BACKWARD/FORWARD/FULL
// -> Devi usare un nuovo subject (nuovo topic) oppure NONE (rischio!)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "string"},  // BREAKING: double -> string
    {"name": "currency", "type": "string", "default": "EUR"}
  ]
}

Protobuf jako alternativa k Avro

Protokolové vyrovnávací paměti (Protobuf) a výběr společnosti Google a mnoha týmů kteří preferují výraznější typový systém a datově oddělený IDL. Podporováno Confluent's Schema Registry od verze 5.5.

// orders.proto - Schema Protobuf per ordini
syntax = "proto3";

package dev.federicocalo.orders;

option java_package = "dev.federicocalo.orders";
option java_outer_classname = "OrderProto";

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at_ms = 5;  // timestamp in milliseconds
  OrderStatus status = 6;
  repeated OrderItem items = 7;

  // Campo aggiunto in v2: backward-compatible in Protobuf
  // (campi non presenti vengono ignorati)
  string shipping_address = 8;
}

enum OrderStatus {
  PENDING = 0;
  CONFIRMED = 1;
  SHIPPED = 2;
  DELIVERED = 3;
  CANCELLED = 4;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double unit_price = 3;
}
// Producer con Protobuf serializer
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

Order order = Order.newBuilder()
    .setOrderId(UUID.randomUUID().toString())
    .setUserId("user-42")
    .setAmount(99.99)
    .setCurrency("EUR")
    .setCreatedAtMs(Instant.now().toEpochMilli())
    .setStatus(OrderStatus.CONFIRMED)
    .addItems(OrderItem.newBuilder()
        .setProductId("prod-789")
        .setQuantity(2)
        .setUnitPrice(49.99)
        .build())
    .build();

producer.send(new ProducerRecord<>("orders-proto", order.getOrderId(), order));

Schéma Avro vs Protobuf vs JSON: Kdy použít který

Srovnávací tabulka: Formáty serializace

  • Avro: Kompaktní, efektivní pro Hadoop/Spark, schéma vývoj dobře zdokumentovaný, skvělé pro datové inženýrství. Nedostatek vzad kompatibilita podle typu: Změna typu vyžaduje strategii. Vyberte, zda používáte Confluent Platform nebo mít potrubí do datového jezera (nativní Avro on Parquet).
  • Protobuf: Vynikající pro mikroslužby gRPC, výraznější typy (jeden z, mapa, nativní časové razítko), lepší podpora IDE. Čísla polí zaručují přirozená zpětná kompatibilita (přidat nové pole = nové číslo). Vyberte si pokud již máte Protobuf v gRPC nebo dáváte přednost typovanému IDL.
  • Schéma JSON: Interoperabilní, čitelný pro člověka, žádný kompilace. Větší užitečné zatížení. Vyberte si pro týmy s menšími zkušenostmi s IaC nebo pro API, která musí být čitelná bez nástrojů.

Nejlepší postupy pro správu systému

# 1. Naming convention per i subject
# Default (TopicNameStrategy): {topic-name}-value, {topic-name}-key
# Record name strategy (piu flessibile): {namespace}.{record-name}
# Configura su producer:
# props.put("value.subject.name.strategy",
#     "io.confluent.kafka.serializers.subject.RecordNameStrategy")

# 2. Schema versioning in CI/CD
# Aggiorna lo schema nel repository -> PR review -> test compatibilita
# pre-merge -> register nel Registry di staging -> deploy producer

# Script di verifica compatibilita in CI:
#!/bin/bash
SCHEMA_FILE="src/main/avro/Order.avsc"
SUBJECT="orders-value"
REGISTRY_URL="http://schema-registry-staging:8081"

# Verifica compatibilita prima del merge
RESPONSE=$(curl -s -o /dev/null -w "%{http_code}" \
  -X POST "${REGISTRY_URL}/compatibility/subjects/${SUBJECT}/versions/latest" \
  -H "Content-Type: application/json" \
  -d "{\"schema\": $(cat $SCHEMA_FILE | jq -Rs .)}")

if [ "$RESPONSE" != "200" ]; then
  echo "ERRORE: Schema non compatibile con la versione in produzione"
  exit 1
fi

echo "Schema compatibile: OK"

# 3. Usa schema IDs fissi nei test (non schema content)
# Questo rende i test stabili anche se lo schema evolve

# 4. Documenta ogni campo con "doc" in Avro
# Il Registry mostra la documentazione nella UI

# 5. Backup del Registry:
# Lo Schema Registry persiste gli schemi su Kafka (_schemas topic)
# Il backup del topic = backup degli schemi
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic _schemas \
  --from-beginning \
  > schemas-backup-$(date +%Y%m%d).json

Závěry

Schema Registry transformuje Kafku ze systému zasílání zpráv na skutečný datová platforma s řízením: formální smlouvy mezi výrobci a spotřebiteli, vývoj kontrolované vzory, explicitní chyby místo tiché korupce. V organizaci s více týmy je to jedna z nejkritičtějších součástí být správně nakonfigurován před uvedením do výroby.

Kompletní série: Apache Kafka

  • článek 01 — Základy Apache Kafky
  • článek 02 — KRaft v Kafce 4.0
  • článek 03 — Pokročilý výrobce a spotřebitel
  • článek 04 — Sémantika přesně jednou u Kafky
  • článek 05 (tento) — Registr schémat: Avro, Protobuf a Schema Evolution
  • článek 06 — Kafka Streams: KTable a Windowing
  • článek 07 — Kafka Connect: Debezium CDC a integrace DB
  • článek 08 — Kafka + Apache Flink: Pipeline Analytics v reálném čase