MQTT'den InfluxDB'ye: Gerçek Zamanlı Enerji IoT Platformu
Küresel endüstriyel enerji IoT pazarı şunları aşıyor: 2025'te 22 milyar dolar ve 2030 yılına kadar %19,8'lik bir Bileşik Büyüme Oranı ile 54 milyara ulaşacak. Her biri 10 MW'lık fotovoltaik sistem ötesini üretir Günde 4 milyon ölçüm invertörlerden, radyasyon sensörlerinden, dizi ölçerler ve meteorolojik veri kaydediciler. Her rüzgar çiftliği titreşim verilerini ekler. motor kaportalarından gelen tork ve sıcaklık. Her EV şarj istasyonu bağlantı durumunu yayınlar, güç çıkışı ve konnektör sağlık durumu her 30 saniyede bir görüntülenir.
Buradaki zorluk bu verileri toplamak değil; bunu güvenilir, ölçeklenebilir ve gecikmeli bir şekilde yapmaktır. saniyenin altında, yıllar boyunca tarihselleştirmeyi sürdürme ve analitik sorgulama yapma yeteneği gerçek zamanlı. Sınırlı bant genişliğine sahip endüstriyel ağlar için oluşturulan MQTT protokolü, Enerji IIoT'nin ortak dili. InfluxDB ve zaman serisi depolama çözümü en çok bu alan için benimsenmiştir. Köprü olarak Telegraf ve katman olarak Grafana ile birlikte görselleştirme, küçük evsel sistemlerden kamu hizmetlerine kadar yönetim sağlayan bir yığın oluşturur çoklu gigawatt.
Bu makale mimariden eksiksiz bir enerji IoT platformu oluşturuyor Docker'a kadar her bileşenin ayrıntılı konfigürasyonu yoluyla uçtan uca 200 invertörlü 10 MW'lık bir fotovoltaik parkın çalışma kompozisyonu ve gerçek bir vaka çalışması. Her bölüm test edilmiş kodu ve üretim stratejilerini içerir.
Bu Makalede Neler Öğreneceksiniz?
- Uçtan uca mimari: sensörler → ağ geçidi → MQTT → Telegraf → InfluxDB → Grafana
- MQTT'nin ayrıntılı incelemesi: QoS, saklama, irade mesajları, enerji için konu ad alanı tasarımı
- Broker karşılaştırması: Ölçeklenebilirlik ve kümeleme için Mosquitto, EMQX ve HiveMQ karşılaştırması
- Telegraf: MQTT tüketici yapılandırması, işlemci hattı, InfluxDB çıkışı
- InfluxDB 3.x: paket tasarımı, saklama politikası, Flux sorguları ve görev zamanlaması
- Modbus RTU/TCP → Python ve pymodbus ile MQTT köprüsü
- Üretim yığını için Docker Compose'u tamamlayın
- Güvenlik: TLS karşılıklı, MQTT kimlik doğrulaması, InfluxDB belirteci, ağ segmentasyonu
- Örnek olay: 200 invertörlü 10 MW fotovoltaik park, 100K mesaj/sn
- Gelişmiş uyarı: Grafana, PagerDuty ve Slack entegrasyonu
EnergyTech Serisi - Makale Konumu
| # | Öğe | Seviye | Durum |
|---|---|---|---|
| 1 | OCPP 2.x Protokolü: EV Şarj Sistemleri Oluşturmak | Gelişmiş | Yayınlandı |
| 2 | DERMS Mimarisi: Milyonlarca Dağıtılmış Kaynağı Bir araya Getirme | Gelişmiş | Yayınlandı |
| 3 | ML ile Yenilenebilir Enerji Tahmini: Python LSTM | Gelişmiş | Yayınlandı |
| 4 | Şebeke Ölçeğinde Depolama için Pil Yönetim Sistemi | Gelişmiş | Yayınlandı |
| 5 | Yazılım Mühendisleri için IEC 61850: Akıllı Şebeke İletişimi | Gelişmiş | Yayınlandı |
| 6 | EV Şarj Yükü Dengeleme: Gerçek Zamanlı Algoritmalar | Gelişmiş | Yayınlandı |
| 7 | Buradasınız - MQTT'den InfluxDB'ye: Gerçek Zamanlı Enerji IoT Platformu | Gelişmiş | Akım |
| 8 | Karbon Muhasebe Yazılım Mimarisi: ESG Platformları | Gelişmiş | Sonraki |
| 9 | Enerji Altyapısı için Dijital İkiz: Gerçek Zamanlı Simülasyon | Gelişmiş | Yakında gelecek |
| 10 | P2P Enerji Ticareti için Blockchain: Akıllı Sözleşmeler ve Kısıtlamalar | Gelişmiş | Yakında gelecek |
Uçtan Uca Mimari: Sensörden Kontrol Paneline
Gerçek zamanlı enerji IoT platformunun mimarisi beş farklı seviyeye ayrılmıştır: her birinin belirli sorumlulukları ve farklı güvenilirlik gereksinimleri vardır. Anlamak Bu ayırma, ilk konfigürasyon satırını yazmadan önce gereklidir.
Seviye 1: Alan (Alan Katmanı)
En alt seviyede fiziksel cihazları buluyoruz: fotovoltaik invertörler, anemometreler, piranometreler, dizi ölçerler, PMU'lar (Fazör Ölçüm Birimleri), akıllı sayaçlar ve EV şarj istasyonları. Bu cihazların çoğu endüstriyel saha protokollerini konuşur: Modbus RTU RS-485'te, Modbus TCP Ethernet üzerinden, IEC 61850 trafo merkezlerinde, DNP3 şebeke şebekeleri için veya invertörler için SunSpec gibi özel protokoller.
Bu protokoller, modern IP veya bulut ile yerel olarak uyumlu değildir. Onların oylaması Tipik döngü 100 ms (PMU) ila 60 saniye (eski veri kaydedici) arasında değişir. için en uygun frekans bir fotovoltaik invertör ve elektriksel büyüklükler (voltaj, akım, güç) için 1-5 saniye ve termal büyüklükler (modül sıcaklığı, invertör sıcaklığı) için 30-60 saniye.
Seviye 2: Ağ Geçidi / Kenar (Kenar Katmanı)
Edge ağ geçidi ve çevirici: saha protokollerini okur ve MQTT üzerinden komisyoncuya yayınlar merkezi. Gömülü bir cihaz olabilir (Raspberry Pi 4, BeagleBone, Moxa ioThinx), birden fazla kurulum için entegre MQTT yığınına sahip bir PLC veya yerel bir uç sunucu (NUC, ODROID) büyük. Ağ geçidi üç kritik işlevi gerçekleştirir:
- Protokol çevirisi: Modbus → MQTT, IEC 61850 → MQTT, DNP3 → MQTT
- Yerel ara belleğe alma: WAN bağlantılarının kesilmesi sırasında verileri toplar (depola ve ilet)
- Kenar ön işleme: filtreleme, toplama, yerel anormallik tespiti
Katman 3: Mesaj Aracısı (Aktarım Katmanı)
MQTT komisyoncusu taşıma sisteminin kalbidir. Edge ağ geçitlerinden mesajlar alır, li abonelere (öncelikle Telegraf'a, aynı zamanda sistemler gibi diğer tüketicilere) dağıtım yapar SCADA veya gerçek zamanlı bildirimler). Broker seçimi ölçeğe bağlıdır: Mosquitto for tek veya küçük kurulumlar (10.000 bağlantının altında), dağıtım başına EMQX veya HiveMQ kurumsal ve çoklu site.
Seviye 4: Alma ve Depolama (Veri Katmanı)
Telegraf, MQTT konularına abone olur ve mesajları InfluxDB ölçümlerine dönüştürür. yapılandırılabilir işlemci boru hatları. InfluxDB, zaman serilerini saklamayla alır ve indeksler farklılaştırılmış politikalar: 30 günlük yüksek çözünürlüklü ham veriler, saatlik olarak toplanan 1 yıl, 5 yılın günlük toplamları. Flux görevleri zamanlama altörnekleme otomatik ve hesaplanmış uyarılar.
Seviye 5: Görselleştirme ve Uyarı (Sunum Katmanı)
Grafana, veri kaynağı eklentisi aracılığıyla InfluxDB'ye bağlanır ve gerçek zamanlı kontrol panelleri oluşturur her 5-30 saniyede bir güncelleyin. Grafana'nın uyarı sistemi sorguları değerlendirir periyodik olarak ve PagerDuty, Slack, e-posta veya özel web kancalarına bildirimler gönderin. eşikler aşılmıştır.
Komple Mimari Diyagram
Veri akışı şu yolu izler: İnvertörler/Sensörler (Modbus/SunSpec) → Ağ Geçidi Kenarı (Pymodbuslu Raspberry Pi/NUC) → MQTT Brokerı (Sivrisinek/EMQX) → Telgraf (MQTT Tüketici + JSON Ayrıştırıcı) → AkışDB (zaman serisi DB) → Grafana (kontrol paneli + uyarı). Buna paralel olarak bir Uyarı Yöneticisi InfluxDB'de Flux görevlerini her dakika değerlendirir ve yazabilir özel bir paketteki olaylar veya çağrı HTTP uç noktası.
MQTT Derinlemesine İnceleme: Enerji için Protokol ve Tasarım
MQTT 5.0 Temelleri
MQTT (Message Queuing Telemetri Aktarımı), tasarlanmış bir yayınlama-abone olma protokolüdür Sınırlı bant genişliğine ve yüksek gecikmeye sahip ağlar için. Sürüm 5.0 (OASIS Standardı RFC 2019) kurumsal uygulamalar için kritik özellikler ekler: oturumun sona erme aralığı, mesaj son kullanma aralığı, genişletilmiş neden kodları, mesajlardaki kullanıcı özellikleri, ile akış kontrolü üstbilgilerin boyutunu azaltmak için maksimum ve konu takma adlarını alın.
QoS 0, 1 ve 2: Enerji Verileri için Doğru Seçim
Hizmet Kalitesi, bir enerji MQTT sisteminin tasarımında en kritik seçimdir. Üç seçeneğin verim, ağ yükü ve Teslimat garantileri:
| QoS | Garanti | Tepegöz | Case Energy'yi kullanın |
|---|---|---|---|
| Hizmet Kalitesi 0 | En fazla bir kez (ateş et ve unut) | Minimum (1 RTT) | Yüksek frekanslı telemetri (1Hz+), zaman zaman kaybın kabul edilebilir olduğu veriler: ışınım, ortam sıcaklığı |
| Hizmet Kalitesi 1 | En az bir kez (kopyalarla) | Orta (2 RTT) | Üretim ölçümleri (kWh, kW), alarmlar, cihaz durumları - InfluxDB tarafından zaman damgası aracılığıyla yönetilen kopyalar |
| Hizmet Kalitesi 2 | Tam olarak ons | Yüksek (4 RTT) | Kontrol komutları (invertör ayar noktası, anahtar açma/kapama), finansal veriler (şebeke üzerinde satılan enerji) |
Tipik bir PV sistemi için pratik öneri şudur: Telemetri için QoS 0 meteorolojik (ışınlama, sıcaklık, rüzgar), üretim ve alarmlar için QoS 1, QoS 2 yalnızca kontrol komutları için. Bu seçim güvenilirliği ve verimi dengeler, aracının WAN bant genişliğini doyurmadan saniyede 50-100 bin mesajı yönetmesine olanak tanır.
Mesajları ve İrade Mesajlarını Saklayın
Enerji sistemleri için genellikle hafife alınan ancak kritik olan iki MQTT özelliği:
-
Mesajları sakla: Aracı, son mesajı keep=true ile saklar
her konu için. Yeni bir abone bağlandığında değeri anında alır
Bir sonraki yayını beklemeden güncel. Durum konuları için temel:
plant/PV001/statusmuhafaza ile her yeni kontrol panelinin görülmesini sağlar Sistemin durumu anında. -
Vasiyet mesajları (Son Vasiyet ve Vasiyet): Ağ geçidi bir mesaj yapılandırır
İstemcinin bağlantısı anormal bir şekilde kesilirse aracının otomatik olarak yayımladığı.
Ağ geçidi bağlantısının kesildiğini bildirmek için kullanılır: uzak sitenin Raspberry Pi'si düşerse,
komisyoncu yayınlıyor
plant/PV001/connectivity offlinehemen, oturumun zaman aşımını beklemeden.
Konu Enerji Sistemleri için Ad Alanı Tasarımı
Konu ad alanının tasarımı, her yönü etkileyen mimari bir karardır sistem: InfluxDB'de yönlendirme, filtreleme, ACL güvenliği ve organizasyon. İyi tasarlanmış bir hiyerarşik yapı şu modeli izler:
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
Ad alanı için temel kurallar: her düzeyin anlamsal anlamı olmalıdır, kontrol konuları (komutlar) telemetri konularından (veriler) ayrılmalıdır, Tutarlılık için Snake_case kullanın, özel karakterlerden ve boşluklardan kaçının, derinliği sınırlayın Farklı broker ve müşterilerle uyumluluk için maksimum 6 seviyede.
MQTT Broker Karşılaştırması: Mosquitto vs EMQX vs HiveMQ
MQTT komisyoncusunun seçimi sistemin ölçeklenebilirliği ve güvenilirliği açısından temel öneme sahiptir. Enerji sektöründe en çok kullanılan üç broker birbirinden oldukça farklı özelliklere sahiptir.
| Özellikler | Sivrisinek 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| Maks. bağlantılar | 100K (tek düğüm) | 100M (23 düğümlü küme) | 200M (kümeler) |
| Kümeleme | Yerli yok | Evet (Erlang dağıtıldı) | Evet (kurumsal) |
| MQTT sürümü | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 |
| Kural Motoru | No | Evet (SQL tabanlı) | Evet (kurumsal) |
| Köprü | Si | Si | Si |
| Kimlik doğrulama | Dosyalar, TLS, eklentiler | JWT, OAuth2, LDAP, mTLS | JWT, OAuth2, kurumsal |
| Lisans | EPL/EDL (açık kaynak) | Apache 2.0 / Kurumsal | Topluluk / İşletme |
| RAM (100K bağlantı) | ~500MB | ~2GB | ~3GB |
| Durum enerjisini kullanın | Tek tesis, uç ağ geçidi, laboratuvar | Çok tesisli, hizmet ölçeğinde, kurumsal | Hizmet ölçeğinde, kurumsal uyumluluk |
Üretim için Sivrisinek Yapılandırması
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
Ağ Geçidi Kenarı: Köprü Modbus → Python ile MQTT
Edge ağ geçidi, endüstriyel saha protokolleri dünyasını birbirine bağlayan bileşendir MQTT ekosistemi ile. İşte fotovoltaik invertörler için eksiksiz bir uygulama WAN bağlantı kesintilerini yönetmek için yerel ara belleğe alma dahil Modbus TCP protokolü.
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: MQTT Köprüsü → Dönüşüm Boru Hattı ile InfluxDB
Telegraf, InfluxData'nın 300'den fazla eklentiyle Go'da yazılmış veri toplayıcısıdır. Yığınımızda Telegraf, MQTT konularına abone olur, JSON yükünün kodunu çözer, İşlemci aracılığıyla dönüşümleri uygular ve InfluxDB'ye yazar. Sürüm 1.30+ MQTT 5.0'ı ve etiket olarak konu ayrıştırmayı yerel olarak destekler.
Telegraf yapılandırmasını tamamlayın
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: Enerji için Zaman Serisi Depolama ve Sorgulama
Enerji Sistemleri için InfluxDB 2.x / 3.x Mimarisi
InfluxDB, enerji IoT için referans zaman serisi veritabanıdır. Sürüm 2.x (2025'te üretimde hala yaygın olarak kullanılmaktadır) sorgulama dili olarak Flux'u kullanır ve şunu sunar: kavramı kova entegre saklama politikasıyla. Sürüm 3.x, Rust'ta Apache Arrow ve DataFusion ile yeniden yazılan bu sürüm, birincil dil olarak SQL'i sunar ve yüksek kardinalite için ölçeklenebilirliği büyük ölçüde artırır.
Her 5 saniyede bir örnekleme yapan 200 invertörlü 10 MW'lık bir sistem için veri hacmi ve kayda değer: her invertör yaklaşık 15 alan üretir; bu da her 5 yılda bir toplam 3.000 alan anlamına gelir. saniye veya dakikada 216.000 veri noktası. Bir günde yaklaşık 311 milyon veri noktaları. 30 günlük saklama yaklaşık 9,3 milyar ham veri noktası gerektirir. Altörnekleme stratejisi önemlidir.
İlk Kurulum: Kovalar ve Organizasyon
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Flux Görevleri: Alt Örnekleme ve Otomatik Uyarı
Flux Görevleri, Telegraf'ın InfluxDB içinde periyodik olarak yürüttüğü planlanmış işlerdir. Bunları üç amaç için kullanırız: ham verilerin otomatik alt örneklemesi, KPI hesaplaması toplamalar ve kova uyarılarına yazma ile anormallik tespiti.
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
Fotovoltaik Analitik için Sorgu Akısı
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: Tam Üretim Yığını
Aşağıdaki Docker Compose, üretim dağıtımının başlangıç noktasıdır. tam yığın. Her hizmetin durum denetimi, yeniden başlatma ilkesi ve birim yapılandırmaları vardır Güvenlik için kalıcı ve yalıtılmış ağ iletişimi.
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
Otomatik Grafana Sağlama
Grafana, veri kaynaklarının ve kontrol panellerinin YAML dosyaları aracılığıyla otomatik olarak sağlanmasını destekler.
Bu, konteynerlerin yeniden oluşturulduğu konteynerli ortamlar için çok önemlidir.
Dosya config/grafana/provisioning/datasources/influxdb.yml içermelidir
JSON formatındaki kontrol panelleri içeri girerken InfluxDB veri kaynağının yapılandırması
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: Fotovoltaik Sistem için Kontrol Paneli ve Uyarı
PV Sistem Kontrol Paneli Yapısı
Fotovoltaik sistem için etkili bir gösterge paneli operasyonel sorulara cevap vermelidir 3 saniyeden kısa sürede görsel okuma. Önerilen yapı satırlar halinde düzenlenmiştir:
- 1. Satır - Günlük KPI'lar: Bugünkü üretimle (kWh) 4 adet “stat” panel, mevcut güç (kW), performans oranı (%), arızalı invertör sayısı. 30'lu yılların güncellemesi.
- Hat 2 - Geçici Üretim: Toplam AC gücünü gösteren alan grafiği son 24 saat içinde normalize edilmiş ışınımın üzerine bindirilmiş. InfluxDB-Raw'u şununla sorgulayın: 5 dakikalık toplam Pencere.
- Satır 3 - İnvertör Isı Haritası: X ekseni = zaman, Y ekseni = invertör_id ile ısı haritası, değer = AC gücü. Daha az üreten invertörleri görsel olarak tanımlamanıza olanak tanır. 1 dakikalık güncelleme.
- Satır 4 - Sıcaklık ve Alarmlar: Maksimum invertör sıcaklığı için gösterge tablosu, Energy_alerts grubundaki son 50 alarmın zaman damgasına göre sıralandığı tablo paneli.
- 5. Satır - Ağ verimliliği ve kalitesi: Dağılım grafiği verimliliği ve sıcaklığa karşı, Şebeke frekansı ve faz gerilimleri için zaman serisi.
Gelişmiş Uyarı: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
Güvenlik: TLS, Kimlik Doğrulama ve Ağ Segmentasyonu
Enerji ve kritik IoT platformunun güvenliği: üretim verileri hassas ticari bilgiler ve invertörlere komut gönderme yeteneği yetkisiz erişime karşı korunmalıdır. Derinlemesine savunma stratejisi dört seviyeye ayrılmıştır.
Komut Dosyasıyla TLS Sertifikalarının Oluşturulması
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
Güvenlik: Üretimde Zorunlu Uygulamalar
- Karşılıklı TLS her zaman: Hem istemcinin hem de sunucunun sertifika sunması gerekir. Kullanıcı adı/şifreyi bilseler bile yetkisiz ağ geçitlerinden gelen bağlantıları engeller.
- Ortam değişkenleri veya gizli yönetici aracılığıyla sırlar: Hiçbir zaman zor kod Kodunuzda veya yapılandırma dosyalarınızda InfluxDB belirteçleri, MQTT şifreleri veya API anahtarları Git'e kararlı. Docker sırlarını, HashiCorp Vault'u veya AWS Secrets Manager'ı kullanın.
- Ağ segmentasyonu: MQTT komisyoncusu, InfluxDB ve Telegraf'ın asla doğrudan internette ifşa edilmemelidir. Grafana yalnızca HTTPS ters proxy aracılığıyla. Uç ağ geçitlerine yönelik MQTT aracısı, özel bir VPN üzerinden kullanıma sunulabilir.
- Sertifika rotasyonu: Ağ geçidi sertifikaları 1 yıl süreyle geçerlidir (365 gün). Grafana alarmlarını 30 gün önceden sona erecek şekilde ayarlayın.
- MQTT denetim günlükleri: Tüm bağlantıların ve konuların günlüğe kaydedilmesini etkinleştir Sivrisinek'te. Anormal erişim tespiti için SIEM ile entegre edin.
Performans: Yüksek Ölçek için Karşılaştırma ve Optimizasyon
10 MW sistem için yığın kapasitesi
10 MW'lık bir fotovoltaik parkta tipik olarak her biri 50 kW'lık 200 invertör bulunur. Her 5 saniyede bir yoklama yapıldığında MQTT mesajlarının hacmi aşağıdaki gibidir:
| Bileşen | Metrikler | Hacim |
|---|---|---|
| 200 invertör x 5s | 15 alan/mesaj | 40 mesaj/sn, 600 veri noktası/sn |
| 10 meteoroloji istasyonu x 10s | 8 alan/mesaj | 1 msg/sn, 8 veri noktası/sn |
| Tesis toplamları x 15 dk | Akı agregaları | Veritabanında hesaplandı |
| Toplam | - | ~650 veri noktası/sn, ~56M veri noktası/gün |
| Ham depolama (30 gün) | ~1,68 milyar veri noktası | ~8-15 GB (InfluxDB sıkıştırmasıyla) |
Bu birim, tek bir InfluxDB 2.7 düğümü tarafından fazlasıyla yönetilebilir. 8 GB RAM ve NVMe SSD. 4 vCPU ve 8 GB RAM'e sahip VM'de EMQX, 100.000 bağlantıyı yönetir 1 milyon mesaj/saniye aktarım hızına sahip rakipler. Tek kurulum için 200 invertörden, Raspberry Pi 4'te tek bir Mosquitto komisyoncusundan ve fazlasıyla yeterli (gerçek dünya testleri: 10K msg/sn, 500 MB RAM).
MQTT Karşılaştırma Komut Dosyası
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
Vaka Çalışması: 10 MW Fotovoltaik Park İzleme
Bu örnek olay çalışması, bir enerji IoT platformunun gerçek dünyadaki uygulamasını açıklamaktadır bir fotovoltaik park için 10 MW ile Güney İtalya'da 50 kW'lık 200 SMA Sunny Tripower invertörü ve 8 hava durumu ölçüm istasyonu.
İlk Kurulum ve Zorluklar
Site, 10 Mbps bant genişliği garantili ve aralarında değişken gecikme süresi olan 4G WAN ağındadır. 30 ve 200 ms. Her invertör, yerel LAN ağı (192.168.x.x) üzerinde Modbus TCP konuşur. endüstriyel anahtar. Edge ağ geçidi, 256 GB SSD'ye sahip bir Raspberry Pi 4'tür (8 GB RAM) yerel ara belleğe alma için, özel veri SIM'ine sahip endüstriyel bir 4G yönlendirici aracılığıyla WAN'a bağlanır.
| Parametre | Değer |
|---|---|
| Sistem gücü | 10,2 MW zirve (SMA STP50-US-40) |
| İnvertör numarası | 200 (SMA Sunny Tripower, 50 kW) |
| Hava istasyonları | 8 (Işıma, T, HR, rüzgar) |
| Kenar ağ geçitleri | Ahududu Pi 4 8GB + 256GB SSD |
| Yoklama aralığı invertörü | 5 saniye |
| Hava durumu yoklama aralığı | 10 saniye |
| MQTT mesaj hacmi | ~41 msg/sn (200 inv x 1/5s + 8 hava durumu x 1/10s) |
| Kullanılan WAN bant genişliği | ~180 KB/s (mevcut 10 Mbps'nin %2'si) |
| MQTT Brokerı | Bulut VM'de EMQX 5.8 (4 vCPU, 8 GB) |
| AkışDB | Özel VM'de 2.7.10 (8 vCPU, 32 GB RAM, NVMe) |
6 aylık operasyondan sonraki sonuçlar
- Platformun kullanılabilirliği: %99,94 (toplam kesinti süresi: 6 ayda 2,6 saat, tümü bulut sunucusunun planlı bakımı için)
- WAN bağlantısının kesilmesi nedeniyle veri kaybı: %0,002 (yerel ara belleğe alma sayesinde) ağ geçidine ve MQTT kalıcı oturumuna)
- Otomatik olarak algılanan invertör arıza uyarıları: 6 ayda 47 olay, Anında uyarı sayesinde MTTR (Ortalama Onarım Süresi) 8 saatten 2,3 saate düşürüldü PagerDuty'de (ertesi gün manuel keşif yerine)
- Ortalama Performans Oranı: Önceki dönemde %82,3 ve %79,1 (olmadan gerçek zamanlı izleme): +3,2 yüzde puanı = +320 ek MWh/yıl
- Kestirimci bakımdan tasarruf: 12 invertörün tespiti arızadan önce erken bozulma (ortalamayla karşılaştırıldığında ısı emici sıcaklığı +8°C) ilan edildi. Önleyici müdahalenin acil durum değişimine kıyasla tasarruf açısından 45 bin Euro tutarında olduğu tahmin ediliyor.
- 6 ay sonra InfluxDB depolama alanı: Ham veriler için 18,7 GB (ard arda 30 gün), 1 saatlik toplamlar için 4,2 GB (6 ay), 1 günlük toplamlar için 0,8 GB (toplam 180 gün)
Öğrenilen Ders: Arabelleğe Alma Sınırı Pazarlık Edilemez
Operasyonun ilk ayında 4G bağlantısında 3 kesinti yaşandı telefon operatörü sorunları nedeniyle uzatıldı (her biri 2-6 saat). olmadan Raspberry Pi'nin SSD'sinde yerel ara belleğe alma olsaydı, tüm ölçümleri kaybederdik bu pencereler sırasında. 64 GB tampon ve sakla-ilet mantığı sayesinde, bağlantı her kurulduğunda, biriken tüm mesajlar silinir MQTT'de orijinal zaman damgaları ve InfluxDB ile kronolojik sırayla yayınlandı sıra dışı ekleme için bunları doğru şekilde kabul etti.
En İyi Uygulamalar ve Anti-Kalıplar
En İyi Uygulamalar
- Aracıda değil, yükteki zaman damgası: Ağ geçidi her zaman şunları içermelidir: JSON yükündeki yakalamanın kesin zaman damgası, zaman damgasına güvenmeyin MQTT komisyoncusunun alımı. Çevrimdışı ara belleğe alma durumunda iletiler geç ulaşır ancak veriler geçici olarak doğrudur. InfluxDB sıra dışı eklemeleri kabul eder.
-
Telemetri kanallarını komut kanallarından ayırın: Telemetri konuları
(
plant/+/inverter/+/metrics) ve komut olanlar (plant/+/commands/#) farklı ACL'lere sahip ayrı ad alanlarında olmaları gerekir. Komutlar QoS 2 e gerektirir daha sıkı kimlik doğrulama. - Geçmiş veriler için agresif alt örnekleme: Ham veriler 5 saniyede ölüm sonrası arıza analizi için değerlidirler, ancak tarihsel eğilimler için toplamlar yeterlidir 15 dakika veya birkaç kez. Flux altörnekleme ve saklama görevlerini hemen uygulayın.
- Ağ geçidi sağlığı izleme: Ağ geçidinin MQTT kalp atışını kullanın (her 60'lı yıllarda belirli bir konuda yayınlayın) ve Grafana'da izleyin. Kalp atışı eksikse 3 dönem boyunca kritik uyarı: düşen bir komisyoncu sorunu değil, ağ geçidi olabilir.
- InfluxDB'deki ağ geçidi sistemi ölçümleri: Edge ağ geçidinde Telegraf CPU, RAM, CPU sıcaklığı, arabellek için kullanılabilir disk alanı gibi ölçümleri toplayabilir, ve bunları MQTT'de yayınlayın. Raspberry Pi'nin stres altında olup olmadığını bilmek önemlidir.
Kaçınılması Gereken Anti-Desenler
Kritik Anti-Desenler
-
Konular toplama olmadan çok ayrıntılı: Her birini serbest bırakın
Ayrı bir konudaki Modbus kaydı (örn.
plant/PV001/INV001/register/40001) komisyoncuda on binlerce konu ve muazzam meta veri yükü oluşturur. İlgili günlükleri her zaman cihaz başına tek bir JSON yükünde toplayın. - Yüksek frekanslı telemetri için QoS 2: Dörtlü el sıkışma QoS 2, kontrol mesajlarının sayısını dört katına çıkarır. 1-5Hz veriler için QoS 1'i kullanın (kopyalar InfluxDB tarafından zaman damgası idempotence yoluyla işlenir). Yalnızca komutlar için QoS 2.
-
Telemetri konusunu koruyun: Tutma durum konuları için kullanışlıdır
(son değer önemlidir), ancak yüksek frekanslı telemetri konularında
Aracı, tutulan mesajı her yayında ek yüklerle birlikte güncellemelidir.
depolama ve CPU. Korumayı yalnızca şunun için kullan:
/statuse/config. -
InfluxDB'de sınırsız kardinalite: Yüksek değere sahip etiketlerden kaçının
kardinalite (örn. kullanım
session_idEV istasyonları için bir etiket olarak yeni bir Her oturum için seri). Bunun yerine etiket olarak değil, alan olarak kullanın. - InfluxDB'de hız sınırlaması yoktur: Eşzamanlı sorguların sınırı olmadan, tek bir yoğun Grafana sorgusu (örneğin 6 aylık ham veriyi dışarı aktarma) DB belleği ve öldürücü OOM'a neden olur. Her zaman sorgu sınırlarını yapılandırın.
Kaynaklar ve Teknik Referanslar
- MQTT 5.0 Spesifikasyonu (OASIS): Resmi MQTT 5.0 protokol spesifikasyonu, QoS'yi, saklamayı, Will mesajları, oturum yönetimi ve kullanıcı özellikleri. Uygulamalar için temel kurumsal.
- Telegraf MQTT Tüketici Eklentisi: Resmi InfluxData belgeleri tüm yapılandırma parametrelerini açıklar konu ayrıştırma ve desteklenen veri formatları dahil olmak üzere mqtt_consumer eklentisinin. URL: docs.influxdata.com/telegraf
- InfluxDB Flux Belgeleri: Flux dili güçlüdür ancak öğrenme eğrisi vardır. Belgeler resmi,gregateWindow dahil tüm işlevlerle tamamlanmış referansı içerir, Katılın, döndürün ve haritalayın.
- SunSpec İttifakı: SunSpec standardı, fotovoltaik invertörler için Modbus kayıt haritasını tanımlar. Piller ve sayaçlar. sunspec.org'da mevcuttur. Arasında birlikte çalışabilirliği sağlar farklı üreticilerin cihazları.
- EMQX Belgeleri: EMQX belgeleri, kümeleme, kural motorları, güvenlik ve InfluxDB dahil harici depolama sistemleriyle entegrasyon.
- Grafana Uyarısı: Birleşik uyarı sistemi (v9+) için Grafana belgeleri şunları içerir: YAML aracılığıyla temel hazırlık, iletişim noktaları yapılandırması ve yönlendirmeye yönelik kılavuzlar.
- IIoT Güvenliği: IEC 62443: IEC 62443 standardı otomasyon sistemleri için güvenlik gereksinimlerini tanımlar ve enerji sistemleri de dahil olmak üzere endüstriyel kontrol. Sertifikalar için zorunlu NIS2 (İtalya'da 138/2024 sayılı Kanun Hükmünde Kararname ile uygulanmıştır).
Sonuçlar ve Sonraki Adımlar
Eksiksiz ve üretime hazır bir enerji IoT platformu oluşturduk: köprüden Grafana, PagerDuty ve aracılığıyla çok kanallı uyarıya kadar yerel arabelleğe alma özelliğine sahip Modbus-MQTT Gevşek. MQTT + Telegraf + InfluxDB + Grafana yığını test edilmiştir, belgelenmiştir ve ölçeklenebilirdir: 10 MW'lık bir sistem için saniyede 650 veri noktasını bol marjla yönetir ve Kümelemede ve çoklu düğümlerde EMQX InfluxDB, hizmet ölçeğindeki dağıtımlara doğrusal olarak ölçeklenir yüzlerce MW.
Vaka çalışması rakamları kendi adına konuşuyor: %99,94 kullanılabilirlik, MTTR'de azalma 2,3 saatte 8, +3,2 Performans Oranı puanı ve 45.000 EUR bakım tasarrufu 6 ay içinde tahmin. Gerçek zamanlı izleme enerji sistemleri için lüks değildir yenilenebilir kaynaklar: gelirlerin ve varlıkların faydalı ömrünün doğrudan çarpanı.
EnergyTech serisinin bir sonraki makalesi şu konuyu ele alıyor: karbon muhasebe: Önlenen emisyonları hesaplayan ÇSY platformlarının nasıl oluşturulacağı, Toplanan üretim verilerinden başlayarak yeşil sertifikalar ve karbon kredisi yeni oluşturduğumuz IoT platformundan.
İlgili Makaleler
- EnergyTech Serisi: Bu makale özel serinin bir parçasıdır yenilenebilir enerji için yazılım mühendisliğine. Önceki makale bununla ilgili EV Yük Dengeleme ve ardından gelen Karbon Muhasebesi resmi tamamlıyor Enerji geçişine yönelik yazılım altyapısının geliştirilmesi.
- MLOps Serisi: Alt örnekleme ve anormallik tespiti uygulandı Flux ile bunlar yalnızca ilk adımdır. MLOps serisi modellerin nasıl entegre edileceğini açıklar ML (üretim tahmini için LSTM, anormallik tespiti için İzolasyon Ormanı) ile veriler aynı InfluxDB platformundan geliyor.
- Veri ve Yapay Zeka İş Serisi: MQTT-InfluxDB mimarisi bir IoT veri göl evi örneği. Veri ve Yapay Zeka İş serisi, bunun nasıl DBT ve Airflow ile bu zaman serisi verilerini kurumsal analiz hatlarına taşıyın.
- PostgreSQL AI Serisi: gerektiren daha karmaşık analizler için İlişkisel verilerle (ör. sözleşme verileri, tesis ana verileri, geçmiş) BİRLEŞİN bakım), Yabancı Veri Sarmalayıcı aracılığıyla InfluxDB-PostgreSQL entegrasyonu ve PostgreSQL AI serisinde yer alıyor.







