De ce monitorizarea Kafka este critică

Apache Kafka este conceput pentru a fi rezistent, dar rezistența nu este automată: necesită observabilitate activă. Un broker supraîncărcat, un grup de consumatori care nu poate ține pasul cu producția sau o partiție cu replicare degradată ele se pot transforma în pierderi de date sau timp de nefuncționare dacă nu sunt detectate din timp.

Cele trei dimensiuni fundamentale de monitorizat în Kafka sunt:

  • Sănătatea clusterului: brokeri activi, controler activ, partiții subreplicate, partiții offline
  • Performanța producătorului: rata de solicitare, rata de eroare de înregistrare, latența solicitării, dimensiunea lotului
  • Performanța consumatorului: decalajul consumatorului per grup și per partiție, rata de comitere, frecvența de reechilibrare

Cele mai critice 5 valori Kafka

  • kafka.consumer.lag: diferența dintre LEO și compensarea angajată - valoarea #1 pentru sănătatea sistemului
  • kafka.server.UnderReplicatedPartitions: partiții cu ISR incomplet — semnal de broker degradat
  • kafka.server.OfflinePartitionsCount: partiții fără lider — eroare critică, date inaccesibile
  • kafka.network.RequestMetrics.TotalTimeMs: latența totală a cererilor
  • kafka.server.BrokerTopicMetrics.MessagesInPerSec: debitul de ingestie

Cum expune Kafka valorile: JMX

Kafka își expune toate valorile interne prin JMX (Extensii de management Java), cadrul standard pentru monitorizarea aplicațiilor JVM. Fiecare măsurătoare este identificată prin a numele MBean în format dominio:tipo=Tipo,nome=Metrica.

Pentru a face metricile JMX accesibile pentru Prometheus, cel mai folosit sistem de monitorizare într-un mediu nativ în cloud, folosesti Exportator JMX: un agent Java care expune valorile JMX ca punct final HTTP în format Prometheus (bazat pe text, model pull).

Configurați JMX Exporter ca agent Java

JMX Exporter este pornit ca agent JVM: trebuie doar să adăugați parametrul -javaagent către brokerul Kafka JVM. Descărcați JAR din depozitul oficial Prometheus și creați un fișier de configurare YAML care specifică ce MBean-uri să colecteze și cum să le redenumiți în valorile Prometheus.

# Scaricare JMX Exporter (ultima versione stabile)
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.0.1/jmx_prometheus_javaagent-1.0.1.jar \
  -O /opt/kafka/libs/jmx_prometheus_javaagent.jar

# File di configurazione: /opt/kafka/config/kafka-jmx-exporter.yml
# Questo file dice a JMX Exporter quali MBean raccogliere
# kafka-jmx-exporter.yml
# Configurazione completa per Kafka broker + consumer group lag
---
startDelaySeconds: 0
ssl: false
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Metriche Broker: throughput messaggi
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_messagesinpersec
    labels:
      topic: "$1"

  # Under-replicated partitions: CRITICO
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_server_replicamanager_underreplicatedpartitions

  # Offline partitions: CRITICO
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_kafkacontroller_offlinepartitionscount

  # Active controller count (deve essere sempre 1)
  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_kafkacontroller_activecontrollercount

  # Request latency per tipo di request
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(\w+)><>(Mean|99thPercentile)'
    name: kafka_network_requestmetrics_totaltimems
    labels:
      request: "$1"
      quantile: "$2"

  # Producer request rate
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_bytesinpersec

  # Log size per topic-partizione
  - pattern: 'kafka.log<type=Log, name=Size, topic=(.+), partition=(\d+)><>Value'
    name: kafka_log_log_size
    labels:
      topic: "$1"
      partition: "$2"

  # JVM heap usage
  - pattern: 'java.lang<type=Memory><>HeapMemoryUsage'
    name: jvm_heap_memory_usage
    type: GAUGE

Activați JMX Exporter în Broker

# Modificare kafka-server-start.sh oppure impostare la variabile d'ambiente
# Aggiungere al file bin/kafka-server-start.sh o configurare systemd

# Opzione 1: variabile d'ambiente KAFKA_OPTS
export KAFKA_OPTS="-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Opzione 2: in systemd service file
# [Service]
# Environment="KAFKA_OPTS=-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Verificare che l'endpoint funzioni dopo il riavvio
curl http://localhost:9404/metrics | grep kafka_server_replicamanager

Întârzierea consumatorului: cea mai importantă valoare

Il decalajul consumatorului măsoară câte înregistrări mai are de procesat consumatorul: este diferența dintre cel Log End Offset (ultima înregistrare scrisă în subiect) și Compensare comisă (ultima înregistrare procesată de grupul de consumatori). Un decalaj tot mai mare indică faptul că consumatorii nu țin pasul cu viteza de producție.

Întârzierea în sine nu expune valorile JMX ale brokerilor: trebuie colectată interogând coordonatorul grupului de consumatori. Instrumentul standard pentru aceasta este Exportator Kafka Lag (Proiect Lightbend/open source) sau pluginul kafka_consumer_group al lui Prometeu însuși.

# Docker Compose per Kafka Lag Exporter (alternativa moderna)
# https://github.com/seglo/kafka-lag-exporter

services:
  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    ports:
      - "8000:8000"
    volumes:
      - ./lag-exporter.conf:/opt/docker/conf/application.conf
    environment:
      JAVA_OPTS: "-Xmx256m"
# lag-exporter.conf (formato HOCON)
kafka-lag-exporter {
  port = 8000

  clusters = [
    {
      name = "production-cluster"
      bootstrap-brokers = "kafka1:9092,kafka2:9092,kafka3:9092"

      # Polling interval
      poll-interval = 30 seconds

      # Consumer groups da monitorare (regex, vuoto = tutti)
      consumer-group-whitelist = [".*"]
    }
  ]

  # Metriche Prometheus esposte:
  # kafka_consumer_group_latest_offset
  # kafka_consumer_group_partition_lag
  # kafka_consumer_group_sum_lag  <-- lag totale per group
  # kafka_consumer_group_max_lag  <-- lag massimo tra le partizioni
}

Calculați decalajul cu PromQL

# Query PromQL per Grafana / Prometheus

# Lag totale per consumer group
sum(kafka_consumer_group_partition_lag) by (group)

# Top 10 consumer groups per lag
topk(10, sum(kafka_consumer_group_partition_lag) by (group))

# Tasso di crescita del lag (se positivo, i consumer non stanno al passo)
rate(kafka_consumer_group_sum_lag[5m])

# Consumer lag per topic specifico
sum(kafka_consumer_group_partition_lag{topic="ordini-effettuati"}) by (group)

# Alert: lag sopra soglia critica per piu di 5 minuti
# kafka_consumer_group_sum_lag{group="servizio-inventario"} > 10000

Configurați Prometheus pentru Kafka

Prometheus folosește un șablon Trage: Prometheus este cel care colectează valorile de la punctele finale expuse de exportatori, conform unui interval configurabil (scrape_interval). Configurația specifică răzuiți ținta: punctele finale HTTP de interogat.

# prometheus.yml - configurazione per scraping Kafka
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka-alerts.yml"

scrape_configs:
  # JMX Exporter su ogni broker Kafka
  - job_name: "kafka-brokers"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    relabel_configs:
      - source_labels: [__address__]
        target_label: broker
        regex: "([^:]+):.*"
        replacement: "$1"

  # Kafka Lag Exporter per consumer group metrics
  - job_name: "kafka-lag-exporter"
    static_configs:
      - targets:
          - "kafka-lag-exporter:8000"
    scrape_interval: 30s  # lag cambia meno frequentemente

  # JVM metrics dei broker (se abilitato jvm_exporter separato)
  - job_name: "kafka-jvm"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    metrics_path: /metrics
    params:
      module: [jvm]

Reguli de alertă Kafka: Regulile esențiale

Regulile de alertă definesc condițiile care declanșează o notificare. În Prometeu se exprimă cu PromQL și sunt grupate în fișiere YAML separate. Iată regulile de bază pentru Kafka în producție:

# kafka-alerts.yml - Regole di alert per Prometheus Alertmanager
groups:
  - name: kafka.critical
    rules:
      # Broker down: nessun dato dall'endpoint JMX
      - alert: KafkaBrokerDown
        expr: up{job="kafka-brokers"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker {{ $labels.broker }} non raggiungibile"
          description: "Il broker {{ $labels.broker }} non risponde da piu di 1 minuto."

      # Partizioni offline: CRITICO, dati non accessibili
      - alert: KafkaOfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partizioni offline in Kafka"
          description: "Partizioni senza leader: dati non leggibili/scrivibili."

      # Nessun active controller
      - alert: KafkaNoActiveController
        expr: sum(kafka_controller_kafkacontroller_activecontrollercount) != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka senza controller attivo"

  - name: kafka.warning
    rules:
      # Under-replicated partitions: replica degradata
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "{{ $value }} partizioni under-replicated su {{ $labels.broker }}"
          description: "Replica degradata: un broker potrebbe essere lento o down."

      # Consumer lag critico
      - alert: KafkaConsumerLagHigh
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 50000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag alto per gruppo {{ $labels.group }}"
          description: "Lag: {{ $value }}. I consumer non stanno al passo con la produzione."

      # Consumer lag critico prolungato
      - alert: KafkaConsumerLagCritical
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 200000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag CRITICO per gruppo {{ $labels.group }}"

      # JVM heap usage alta
      - alert: KafkaBrokerHeapHigh
        expr: jvm_heap_memory_usage{area="used"} / jvm_heap_memory_usage{area="max"} > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Heap JVM alta su broker Kafka {{ $labels.broker }}"

Tabloul de bord Grafana pentru Kafka

Grafana este instrumentul standard de vizualizare pentru Prometheus. Există tablouri de bord gata de importat direct de la Grafana Dashboard Hub (grafana.com/dashboards). Cele mai utilizate pentru Kafka sunt:

  • ID tablou de bord 7589: Prezentare generală Kafka (sănătate broker, debit, partiții)
  • ID tablou de bord 9021: Confluent Platform Kafka Metrics
  • ID tablou de bord 13282: Kafka Consumer Lag (decalaj detaliat al grupului de consumatori)

Pentru a importa un tablou de bord în Grafana: Tablouri de bord → Import → introduceți ID → selectați sursa de date Prometheus.

Panouri cheie pe care trebuie să le aveți în tabloul de bord

# Pannello 1: Broker Health Overview
# Stat panel: up/down per ogni broker
up{job="kafka-brokers"}

# Pannello 2: Under-Replicated Partitions
# Stat panel con soglia colorata (verde=0, rosso>0)
sum(kafka_server_replicamanager_underreplicatedpartitions)

# Pannello 3: Consumer Lag per Group (time series)
sum(kafka_consumer_group_partition_lag) by (group)

# Pannello 4: Messages In/Out per secondo
rate(kafka_server_brokertopicmetrics_messagesinpersec[1m])

# Pannello 5: Request Latency p99 (heatmap o time series)
kafka_network_requestmetrics_totaltimems{quantile="99thPercentile"}

# Pannello 6: Disk Usage per Broker
# Utile per pianificare l'espansione storage
node_filesystem_size_bytes{mountpoint="/kafka-data"} -
node_filesystem_avail_bytes{mountpoint="/kafka-data"}

Docker Compose: monitorizare completă a stivei

Pentru medii de dezvoltare sau de organizare, acest Docker Compose lansează întreaga stivă de monitorizare: JMX Exporter (integrat în broker), Kafka Lag Exporter, Prometheus și Grafana.

# docker-compose.monitoring.yml
version: "3.9"

services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9404:9404"   # JMX Exporter
    volumes:
      - ./config/kafka-jmx-exporter.yml:/opt/kafka-jmx-exporter.yml
      - ./libs/jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      CLUSTER_ID: "monitoring-demo-cluster"
      KAFKA_OPTS: "-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-jmx-exporter.yml"

  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    container_name: kafka-lag-exporter
    ports:
      - "8000:8000"
    volumes:
      - ./config/lag-exporter.conf:/opt/docker/conf/application.conf
    depends_on:
      - kafka

  prometheus:
    image: prom/prometheus:v2.50.1
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./config/kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
      - "--storage.tsdb.retention.time=30d"

  grafana:
    image: grafana/grafana:10.3.1
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "kafka-monitoring"
      GF_USERS_ALLOW_SIGN_UP: "false"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./config/grafana-datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yml
      - ./config/grafana-dashboards.yml:/etc/grafana/provisioning/dashboards/kafka.yml

volumes:
  grafana-data:

Monitorizarea întârzierii consumatorului cu Java (programatică)

În unele cazuri, este util să măsurați decalajul consumatorului direct în codul aplicației, de exemplu pentru a expune metrica prin Micrometru/Actuator într-o aplicație Spring Boot. Această abordare este complementară monitorizării externe și permite corelații mai precise.

// ConsumerLagMonitor.java - Misura il lag programmaticamente con Kafka AdminClient
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ConsumerLagMonitor {

    private final AdminClient adminClient;
    private final String groupId;

    public ConsumerLagMonitor(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        this.adminClient = AdminClient.create(props);
        this.groupId = groupId;
    }

    public Map<TopicPartition, Long> getLagPerPartition() throws ExecutionException, InterruptedException {
        // 1. Recupera committed offsets del consumer group
        ListConsumerGroupOffsetsResult offsetsResult =
            adminClient.listConsumerGroupOffsets(groupId);

        Map<TopicPartition, OffsetAndMetadata> committedOffsets =
            offsetsResult.partitionsToOffsetAndMetadata().get();

        // 2. Recupera l'end offset (Log End Offset) per le stesse partizioni
        Map<TopicPartition, OffsetSpec> latestOffsetRequest = committedOffsets.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        ListOffsetsResult latestOffsets = adminClient.listOffsets(latestOffsetRequest);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            latestOffsets.all().get();

        // 3. Calcola il lag = endOffset - committedOffset
        Map<TopicPartition, Long> lag = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committedOffset = entry.getValue().offset();
            long endOffset = endOffsets.get(tp).offset();
            lag.put(tp, Math.max(0, endOffset - committedOffset));
        }

        return lag;
    }

    public long getTotalLag() throws ExecutionException, InterruptedException {
        return getLagPerPartition().values().stream()
            .mapToLong(Long::longValue)
            .sum();
    }

    public void close() {
        adminClient.close();
    }
}

Cele mai bune practici pentru monitorizarea Kafka

Anti-pattern: alertă la fiecare vârf de întârziere

Întârzierea consumatorului fluctuează în mod natural: un vârf momentan al producției sau o pauză GC a consumatorului ele provoacă întârziere temporară care se recuperează rapid. Configurați alerte pe praguri absolute fără o fereastră de timp (for: 10m) generează false pozitive constante. Utilizați întotdeauna o durată minimă (5-10 minute) înainte de a declanșa alerta.

  • Monitorizați tendința, nu valoarea absolută: O întârziere în scădere de 100K este mai puțin îngrijorătoare a unui decalaj de 5K în creștere. STATELE UNITE ALE AMERICII rate() în PromQL pentru a vedea derivatul.
  • Tablouri de bord separate în funcție de persoană: un tablou de bord operațional cu alerte critice pentru SRE, un tablou de bord de planificare a capacității pentru echipele de infrastructură, un tablou de bord KPI de afaceri pentru managerii de produs.
  • Includeți valori JVM: 30% dintre problemele de performanță Kafka apar din presiunea GC despre broker. Monitorizați întotdeauna jvm_gc_pause_seconds și utilizarea heap-ului.
  • Păstrați Prometheus cel puțin 30 de zile: pentru analiza tendințelor și planificarea lunară a capacității. Luați în considerare Thanos sau Mimir pentru păstrarea pe termen lung.
  • Testați alertele cu amtool: Verificați dacă regulile de alertă sunt corecte din punct de vedere sintactic corecte și că se declanșează cu datele de testare așteptate înainte de a intra în producție.

Kafka în Cloud Managed: Confluent Cloud Metrics API

Dacă utilizați Confluent Cloud sau un serviciu Kafka gestionat (AWS MSK, Aiven Kafka), valorile JMX nu sunt direct accesibil. În acest caz folosim API-ul Metrics al furnizorului de cloud. Confluent Cloud expune un API REST Metrics compatibil cu Prometheus prin remote_write:

# prometheus.yml per Confluent Cloud Metrics API
scrape_configs:
  - job_name: "confluent-cloud"
    honor_timestamps: true
    metrics_path: "/v2/metrics/cloud/export"
    scheme: https
    basic_auth:
      username: "<CONFLUENT_CLOUD_API_KEY>"
      password: "<CONFLUENT_CLOUD_API_SECRET>"
    static_configs:
      - targets:
          - "api.telemetry.confluent.cloud"
    params:
      resource.kafka.id:
        - "<CLUSTER_ID>"

# Per AWS MSK: usa il CloudWatch exporter
# https://github.com/prometheus/cloudwatch_exporter
scrape_configs:
  - job_name: "aws-msk"
    static_configs:
      - targets:
          - "cloudwatch-exporter:9106"

Rezumat: Lista de verificare a monitorizării Kafka

  • JMX Exporter configurat pe toți brokerii, portul 9404
  • Exportator Kafka Lag sau echivalent pentru valorile grupului de consumatori
  • Prometheus cu interval de scrape 15s pentru broker, 30s pentru exportator lag
  • Alerte critice: broker, partiții offline, fără controler activ
  • Avertisment de alertă: partiții subreplicate, întârziere mare a consumatorului, heap >85%
  • Tabloul de bord Grafana a fost importat (ID 7589 pentru prezentare generală, ID 13282 pentru întârziere)
  • Reținere Prometheus minim 30 de zile
  • Alerte testate cu durată minimă (pentru: 5m) pentru a evita fals pozitive

Următorii pași din serie

Ai configurat monitorizarea. Următorul pas este să știi ce să faci când apar probleme:

  • Articolul 10 – Cozi de așteptare și tratarea erorilor: modele pentru gestionarea mesajelor care nu pot fi procesate, inclusiv DLQ, reîncercarea exponențială și detectarea pilulelor otrăvitoare.
  • Articolul 11 ​​– Kafka în producție: ghid operațional complet pentru dimensionarea clusterului, configurația optimă a factorilor de retenție și replicare și MirrorMaker 2 pentru recuperarea în caz de dezastru.

Legătură cu alte serii

  • Fundamentele Apache Kafka (articolul 1): Conceptul de lag consumator necesită înțelegere diferența dintre LEO, HW și committed offset explicată în primul articol al seriei.
  • Arhitectură condusă de evenimente: Monitorizarea decalajului consumatorului este, de asemenea, critică în sistemele EDA cu AWS SQS și SNS, unde metrica echivalentă este adâncimea cozii.