Kafka モニタリングが重要な理由

Apache Kafka は回復力があるように設計されていますが、回復力は自動的には得られません。アクティブな可観測性が必要です。 過負荷のブローカー、本番環境に追いつけないコンシューマ グループ、またはレプリケーションが低下したパーティション 早期に検出しないと、データ損失やダウンタイムが発生する可能性があります。

Kafka で監視する 3 つの基本的な側面は次のとおりです。

  • クラスターの健全性: アクティブなブローカー、アクティブなコントローラー、複製中のパーティション、オフラインのパーティション
  • プロデューサーのパフォーマンス: リクエストレート、レコードエラーレート、リクエストレイテンシ、バッチサイズ
  • 消費者のパフォーマンス: グループごとおよびパーティションごとのコンシューマ ラグ、コミット レート、リバランス頻度

最も重要な 5 つの Kafka メトリクス

  • kafka.consumer.lag: LEO とコミットされたオフセットの差 - システムの健全性に関する最大の指標
  • kafka.server.UnderReplicatedPartitions: ISR が不完全なパーティション — ブローカー信号の劣化
  • kafka.server.OfflinePartitionsCount: リーダーなしパーティション - 重大なエラー、データにアクセスできない
  • kafka.network.RequestMetrics.TotalTimeMs: リクエストの合計レイテンシ
  • kafka.server.BrokerTopicMetrics.MessagesInPerSec: 取り込みスループット

Kafka がメトリクスを公開する方法: JMX

Kafka は、すべての内部メトリクスを次の方法で公開します。 JMX (Java 管理拡張機能)の標準フレームワーク JVM アプリケーションの監視。各メトリクスは次のように識別されます。 MBean名 形式で dominio:tipo=Tipo,nome=Metrica.

クラウドネイティブ環境で最もよく使用される監視システムである Prometheus に JMX メトリクスにアクセスできるようにするには、 あなたはそれを使用します JMXエクスポーター: JMX メトリクスを HTTP エンドポイントとして公開する Java エージェント Prometheus 形式 (テキストベース、プル モデル)。

JMX エクスポーターを Java エージェントとして構成する

JMX Exporter は JVM エージェントとして起動されます。パラメータを追加するだけです。 -javaagent Kafka ブローカー JVM に送信します。 公式 Prometheus リポジトリから JAR をダウンロードし、以下を指定する YAML 構成ファイルを作成します。 どの MBean を収集するか、およびそれらの名前を Prometheus メトリックに変更する方法。

# Scaricare JMX Exporter (ultima versione stabile)
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.0.1/jmx_prometheus_javaagent-1.0.1.jar \
  -O /opt/kafka/libs/jmx_prometheus_javaagent.jar

# File di configurazione: /opt/kafka/config/kafka-jmx-exporter.yml
# Questo file dice a JMX Exporter quali MBean raccogliere
# kafka-jmx-exporter.yml
# Configurazione completa per Kafka broker + consumer group lag
---
startDelaySeconds: 0
ssl: false
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Metriche Broker: throughput messaggi
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_messagesinpersec
    labels:
      topic: "$1"

  # Under-replicated partitions: CRITICO
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_server_replicamanager_underreplicatedpartitions

  # Offline partitions: CRITICO
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_controller_kafkacontroller_offlinepartitionscount

  # Active controller count (deve essere sempre 1)
  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_kafkacontroller_activecontrollercount

  # Request latency per tipo di request
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(\w+)><>(Mean|99thPercentile)'
    name: kafka_network_requestmetrics_totaltimems
    labels:
      request: "$1"
      quantile: "$2"

  # Producer request rate
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate'
    name: kafka_server_brokertopicmetrics_bytesinpersec

  # Log size per topic-partizione
  - pattern: 'kafka.log<type=Log, name=Size, topic=(.+), partition=(\d+)><>Value'
    name: kafka_log_log_size
    labels:
      topic: "$1"
      partition: "$2"

  # JVM heap usage
  - pattern: 'java.lang<type=Memory><>HeapMemoryUsage'
    name: jvm_heap_memory_usage
    type: GAUGE

ブローカーで JMX エクスポーターを有効にする

# Modificare kafka-server-start.sh oppure impostare la variabile d'ambiente
# Aggiungere al file bin/kafka-server-start.sh o configurare systemd

# Opzione 1: variabile d'ambiente KAFKA_OPTS
export KAFKA_OPTS="-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Opzione 2: in systemd service file
# [Service]
# Environment="KAFKA_OPTS=-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"

# Verificare che l'endpoint funzioni dopo il riavvio
curl http://localhost:9404/metrics | grep kafka_server_replicamanager

消費者のラグ: 最も重要な指標

Il 消費者の遅れ 消費者がまだ処理しなければならないレコードの数を測定します。これは、 の ログ終了オフセット (トピックに書き込まれた最後のレコード) と コミットされたオフセット (コンシューマ グループによって処理された最後のレコード)。 遅延の増加は、消費者が生産の速度に追いついていないことを示しています。

ラグだけではブローカーの JMX メトリックは公開されません。コンシューマー グループ コーディネーターにクエリを実行して収集する必要があります。 このための標準ツールは次のとおりです Kafka ラグ エクスポーター (Lightbend/オープンソース プロジェクト) またはプラグイン kafka_consumer_group プロメテウス自身のこと。

# Docker Compose per Kafka Lag Exporter (alternativa moderna)
# https://github.com/seglo/kafka-lag-exporter

services:
  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    ports:
      - "8000:8000"
    volumes:
      - ./lag-exporter.conf:/opt/docker/conf/application.conf
    environment:
      JAVA_OPTS: "-Xmx256m"
# lag-exporter.conf (formato HOCON)
kafka-lag-exporter {
  port = 8000

  clusters = [
    {
      name = "production-cluster"
      bootstrap-brokers = "kafka1:9092,kafka2:9092,kafka3:9092"

      # Polling interval
      poll-interval = 30 seconds

      # Consumer groups da monitorare (regex, vuoto = tutti)
      consumer-group-whitelist = [".*"]
    }
  ]

  # Metriche Prometheus esposte:
  # kafka_consumer_group_latest_offset
  # kafka_consumer_group_partition_lag
  # kafka_consumer_group_sum_lag  <-- lag totale per group
  # kafka_consumer_group_max_lag  <-- lag massimo tra le partizioni
}

PromQL でラグを計算する

# Query PromQL per Grafana / Prometheus

# Lag totale per consumer group
sum(kafka_consumer_group_partition_lag) by (group)

# Top 10 consumer groups per lag
topk(10, sum(kafka_consumer_group_partition_lag) by (group))

# Tasso di crescita del lag (se positivo, i consumer non stanno al passo)
rate(kafka_consumer_group_sum_lag[5m])

# Consumer lag per topic specifico
sum(kafka_consumer_group_partition_lag{topic="ordini-effettuati"}) by (group)

# Alert: lag sopra soglia critica per piu di 5 minuti
# kafka_consumer_group_sum_lag{group="servizio-inventario"} > 10000

Kafka 用に Prometheus を構成する

プロメテウスはテンプレートを使用します 引く: エンドポイントからメトリクスを収集するのは Prometheus です 設定可能な間隔 (scrape_interval)。 構成では、 ターゲットをスクレイピング: クエリする HTTP エンドポイント。

# prometheus.yml - configurazione per scraping Kafka
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka-alerts.yml"

scrape_configs:
  # JMX Exporter su ogni broker Kafka
  - job_name: "kafka-brokers"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    relabel_configs:
      - source_labels: [__address__]
        target_label: broker
        regex: "([^:]+):.*"
        replacement: "$1"

  # Kafka Lag Exporter per consumer group metrics
  - job_name: "kafka-lag-exporter"
    static_configs:
      - targets:
          - "kafka-lag-exporter:8000"
    scrape_interval: 30s  # lag cambia meno frequentemente

  # JVM metrics dei broker (se abilitato jvm_exporter separato)
  - job_name: "kafka-jvm"
    static_configs:
      - targets:
          - "kafka1:9404"
          - "kafka2:9404"
          - "kafka3:9404"
    metrics_path: /metrics
    params:
      module: [jvm]

アラート ルール Kafka: 重要なルール

アラート ルールは、通知をトリガーする条件を定義します。プロメテウスでは彼らは自分自身を表現します PromQL と統合されており、別の YAML ファイルにバンドルされています。運用環境における Kafka の基本ルールは次のとおりです。

# kafka-alerts.yml - Regole di alert per Prometheus Alertmanager
groups:
  - name: kafka.critical
    rules:
      # Broker down: nessun dato dall'endpoint JMX
      - alert: KafkaBrokerDown
        expr: up{job="kafka-brokers"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka broker {{ $labels.broker }} non raggiungibile"
          description: "Il broker {{ $labels.broker }} non risponde da piu di 1 minuto."

      # Partizioni offline: CRITICO, dati non accessibili
      - alert: KafkaOfflinePartitions
        expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
        for: 30s
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} partizioni offline in Kafka"
          description: "Partizioni senza leader: dati non leggibili/scrivibili."

      # Nessun active controller
      - alert: KafkaNoActiveController
        expr: sum(kafka_controller_kafkacontroller_activecontrollercount) != 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Kafka senza controller attivo"

  - name: kafka.warning
    rules:
      # Under-replicated partitions: replica degradata
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replicamanager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "{{ $value }} partizioni under-replicated su {{ $labels.broker }}"
          description: "Replica degradata: un broker potrebbe essere lento o down."

      # Consumer lag critico
      - alert: KafkaConsumerLagHigh
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 50000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag alto per gruppo {{ $labels.group }}"
          description: "Lag: {{ $value }}. I consumer non stanno al passo con la produzione."

      # Consumer lag critico prolungato
      - alert: KafkaConsumerLagCritical
        expr: sum(kafka_consumer_group_partition_lag) by (group) > 200000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag CRITICO per gruppo {{ $labels.group }}"

      # JVM heap usage alta
      - alert: KafkaBrokerHeapHigh
        expr: jvm_heap_memory_usage{area="used"} / jvm_heap_memory_usage{area="max"} > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Heap JVM alta su broker Kafka {{ $labels.broker }}"

Kafka 用 Grafana ダッシュボード

Grafana は、Prometheus の標準の視覚化ツールです。すぐにインポートできるダッシュボードがあります Grafana ダッシュボード ハブ (grafana.com/dashboards) から直接。 Kafka で最もよく使用されるものは次のとおりです。

  • ダッシュボードID 7589: Kafka の概要 (ブローカーの健全性、スループット、パーティション)
  • ダッシュボードID 9021: Confluent プラットフォーム Kafka メトリクス
  • ダッシュボードID 13282: Kafka Consumer Lag (詳細なコンシューマ グループ ラグ)

ダッシュボードを Grafana にインポートするには: ダッシュボード → インポート → ID を入力 → Prometheus データソースを選択.

ダッシュボードに含めるキーパネル

# Pannello 1: Broker Health Overview
# Stat panel: up/down per ogni broker
up{job="kafka-brokers"}

# Pannello 2: Under-Replicated Partitions
# Stat panel con soglia colorata (verde=0, rosso>0)
sum(kafka_server_replicamanager_underreplicatedpartitions)

# Pannello 3: Consumer Lag per Group (time series)
sum(kafka_consumer_group_partition_lag) by (group)

# Pannello 4: Messages In/Out per secondo
rate(kafka_server_brokertopicmetrics_messagesinpersec[1m])

# Pannello 5: Request Latency p99 (heatmap o time series)
kafka_network_requestmetrics_totaltimems{quantile="99thPercentile"}

# Pannello 6: Disk Usage per Broker
# Utile per pianificare l'espansione storage
node_filesystem_size_bytes{mountpoint="/kafka-data"} -
node_filesystem_avail_bytes{mountpoint="/kafka-data"}

Docker Compose: 完全なスタック監視

開発環境またはステージング環境の場合、この Docker Compose は監視スタック全体を起動します。 JMX エクスポーター (ブローカーに統合)、Kafka Lag エクスポーター、Prometheus、および Grafana。

# docker-compose.monitoring.yml
version: "3.9"

services:
  kafka:
    image: apache/kafka:4.0.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9404:9404"   # JMX Exporter
    volumes:
      - ./config/kafka-jmx-exporter.yml:/opt/kafka-jmx-exporter.yml
      - ./libs/jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
      CLUSTER_ID: "monitoring-demo-cluster"
      KAFKA_OPTS: "-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-jmx-exporter.yml"

  kafka-lag-exporter:
    image: seglo/kafka-lag-exporter:0.8.0
    container_name: kafka-lag-exporter
    ports:
      - "8000:8000"
    volumes:
      - ./config/lag-exporter.conf:/opt/docker/conf/application.conf
    depends_on:
      - kafka

  prometheus:
    image: prom/prometheus:v2.50.1
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./config/kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
      - "--storage.tsdb.retention.time=30d"

  grafana:
    image: grafana/grafana:10.3.1
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "kafka-monitoring"
      GF_USERS_ALLOW_SIGN_UP: "false"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./config/grafana-datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yml
      - ./config/grafana-dashboards.yml:/etc/grafana/provisioning/dashboards/kafka.yml

volumes:
  grafana-data:

Java を使用したコンシューマ ラグ モニタリング (プログラムによる)

場合によっては、アプリケーション コード内で消費者のラグを直接測定すると便利です。 たとえば、Spring Boot アプリケーションでマイクロメーター/アクチュエーターを介してメトリックを公開する場合などです。 このアプローチは外部モニタリングを補完し、より正確な相関関係を可能にします。

// ConsumerLagMonitor.java - Misura il lag programmaticamente con Kafka AdminClient
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ConsumerLagMonitor {

    private final AdminClient adminClient;
    private final String groupId;

    public ConsumerLagMonitor(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        this.adminClient = AdminClient.create(props);
        this.groupId = groupId;
    }

    public Map<TopicPartition, Long> getLagPerPartition() throws ExecutionException, InterruptedException {
        // 1. Recupera committed offsets del consumer group
        ListConsumerGroupOffsetsResult offsetsResult =
            adminClient.listConsumerGroupOffsets(groupId);

        Map<TopicPartition, OffsetAndMetadata> committedOffsets =
            offsetsResult.partitionsToOffsetAndMetadata().get();

        // 2. Recupera l'end offset (Log End Offset) per le stesse partizioni
        Map<TopicPartition, OffsetSpec> latestOffsetRequest = committedOffsets.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        ListOffsetsResult latestOffsets = adminClient.listOffsets(latestOffsetRequest);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            latestOffsets.all().get();

        // 3. Calcola il lag = endOffset - committedOffset
        Map<TopicPartition, Long> lag = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            long committedOffset = entry.getValue().offset();
            long endOffset = endOffsets.get(tp).offset();
            lag.put(tp, Math.max(0, endOffset - committedOffset));
        }

        return lag;
    }

    public long getTotalLag() throws ExecutionException, InterruptedException {
        return getLagPerPartition().values().stream()
            .mapToLong(Long::longValue)
            .sum();
    }

    public void close() {
        adminClient.close();
    }
}

Kafka を監視するためのベスト プラクティス

アンチパターン: 遅延が急増するたびにアラートを送信

コンシューマーラグは自然に変動します: 本番環境の一時的なピークまたはコンシューマー GC の一時停止 一時的な遅延が発生しますが、すぐに回復します。絶対しきい値を使用せずにアラートを構成する 時間枠 (for: 10m) 一定の誤検知が生成されます。 アラートをトリガーする前に、常に最小期間 (5 ~ 10 分) を設定してください。

  • 絶対値ではなくトレンドを監視する: 100K の遅延の減少はそれほど問題ではありません 5K の遅れが増大しています。アメリカ合衆国 rate() PromQL で派生を確認します。
  • 人ごとにダッシュボードを分ける: SRE 向けの重要なアラートを備えた運用ダッシュボード インフラストラクチャ チーム向けのキャパシティ プランニング ダッシュボード、プロダクト マネージャー向けのビジネス KPI ダッシュボードです。
  • JVMメトリクスを含める: Kafka のパフォーマンス問題の 30% は GC の圧力から発生します ブローカーについて。常に監視する jvm_gc_pause_seconds そしてヒープの使用量。
  • プロメテウスを少なくとも 30 日間保存する: 傾向分析と月次容量計画用。 長期保持の場合は、Thanos または Mimir を検討してください。
  • アラートをテストする amtool: アラート ルールが構文的に正しいことを確認してください 正しく、本番環境に入る前に予期されるテスト データでトリガーされることを確認します。

クラウド管理の Kafka: Confluent クラウド メトリクス API

Confluent Cloud またはマネージド Kafka サービス (AWS MSK、Aiven Kafka) を使用している場合、JMX メトリクスはサポートされません。 直接アクセスできます。この場合は、 メトリクスAPI クラウドプロバイダーの。 Confluent Cloud は、remote_write 経由で Prometheus 互換のメトリクス REST API を公開します。

# prometheus.yml per Confluent Cloud Metrics API
scrape_configs:
  - job_name: "confluent-cloud"
    honor_timestamps: true
    metrics_path: "/v2/metrics/cloud/export"
    scheme: https
    basic_auth:
      username: "<CONFLUENT_CLOUD_API_KEY>"
      password: "<CONFLUENT_CLOUD_API_SECRET>"
    static_configs:
      - targets:
          - "api.telemetry.confluent.cloud"
    params:
      resource.kafka.id:
        - "<CLUSTER_ID>"

# Per AWS MSK: usa il CloudWatch exporter
# https://github.com/prometheus/cloudwatch_exporter
scrape_configs:
  - job_name: "aws-msk"
    static_configs:
      - targets:
          - "cloudwatch-exporter:9106"

概要: Kafka 監視チェックリスト

  • すべてのブローカーで構成された JMX エクスポーター、ポート 9404
  • Kafka Lag Exporter またはコンシューマ グループ メトリクスの同等のもの
  • Prometheus のスクレイピング間隔はブローカーの場合は 15 秒、ラグ エクスポーターの場合は 30 秒です
  • 重大なアラート: ブローカーのダウン、オフラインのパーティション、アクティブなコントローラーなし
  • アラート警告: レプリケーションが不十分なパーティション、高いコンシューマ ラグ、ヒープ >85%
  • Grafana ダッシュボードがインポートされました (概要の場合は ID 7589、ラグの場合は ID 13282)
  • Prometheus の保存期間は最低 30 日
  • 誤検知を避けるため、アラートは最小期間 (5 分間) でテストされています

シリーズの次のステップ

監視を設定しました。次のステップは、問題が発生したときに何をすべきかを知ることです。

  • 第 10 条 – デッドレターキューとエラー処理: メッセージを管理するためのパターン DLQ、指数関数的再試行、ポイズン ピル検出など、処理に失敗したもの。
  • 第 11 条 – 本番環境における Kafka: クラスターのサイジングに関する完全な操作ガイド、 保持要素とレプリケーション要素の最適な構成、および災害復旧のための MirrorMaker 2。

他シリーズとの連携

  • Apache Kafka の基礎 (第 1 条): 消費者のラグの概念を理解する必要があります LEO、HW、コミットされたオフセットの違いについては、シリーズの最初の記事で説明しました。
  • イベント駆動型アーキテクチャ: 消費者の遅れを監視することも重要です AWS SQS および SNS を備えた EDA システムでは、同等のメトリクスはキューの深さです。