Dlaczego monitorowanie Kafki jest krytyczne

Apache Kafka został zaprojektowany tak, aby był odporny, ale odporność nie jest automatyczna: wymaga aktywnej obserwowalności. Przeciążony broker, grupa konsumentów, która nie nadąża za produkcją, lub partycja z obniżoną replikacją jeśli nie zostaną wcześnie wykryte, mogą skutkować utratą danych lub przestojem.

Trzy podstawowe wymiary, które należy monitorować w Kafce to:

  • Kondycja klastra: aktywni brokerzy, aktywny kontroler, niedostatecznie zreplikowane partycje, partycje offline
  • Wydajność producenta: częstotliwość żądań, liczba błędów rekordu, opóźnienie żądań, wielkość partii
  • Wydajność konsumencka: opóźnienie konsumenckie na grupę i na partycję, współczynnik zatwierdzania, częstotliwość ponownego równoważenia

5 najbardziej krytycznych wskaźników Kafki

  • kafka.consumer.lag: różnica między LEO a zatwierdzonym przesunięciem — wskaźnik nr 1 dotyczący kondycji systemu
  • kafka.server.UnderReplicatedPartitions: partycje z niekompletnym ISR — zdegradowany sygnał brokera
  • kafka.server.OfflinePartitionsCount: partycje bez lidera — błąd krytyczny, dane niedostępne
  • kafka.network.RequestMetrics.TotalTimeMs: całkowite opóźnienie żądań
  • kafka.server.BrokerTopicMetrics.MessagesInPerSec: przepustowość przyjmowania

Jak Kafka ujawnia metryki: JMX

Kafka udostępnia wszystkie swoje wewnętrzne wskaźniki poprzez JMX (rozszerzenia zarządzania Java), standardowe ramy dla monitorowanie aplikacji JVM. Każda metryka jest identyfikowana przez Nazwa komponentu MBean w formacie dominio:tipo=Tipo,nome=Metrica.

Aby udostępnić metryki JMX Prometheusowi, najczęściej używanemu systemowi monitorowania w środowisku natywnym w chmurze, używasz Eksporter JMX: Agent Java udostępniający metryki JMX jako punkt końcowy HTTP w formacie Prometheus (tekstowy, model pull).

Skonfiguruj eksporter JMX jako agenta Java

JMX Exporter jest uruchamiany jako agent JVM: wystarczy dodać parametr -javaagent do brokera Kafka JVM. Pobierasz plik JAR z oficjalnego repozytorium Prometheus i tworzysz plik konfiguracyjny YAML, który określa które komponenty MBean mają być zbierane i jak zmienić ich nazwę na metryki Prometheus.

# Scaricare JMX Exporter (ultima versione stabile)
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.0.1/jmx_prometheus_javaagent-1.0.1.jar \
  -O /opt/kafka/libs/jmx_prometheus_javaagent.jar

# File di configurazione: /opt/kafka/config/kafka-jmx-exporter.yml
# Questo file dice a JMX Exporter quali MBean raccogliere
# kafka-jmx-exporter.yml
# Configurazione completa per Kafka broker + consumer group lag
---
startDelaySeconds: 0
ssl: false
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Metriche Broker: throughput messaggi
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_messagesinpersec
    labels:
      topic: "$1"

  # Under-replicated partitions: CRITICO
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_server_replicamanager_underreplicatedpartitions

  # Offline partitions: CRITICO
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_kafkacontroller_offlinepartitionscount

  # Active controller count (deve essere sempre 1)
  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_kafkacontroller_activecontrollercount

  # Request latency per tipo di request
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(\w+)><>(Mean|99thPercentile)'
    name: kafka_network_requestmetrics_totaltimems
    labels:
      request: "$1"
      quantile: "$2"

  # Producer request rate
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_bytesinpersec

  # Log size per topic-partizione
  - pattern: 'kafka.log<type=Log, name=Size, topic=(.+), partition=(\d+)><>Value'
    name: kafka_log_log_size
    labels:
      topic: "$1"
      partition: "$2"

  # JVM heap usage
  - pattern: 'java.lang<type=Memory><>HeapMemoryUsage'
    name: jvm_heap_memory_usage
    type: GAUGE

Włącz eksporter JMX w brokerze

# Modificare kafka-server-start.sh oppure impostare la variabile d'ambiente
# Aggiungere al file bin/kafka-server-start.sh o configurare systemd

# Opzione 1: variabile d'ambiente KAFKA_OPTS
export KAFKA_OPTS="-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Opzione 2: in systemd service file
# [Service]
# Environment="KAFKA_OPTS=-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Verificare che l'endpoint funzioni dopo il riavvio
curl http://localhost:9404/metrics | grep kafka_server_replicamanager

Opóźnienie konsumenta: najważniejszy wskaźnik

Il opóźnienie konsumenckie mierzy, ile rekordów konsument musi jeszcze przetworzyć: jest to różnica pomiędzy the Przesunięcie końca dziennika (ostatni zapis zapisany w temacie) i Zaangażowane przesunięcie (ostatni rekord przetworzony przez grupę konsumentów). Rosnące opóźnienie wskazuje, że konsumenci nie nadążają za tempem produkcji.

Samo opóźnienie nie ujawnia wskaźników JMX brokerów: należy je zebrać, wysyłając zapytanie do koordynatora grupy konsumentów. Standardowym narzędziem do tego jest Eksporter Kafka Lag (Projekt Lightbend/open source) lub wtyczkę grupa_konsumentów kafka samego Prometeusza.

# Docker Compose per Kafka Lag Exporter (alternativa moderna)
# https://github.com/seglo/kafka-lag-exporter

services:
  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    ports:
      - "8000:8000"
    volumes:
      - ./lag-exporter.conf:/opt/docker/conf/application.conf
    environment:
      JAVA_OPTS: "-Xmx256m"
# lag-exporter.conf (formato HOCON)
kafka-lag-exporter {
  port = 8000

  clusters = [
    {
      name = "production-cluster"
      bootstrap-brokers = "kafka1:9092,kafka2:9092,kafka3:9092"

      # Polling interval
      poll-interval = 30 seconds

      # Consumer groups da monitorare (regex, vuoto = tutti)
      consumer-group-whitelist = [".*"]
    }
  ]

  # Metriche Prometheus esposte:
  # kafka_consumer_group_latest_offset
  # kafka_consumer_group_partition_lag
  # kafka_consumer_group_sum_lag  <-- lag totale per group
  # kafka_consumer_group_max_lag  <-- lag massimo tra le partizioni
}

Oblicz opóźnienie za pomocą PromQL

# Query PromQL per Grafana / Prometheus

# Lag totale per consumer group
sum(kafka_consumer_group_partition_lag) by (group)

# Top 10 consumer groups per lag
topk(10, sum(kafka_consumer_group_partition_lag) by (group))

# Tasso di crescita del lag (se positivo, i consumer non stanno al passo)
rate(kafka_consumer_group_sum_lag[5m])

# Consumer lag per topic specifico
sum(kafka_consumer_group_partition_lag{topic="ordini-effettuati"}) by (group)

# Alert: lag sopra soglia critica per piu di 5 minuti
# kafka_consumer_group_sum_lag{group="servizio-inventario"} > 10000

Skonfiguruj Prometeusza dla Kafki

Prometeusz posługuje się szablonem ciągnąć: to Prometheus zbiera metryki z punktów końcowych eksponowanych przez eksporterów, według konfigurowalnego interwału (scrape_interval). Konfiguracja określa zeskrobać cel: Punkty końcowe HTTP do zapytania.

# prometheus.yml - configurazione per scraping Kafka
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka-alerts.yml"

scrape_configs:
  # JMX Exporter su ogni broker Kafka
  - job_name: "kafka-brokers"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    relabel_configs:
      - source_labels: [__address__]
        target_label: broker
        regex: "([^:]+):.*"
        replacement: "$1"

  # Kafka Lag Exporter per consumer group metrics
  - job_name: "kafka-lag-exporter"
    static_configs:
      - targets:
          - "kafka-lag-exporter:8000"
    scrape_interval: 30s  # lag cambia meno frequentemente

  # JVM metrics dei broker (se abilitato jvm_exporter separato)
  - job_name: "kafka-jvm"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    metrics_path: /metrics
    params:
      module: [jvm]

Zasady alertów Kafka: podstawowe zasady

Reguły alertów definiują warunki wyzwalające powiadomienie. W Prometeuszu wyrażają siebie z PromQL i są pakowane w osobne pliki YAML. Oto podstawowe zasady dotyczące Kafki w produkcji:

# kafka-alerts.yml - Regole di alert per Prometheus Alertmanager
groups:
  - name: kafka.critical
    rules:
      # Broker down: nessun dato dall'endpoint JMX
      - alert: KafkaBrokerDown
        expr: up{job="kafka-brokers"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker {{ $labels.broker }} non raggiungibile"
          description: "Il broker {{ $labels.broker }} non risponde da piu di 1 minuto."

      # Partizioni offline: CRITICO, dati non accessibili
      - alert: KafkaOfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partizioni offline in Kafka"
          description: "Partizioni senza leader: dati non leggibili/scrivibili."

      # Nessun active controller
      - alert: KafkaNoActiveController
        expr: sum(kafka_controller_kafkacontroller_activecontrollercount) != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka senza controller attivo"

  - name: kafka.warning
    rules:
      # Under-replicated partitions: replica degradata
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "{{ $value }} partizioni under-replicated su {{ $labels.broker }}"
          description: "Replica degradata: un broker potrebbe essere lento o down."

      # Consumer lag critico
      - alert: KafkaConsumerLagHigh
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 50000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag alto per gruppo {{ $labels.group }}"
          description: "Lag: {{ $value }}. I consumer non stanno al passo con la produzione."

      # Consumer lag critico prolungato
      - alert: KafkaConsumerLagCritical
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 200000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag CRITICO per gruppo {{ $labels.group }}"

      # JVM heap usage alta
      - alert: KafkaBrokerHeapHigh
        expr: jvm_heap_memory_usage{area="used"} / jvm_heap_memory_usage{area="max"} > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Heap JVM alta su broker Kafka {{ $labels.broker }}"

Panel Grafana dla Kafki

Grafana to standardowe narzędzie do wizualizacji dla Prometheusa. Istnieją gotowe do importu dashboardy bezpośrednio z Grafana Dashboard Hub (grafana.com/dashboards). Najczęściej używane w przypadku Kafki to:

  • Identyfikator deski rozdzielczej 7589: Przegląd Kafki (kondycja brokera, przepustowość, partycje)
  • Identyfikator deski rozdzielczej 9021: Metryki platformy konfluentnej Kafki
  • Identyfikator deski rozdzielczej 13282: Opóźnienie konsumenckie Kafki (szczegółowe opóźnienie grupy konsumentów)

Aby zaimportować dashboard do Grafany: Panele → Import → wprowadź ID → wybierz źródło danych Prometheus.

Panele klawiszy, które warto mieć na swoim pulpicie nawigacyjnym

# Pannello 1: Broker Health Overview
# Stat panel: up/down per ogni broker
up{job="kafka-brokers"}

# Pannello 2: Under-Replicated Partitions
# Stat panel con soglia colorata (verde=0, rosso>0)
sum(kafka_server_replicamanager_underreplicatedpartitions)

# Pannello 3: Consumer Lag per Group (time series)
sum(kafka_consumer_group_partition_lag) by (group)

# Pannello 4: Messages In/Out per secondo
rate(kafka_server_brokertopicmetrics_messagesinpersec[1m])

# Pannello 5: Request Latency p99 (heatmap o time series)
kafka_network_requestmetrics_totaltimems{quantile="99thPercentile"}

# Pannello 6: Disk Usage per Broker
# Utile per pianificare l'espansione storage
node_filesystem_size_bytes{mountpoint="/kafka-data"} -
node_filesystem_avail_bytes{mountpoint="/kafka-data"}

Docker Compose: pełne monitorowanie stosu

W środowiskach programistycznych lub przejściowych ten Docker Compose uruchamia cały stos monitorowania: JMX Exporter (zintegrowany z brokerem), Kafka Lag Exporter, Prometheus i Grafana.

# docker-compose.monitoring.yml
version: "3.9"

services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9404:9404"   # JMX Exporter
    volumes:
      - ./config/kafka-jmx-exporter.yml:/opt/kafka-jmx-exporter.yml
      - ./libs/jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      CLUSTER_ID: "monitoring-demo-cluster"
      KAFKA_OPTS: "-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-jmx-exporter.yml"

  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    container_name: kafka-lag-exporter
    ports:
      - "8000:8000"
    volumes:
      - ./config/lag-exporter.conf:/opt/docker/conf/application.conf
    depends_on:
      - kafka

  prometheus:
    image: prom/prometheus:v2.50.1
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./config/kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
      - "--storage.tsdb.retention.time=30d"

  grafana:
    image: grafana/grafana:10.3.1
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "kafka-monitoring"
      GF_USERS_ALLOW_SIGN_UP: "false"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./config/grafana-datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yml
      - ./config/grafana-dashboards.yml:/etc/grafana/provisioning/dashboards/kafka.yml

volumes:
  grafana-data:

Monitorowanie opóźnień konsumenckich za pomocą języka Java (programowe)

W niektórych przypadkach przydatne jest zmierzenie opóźnień konsumenckich bezpośrednio w kodzie aplikacji, na przykład, aby wyświetlić metrykę za pomocą mikrometru/siłownika w aplikacji Spring Boot. Podejście to stanowi uzupełnienie monitoringu zewnętrznego i pozwala na dokładniejsze korelacje.

// ConsumerLagMonitor.java - Misura il lag programmaticamente con Kafka AdminClient
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ConsumerLagMonitor {

    private final AdminClient adminClient;
    private final String groupId;

    public ConsumerLagMonitor(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        this.adminClient = AdminClient.create(props);
        this.groupId = groupId;
    }

    public Map<TopicPartition, Long> getLagPerPartition() throws ExecutionException, InterruptedException {
        // 1. Recupera committed offsets del consumer group
        ListConsumerGroupOffsetsResult offsetsResult =
            adminClient.listConsumerGroupOffsets(groupId);

        Map<TopicPartition, OffsetAndMetadata> committedOffsets =
            offsetsResult.partitionsToOffsetAndMetadata().get();

        // 2. Recupera l'end offset (Log End Offset) per le stesse partizioni
        Map<TopicPartition, OffsetSpec> latestOffsetRequest = committedOffsets.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        ListOffsetsResult latestOffsets = adminClient.listOffsets(latestOffsetRequest);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            latestOffsets.all().get();

        // 3. Calcola il lag = endOffset - committedOffset
        Map<TopicPartition, Long> lag = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committedOffset = entry.getValue().offset();
            long endOffset = endOffsets.get(tp).offset();
            lag.put(tp, Math.max(0, endOffset - committedOffset));
        }

        return lag;
    }

    public long getTotalLag() throws ExecutionException, InterruptedException {
        return getLagPerPartition().values().stream()
            .mapToLong(Long::longValue)
            .sum();
    }

    public void close() {
        adminClient.close();
    }
}

Najlepsze praktyki monitorowania platformy Kafka

Anti-Pattern: Alarm przy każdym skoku opóźnienia

Opóźnienie konsumenta zmienia się w sposób naturalny: chwilowy szczyt produkcji lub przerwa w GC konsumenta powodują tymczasowe opóźnienie, które szybko ustępuje. Skonfiguruj alerty dotyczące progów bezwzględnych bez okno czasowe (for: 10m) generuje stale fałszywe alarmy. Zawsze używaj minimalnego czasu trwania (5-10 minut) przed wyzwoleniem alertu.

  • Monitoruj trend, a nie wartość bezwzględną: Malejące opóźnienie wynoszące 100 tys. nie stanowi większego problemu rosnącego opóźnienia wynoszącego 5 tys. USA rate() w PromQL, aby zobaczyć pochodną.
  • Oddzielne pulpity nawigacyjne według osób: pulpit operacyjny z alertami krytycznymi dla SRE, dashboard planowania wydajności dla zespołów zajmujących się infrastrukturą, dashboard biznesowy KPI dla menadżerów produktu.
  • Uwzględnij metryki JVM: 30% problemów z wydajnością Kafki wynika z ciśnienia GC o brokerze. Zawsze monitoruj jvm_gc_pause_seconds i wykorzystanie sterty.
  • Przechowuj Prometeusza przez co najmniej 30 dni: do analizy trendów i miesięcznego planowania wydajności. Rozważ Thanosa lub Mimira w celu długoterminowego przechowywania.
  • Testuj alerty za pomocą amtool: Sprawdź, czy reguły alertów są poprawne składniowo prawidłowe i uruchamiane zgodnie z oczekiwanymi danymi testowymi przed wejściem do produkcji.

Kafka w chmurze zarządzanej: Confluent Cloud Metrics API

Jeśli korzystasz z Confluent Cloud lub zarządzanej usługi Kafka (AWS MSK, Aiven Kafka), metryki JMX nie są bezpośrednio dostępne. W tym przypadku używamy Interfejs API metryk dostawcy chmury. Confluent Cloud udostępnia interfejs API REST Metrics zgodny z Prometheus za pośrednictwem zdalnego_write:

# prometheus.yml per Confluent Cloud Metrics API
scrape_configs:
  - job_name: "confluent-cloud"
    honor_timestamps: true
    metrics_path: "/v2/metrics/cloud/export"
    scheme: https
    basic_auth:
      username: "<CONFLUENT_CLOUD_API_KEY>"
      password: "<CONFLUENT_CLOUD_API_SECRET>"
    static_configs:
      - targets:
          - "api.telemetry.confluent.cloud"
    params:
      resource.kafka.id:
        - "<CLUSTER_ID>"

# Per AWS MSK: usa il CloudWatch exporter
# https://github.com/prometheus/cloudwatch_exporter
scrape_configs:
  - job_name: "aws-msk"
    static_configs:
      - targets:
          - "cloudwatch-exporter:9106"

Podsumowanie: Lista kontrolna monitorowania Kafki

  • JMX Exporter skonfigurowany na wszystkich brokerach, port 9404
  • Kafka Lag Exporter lub odpowiednik dla metryk grup konsumentów
  • Prometheus z interwałem skrobania 15 s dla brokera, 30 s dla eksportera opóźnień
  • Alerty krytyczne: awaria brokera, partycje offline, brak aktywnego kontrolera
  • Alert ostrzegawczy: niedostatecznie zreplikowane partycje, duże opóźnienie konsumenckie, sterta > 85%
  • Zaimportowano pulpit nawigacyjny Grafana (ID 7589 dla przeglądu, ID 13282 dla opóźnienia)
  • Przechowywanie Prometheus minimum 30 dni
  • Alerty testowane z minimalnym czasem trwania (dla: 5 m), aby uniknąć fałszywych alarmów

Kolejne kroki w serii

Skonfigurowałeś monitorowanie. Następnym krokiem jest wiedza, co zrobić, gdy wystąpią problemy:

  • Artykuł 10 – Kolejka niedostarczonych listów i obsługa błędów: wzorce zarządzania wiadomościami które nie zostały przetworzone, w tym DLQ, wykładnicza ponowna próba i wykrywanie trujących pigułek.
  • Artykuł 11 – Kafka w produkcji: kompletny przewodnik operacyjny dotyczący rozmiaru klastra, optymalną konfigurację czynników retencji i replikacji oraz MirrorMaker 2 do odzyskiwania po awarii.

Połącz z innymi seriami

  • Podstawy Apache Kafka (artykuł 1): Koncepcja opóźnień konsumenckich wymaga zrozumienia różnica między LEO, HW i zatwierdzonym offsetem wyjaśniona w pierwszym artykule z serii.
  • Architektura sterowana zdarzeniami: Monitorowanie opóźnień konsumenckich jest również krytyczne w systemach EDA z AWS SQS i SNS, gdzie równoważną metryką jest głębokość kolejki.