Kafka のモニタリング: JMX エクスポーター、Prometheus、および Grafana ダッシュボード
サイレント Kafka クラスターは健全なクラスターではありません。これは、ユーザーが注目していないクラスターです。の 消費者の遅れ それは成長します、 レプリケーションが不十分なパーティションが蓄積し、リクエストのレイテンシーが増加します。これらのシグナルは、数時間または数日単位で障害が発生することを予測します。 このガイドでは、完全な監視システム (JMX Exporter、Prometheus、Grafana、および実稼働環境のすべての Kafka クラスターに必要なアラート) を構成します。
Kafka モニタリングが重要な理由
Apache Kafka は回復力があるように設計されていますが、回復力は自動的には得られません。アクティブな可観測性が必要です。 過負荷のブローカー、本番環境に追いつけないコンシューマ グループ、またはレプリケーションが低下したパーティション 早期に検出しないと、データ損失やダウンタイムが発生する可能性があります。
Kafka で監視する 3 つの基本的な側面は次のとおりです。
- クラスターの健全性: アクティブなブローカー、アクティブなコントローラー、複製中のパーティション、オフラインのパーティション
- プロデューサーのパフォーマンス: リクエストレート、レコードエラーレート、リクエストレイテンシ、バッチサイズ
- 消費者のパフォーマンス: グループごとおよびパーティションごとのコンシューマ ラグ、コミット レート、リバランス頻度
最も重要な 5 つの Kafka メトリクス
- kafka.consumer.lag: LEO とコミットされたオフセットの差 - システムの健全性に関する最大の指標
- kafka.server.UnderReplicatedPartitions: ISR が不完全なパーティション — ブローカー信号の劣化
- kafka.server.OfflinePartitionsCount: リーダーなしパーティション - 重大なエラー、データにアクセスできない
- kafka.network.RequestMetrics.TotalTimeMs: リクエストの合計レイテンシ
- kafka.server.BrokerTopicMetrics.MessagesInPerSec: 取り込みスループット
Kafka がメトリクスを公開する方法: JMX
Kafka は、すべての内部メトリクスを次の方法で公開します。 JMX (Java 管理拡張機能)の標準フレームワーク
JVM アプリケーションの監視。各メトリクスは次のように識別されます。 MBean名 形式で
dominio:tipo=Tipo,nome=Metrica.
クラウドネイティブ環境で最もよく使用される監視システムである Prometheus に JMX メトリクスにアクセスできるようにするには、 あなたはそれを使用します JMXエクスポーター: JMX メトリクスを HTTP エンドポイントとして公開する Java エージェント Prometheus 形式 (テキストベース、プル モデル)。
JMX エクスポーターを Java エージェントとして構成する
JMX Exporter は JVM エージェントとして起動されます。パラメータを追加するだけです。 -javaagent Kafka ブローカー JVM に送信します。
公式 Prometheus リポジトリから JAR をダウンロードし、以下を指定する YAML 構成ファイルを作成します。
どの MBean を収集するか、およびそれらの名前を Prometheus メトリックに変更する方法。
# Scaricare JMX Exporter (ultima versione stabile)
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.0.1/jmx_prometheus_javaagent-1.0.1.jar \
-O /opt/kafka/libs/jmx_prometheus_javaagent.jar
# File di configurazione: /opt/kafka/config/kafka-jmx-exporter.yml
# Questo file dice a JMX Exporter quali MBean raccogliere
# kafka-jmx-exporter.yml
# Configurazione completa per Kafka broker + consumer group lag
---
startDelaySeconds: 0
ssl: false
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Metriche Broker: throughput messaggi
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate'
name: kafka_server_brokertopicmetrics_messagesinpersec
labels:
topic: "$1"
# Under-replicated partitions: CRITICO
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_server_replicamanager_underreplicatedpartitions
# Offline partitions: CRITICO
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_controller_kafkacontroller_offlinepartitionscount
# Active controller count (deve essere sempre 1)
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_kafkacontroller_activecontrollercount
# Request latency per tipo di request
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(\w+)><>(Mean|99thPercentile)'
name: kafka_network_requestmetrics_totaltimems
labels:
request: "$1"
quantile: "$2"
# Producer request rate
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate'
name: kafka_server_brokertopicmetrics_bytesinpersec
# Log size per topic-partizione
- pattern: 'kafka.log<type=Log, name=Size, topic=(.+), partition=(\d+)><>Value'
name: kafka_log_log_size
labels:
topic: "$1"
partition: "$2"
# JVM heap usage
- pattern: 'java.lang<type=Memory><>HeapMemoryUsage'
name: jvm_heap_memory_usage
type: GAUGE
ブローカーで JMX エクスポーターを有効にする
# Modificare kafka-server-start.sh oppure impostare la variabile d'ambiente
# Aggiungere al file bin/kafka-server-start.sh o configurare systemd
# Opzione 1: variabile d'ambiente KAFKA_OPTS
export KAFKA_OPTS="-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"
# Opzione 2: in systemd service file
# [Service]
# Environment="KAFKA_OPTS=-javaagent:/opt/kafka/libs/jmx_prometheus_javaagent.jar=9404:/opt/kafka/config/kafka-jmx-exporter.yml"
# Verificare che l'endpoint funzioni dopo il riavvio
curl http://localhost:9404/metrics | grep kafka_server_replicamanager
消費者のラグ: 最も重要な指標
Il 消費者の遅れ 消費者がまだ処理しなければならないレコードの数を測定します。これは、 の ログ終了オフセット (トピックに書き込まれた最後のレコード) と コミットされたオフセット (コンシューマ グループによって処理された最後のレコード)。 遅延の増加は、消費者が生産の速度に追いついていないことを示しています。
ラグだけではブローカーの JMX メトリックは公開されません。コンシューマー グループ コーディネーターにクエリを実行して収集する必要があります。 このための標準ツールは次のとおりです Kafka ラグ エクスポーター (Lightbend/オープンソース プロジェクト) またはプラグイン kafka_consumer_group プロメテウス自身のこと。
# Docker Compose per Kafka Lag Exporter (alternativa moderna)
# https://github.com/seglo/kafka-lag-exporter
services:
kafka-lag-exporter:
image: seglo/kafka-lag-exporter:0.8.0
ports:
- "8000:8000"
volumes:
- ./lag-exporter.conf:/opt/docker/conf/application.conf
environment:
JAVA_OPTS: "-Xmx256m"
# lag-exporter.conf (formato HOCON)
kafka-lag-exporter {
port = 8000
clusters = [
{
name = "production-cluster"
bootstrap-brokers = "kafka1:9092,kafka2:9092,kafka3:9092"
# Polling interval
poll-interval = 30 seconds
# Consumer groups da monitorare (regex, vuoto = tutti)
consumer-group-whitelist = [".*"]
}
]
# Metriche Prometheus esposte:
# kafka_consumer_group_latest_offset
# kafka_consumer_group_partition_lag
# kafka_consumer_group_sum_lag <-- lag totale per group
# kafka_consumer_group_max_lag <-- lag massimo tra le partizioni
}
PromQL でラグを計算する
# Query PromQL per Grafana / Prometheus
# Lag totale per consumer group
sum(kafka_consumer_group_partition_lag) by (group)
# Top 10 consumer groups per lag
topk(10, sum(kafka_consumer_group_partition_lag) by (group))
# Tasso di crescita del lag (se positivo, i consumer non stanno al passo)
rate(kafka_consumer_group_sum_lag[5m])
# Consumer lag per topic specifico
sum(kafka_consumer_group_partition_lag{topic="ordini-effettuati"}) by (group)
# Alert: lag sopra soglia critica per piu di 5 minuti
# kafka_consumer_group_sum_lag{group="servizio-inventario"} > 10000
Kafka 用に Prometheus を構成する
プロメテウスはテンプレートを使用します 引く: エンドポイントからメトリクスを収集するのは Prometheus です
設定可能な間隔 (scrape_interval)。
構成では、 ターゲットをスクレイピング: クエリする HTTP エンドポイント。
# prometheus.yml - configurazione per scraping Kafka
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "kafka-alerts.yml"
scrape_configs:
# JMX Exporter su ogni broker Kafka
- job_name: "kafka-brokers"
static_configs:
- targets:
- "kafka1:9404"
- "kafka2:9404"
- "kafka3:9404"
relabel_configs:
- source_labels: [__address__]
target_label: broker
regex: "([^:]+):.*"
replacement: "$1"
# Kafka Lag Exporter per consumer group metrics
- job_name: "kafka-lag-exporter"
static_configs:
- targets:
- "kafka-lag-exporter:8000"
scrape_interval: 30s # lag cambia meno frequentemente
# JVM metrics dei broker (se abilitato jvm_exporter separato)
- job_name: "kafka-jvm"
static_configs:
- targets:
- "kafka1:9404"
- "kafka2:9404"
- "kafka3:9404"
metrics_path: /metrics
params:
module: [jvm]
アラート ルール Kafka: 重要なルール
アラート ルールは、通知をトリガーする条件を定義します。プロメテウスでは彼らは自分自身を表現します PromQL と統合されており、別の YAML ファイルにバンドルされています。運用環境における Kafka の基本ルールは次のとおりです。
# kafka-alerts.yml - Regole di alert per Prometheus Alertmanager
groups:
- name: kafka.critical
rules:
# Broker down: nessun dato dall'endpoint JMX
- alert: KafkaBrokerDown
expr: up{job="kafka-brokers"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka broker {{ $labels.broker }} non raggiungibile"
description: "Il broker {{ $labels.broker }} non risponde da piu di 1 minuto."
# Partizioni offline: CRITICO, dati non accessibili
- alert: KafkaOfflinePartitions
expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
for: 30s
labels:
severity: critical
annotations:
summary: "{{ $value }} partizioni offline in Kafka"
description: "Partizioni senza leader: dati non leggibili/scrivibili."
# Nessun active controller
- alert: KafkaNoActiveController
expr: sum(kafka_controller_kafkacontroller_activecontrollercount) != 1
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka senza controller attivo"
- name: kafka.warning
rules:
# Under-replicated partitions: replica degradata
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "{{ $value }} partizioni under-replicated su {{ $labels.broker }}"
description: "Replica degradata: un broker potrebbe essere lento o down."
# Consumer lag critico
- alert: KafkaConsumerLagHigh
expr: sum(kafka_consumer_group_partition_lag) by (group) > 50000
for: 10m
labels:
severity: warning
annotations:
summary: "Consumer lag alto per gruppo {{ $labels.group }}"
description: "Lag: {{ $value }}. I consumer non stanno al passo con la produzione."
# Consumer lag critico prolungato
- alert: KafkaConsumerLagCritical
expr: sum(kafka_consumer_group_partition_lag) by (group) > 200000
for: 5m
labels:
severity: critical
annotations:
summary: "Consumer lag CRITICO per gruppo {{ $labels.group }}"
# JVM heap usage alta
- alert: KafkaBrokerHeapHigh
expr: jvm_heap_memory_usage{area="used"} / jvm_heap_memory_usage{area="max"} > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "Heap JVM alta su broker Kafka {{ $labels.broker }}"
Kafka 用 Grafana ダッシュボード
Grafana は、Prometheus の標準の視覚化ツールです。すぐにインポートできるダッシュボードがあります Grafana ダッシュボード ハブ (grafana.com/dashboards) から直接。 Kafka で最もよく使用されるものは次のとおりです。
- ダッシュボードID 7589: Kafka の概要 (ブローカーの健全性、スループット、パーティション)
- ダッシュボードID 9021: Confluent プラットフォーム Kafka メトリクス
- ダッシュボードID 13282: Kafka Consumer Lag (詳細なコンシューマ グループ ラグ)
ダッシュボードを Grafana にインポートするには: ダッシュボード → インポート → ID を入力 → Prometheus データソースを選択.
ダッシュボードに含めるキーパネル
# Pannello 1: Broker Health Overview
# Stat panel: up/down per ogni broker
up{job="kafka-brokers"}
# Pannello 2: Under-Replicated Partitions
# Stat panel con soglia colorata (verde=0, rosso>0)
sum(kafka_server_replicamanager_underreplicatedpartitions)
# Pannello 3: Consumer Lag per Group (time series)
sum(kafka_consumer_group_partition_lag) by (group)
# Pannello 4: Messages In/Out per secondo
rate(kafka_server_brokertopicmetrics_messagesinpersec[1m])
# Pannello 5: Request Latency p99 (heatmap o time series)
kafka_network_requestmetrics_totaltimems{quantile="99thPercentile"}
# Pannello 6: Disk Usage per Broker
# Utile per pianificare l'espansione storage
node_filesystem_size_bytes{mountpoint="/kafka-data"} -
node_filesystem_avail_bytes{mountpoint="/kafka-data"}
Docker Compose: 完全なスタック監視
開発環境またはステージング環境の場合、この Docker Compose は監視スタック全体を起動します。 JMX エクスポーター (ブローカーに統合)、Kafka Lag エクスポーター、Prometheus、および Grafana。
# docker-compose.monitoring.yml
version: "3.9"
services:
kafka:
image: apache/kafka:4.0.0
container_name: kafka
ports:
- "9092:9092"
- "9404:9404" # JMX Exporter
volumes:
- ./config/kafka-jmx-exporter.yml:/opt/kafka-jmx-exporter.yml
- ./libs/jmx_prometheus_javaagent.jar:/opt/jmx_prometheus_javaagent.jar
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
CLUSTER_ID: "monitoring-demo-cluster"
KAFKA_OPTS: "-javaagent:/opt/jmx_prometheus_javaagent.jar=9404:/opt/kafka-jmx-exporter.yml"
kafka-lag-exporter:
image: seglo/kafka-lag-exporter:0.8.0
container_name: kafka-lag-exporter
ports:
- "8000:8000"
volumes:
- ./config/lag-exporter.conf:/opt/docker/conf/application.conf
depends_on:
- kafka
prometheus:
image: prom/prometheus:v2.50.1
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml
- ./config/kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
command:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.retention.time=30d"
grafana:
image: grafana/grafana:10.3.1
container_name: grafana
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: "kafka-monitoring"
GF_USERS_ALLOW_SIGN_UP: "false"
volumes:
- grafana-data:/var/lib/grafana
- ./config/grafana-datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yml
- ./config/grafana-dashboards.yml:/etc/grafana/provisioning/dashboards/kafka.yml
volumes:
grafana-data:
Java を使用したコンシューマ ラグ モニタリング (プログラムによる)
場合によっては、アプリケーション コード内で消費者のラグを直接測定すると便利です。 たとえば、Spring Boot アプリケーションでマイクロメーター/アクチュエーターを介してメトリックを公開する場合などです。 このアプローチは外部モニタリングを補完し、より正確な相関関係を可能にします。
// ConsumerLagMonitor.java - Misura il lag programmaticamente con Kafka AdminClient
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ConsumerLagMonitor {
private final AdminClient adminClient;
private final String groupId;
public ConsumerLagMonitor(String bootstrapServers, String groupId) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
this.adminClient = AdminClient.create(props);
this.groupId = groupId;
}
public Map<TopicPartition, Long> getLagPerPartition() throws ExecutionException, InterruptedException {
// 1. Recupera committed offsets del consumer group
ListConsumerGroupOffsetsResult offsetsResult =
adminClient.listConsumerGroupOffsets(groupId);
Map<TopicPartition, OffsetAndMetadata> committedOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
// 2. Recupera l'end offset (Log End Offset) per le stesse partizioni
Map<TopicPartition, OffsetSpec> latestOffsetRequest = committedOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
ListOffsetsResult latestOffsets = adminClient.listOffsets(latestOffsetRequest);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
latestOffsets.all().get();
// 3. Calcola il lag = endOffset - committedOffset
Map<TopicPartition, Long> lag = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
long committedOffset = entry.getValue().offset();
long endOffset = endOffsets.get(tp).offset();
lag.put(tp, Math.max(0, endOffset - committedOffset));
}
return lag;
}
public long getTotalLag() throws ExecutionException, InterruptedException {
return getLagPerPartition().values().stream()
.mapToLong(Long::longValue)
.sum();
}
public void close() {
adminClient.close();
}
}
Kafka を監視するためのベスト プラクティス
アンチパターン: 遅延が急増するたびにアラートを送信
コンシューマーラグは自然に変動します: 本番環境の一時的なピークまたはコンシューマー GC の一時停止
一時的な遅延が発生しますが、すぐに回復します。絶対しきい値を使用せずにアラートを構成する
時間枠 (for: 10m) 一定の誤検知が生成されます。
アラートをトリガーする前に、常に最小期間 (5 ~ 10 分) を設定してください。
-
絶対値ではなくトレンドを監視する: 100K の遅延の減少はそれほど問題ではありません
5K の遅れが増大しています。アメリカ合衆国
rate()PromQL で派生を確認します。 - 人ごとにダッシュボードを分ける: SRE 向けの重要なアラートを備えた運用ダッシュボード インフラストラクチャ チーム向けのキャパシティ プランニング ダッシュボード、プロダクト マネージャー向けのビジネス KPI ダッシュボードです。
-
JVMメトリクスを含める: Kafka のパフォーマンス問題の 30% は GC の圧力から発生します
ブローカーについて。常に監視する
jvm_gc_pause_secondsそしてヒープの使用量。 - プロメテウスを少なくとも 30 日間保存する: 傾向分析と月次容量計画用。 長期保持の場合は、Thanos または Mimir を検討してください。
-
アラートをテストする
amtool: アラート ルールが構文的に正しいことを確認してください 正しく、本番環境に入る前に予期されるテスト データでトリガーされることを確認します。
クラウド管理の Kafka: Confluent クラウド メトリクス API
Confluent Cloud またはマネージド Kafka サービス (AWS MSK、Aiven Kafka) を使用している場合、JMX メトリクスはサポートされません。 直接アクセスできます。この場合は、 メトリクスAPI クラウドプロバイダーの。 Confluent Cloud は、remote_write 経由で Prometheus 互換のメトリクス REST API を公開します。
# prometheus.yml per Confluent Cloud Metrics API
scrape_configs:
- job_name: "confluent-cloud"
honor_timestamps: true
metrics_path: "/v2/metrics/cloud/export"
scheme: https
basic_auth:
username: "<CONFLUENT_CLOUD_API_KEY>"
password: "<CONFLUENT_CLOUD_API_SECRET>"
static_configs:
- targets:
- "api.telemetry.confluent.cloud"
params:
resource.kafka.id:
- "<CLUSTER_ID>"
# Per AWS MSK: usa il CloudWatch exporter
# https://github.com/prometheus/cloudwatch_exporter
scrape_configs:
- job_name: "aws-msk"
static_configs:
- targets:
- "cloudwatch-exporter:9106"
概要: Kafka 監視チェックリスト
- すべてのブローカーで構成された JMX エクスポーター、ポート 9404
- Kafka Lag Exporter またはコンシューマ グループ メトリクスの同等のもの
- Prometheus のスクレイピング間隔はブローカーの場合は 15 秒、ラグ エクスポーターの場合は 30 秒です
- 重大なアラート: ブローカーのダウン、オフラインのパーティション、アクティブなコントローラーなし
- アラート警告: レプリケーションが不十分なパーティション、高いコンシューマ ラグ、ヒープ >85%
- Grafana ダッシュボードがインポートされました (概要の場合は ID 7589、ラグの場合は ID 13282)
- Prometheus の保存期間は最低 30 日
- 誤検知を避けるため、アラートは最小期間 (5 分間) でテストされています
シリーズの次のステップ
監視を設定しました。次のステップは、問題が発生したときに何をすべきかを知ることです。
- 第 10 条 – デッドレターキューとエラー処理: メッセージを管理するためのパターン DLQ、指数関数的再試行、ポイズン ピル検出など、処理に失敗したもの。
- 第 11 条 – 本番環境における Kafka: クラスターのサイジングに関する完全な操作ガイド、 保持要素とレプリケーション要素の最適な構成、および災害復旧のための MirrorMaker 2。
他シリーズとの連携
- Apache Kafka の基礎 (第 1 条): 消費者のラグの概念を理解する必要があります LEO、HW、コミットされたオフセットの違いについては、シリーズの最初の記事で説明しました。
- イベント駆動型アーキテクチャ: 消費者の遅れを監視することも重要です AWS SQS および SNS を備えた EDA システムでは、同等のメトリクスはキューの深さです。







