Od MQTT k InfluxDB: Energetická platforma IoT v reálném čase
Globální trh průmyslové energie IoT převyšuje 22 miliard dolarů v roce 2025 a roste s CAGR 19,8 % na 54 miliard do roku 2030. Každý 10 MW fotovoltaický systém generuje mimo 4 miliony měření za den z invertorů, čidel záření, strunové měřiče a záznamníky meteorologických dat. Každá větrná farma přidává vibrační data, z kroutící moment a teplota z gondol. Každá nabíjecí stanice EV zveřejňuje stav připojení, výstupní výkon a stav konektoru každých 30 sekund.
Výzvou není tato data sbírat: a dělat to spolehlivě, škálovatelně a s latencí sub-second, zachování historizace v průběhu let a schopnost provádět analytické dotazy v reálném čase. Stal se protokol MQTT, vytvořený pro průmyslové sítě s omezenou šířkou pásma společný jazyk energetické IIoT. InfluxDB a řešení ukládání časových řad nejpoužívanější pro tuto doménu. Společně s Telegrafem jako mostem a Grafanou jako vrstvou vizualizace, tvoří zásobník, který řídí od malých domácích systémů až po utility multi-gigawatt.
Tento článek staví kompletní energetickou platformu IoT z architektury od začátku do konce, přes podrobnou konfiguraci každé součásti až po Docker Pracovní kompozice a reálná případová studie na 10 MW fotovoltaickém parku s 200 střídači. Každá sekce obsahuje testovaný kód a produkční strategie.
Co se dozvíte v tomto článku
- End-to-end architektura: senzory → brána → MQTT → Telegraf → InfluxDB → Grafana
- Hluboký ponor MQTT: QoS, uchování, vůle, návrh tematického jmenného prostoru pro energii
- Srovnání makléřů: Mosquitto vs EMQX vs HiveMQ pro škálovatelnost a clustering
- Telegraf: spotřebitelská konfigurace MQTT, procesní potrubí, výstup InfluxDB
- InfluxDB 3.x: návrh segmentu, zásady uchovávání, dotazy Flux a plánování úloh
- Modbus RTU/TCP → MQTT most s Pythonem a pymodbus
- Dokončete Docker Compose pro produkční zásobník
- Zabezpečení: TLS vzájemné, MQTT auth, InfluxDB token, segmentace sítě
- Případová studie: 10 MW fotovoltaický park s 200 invertory, 100 000 msg/s
- Pokročilé upozorňování: Integrace Grafana, PagerDuty a Slack
Řada EnergyTech – umístění článku
| # | Položka | Úroveň | Stát |
|---|---|---|---|
| 1 | Protokol OCPP 2.x: Budování nabíjecích systémů EV | Moderní | Publikováno |
| 2 | Architektura DERMS: Agregace milionů distribuovaných zdrojů | Moderní | Publikováno |
| 3 | Prognóza obnovitelné energie s ML: Python LSTM | Moderní | Publikováno |
| 4 | Battery Management System pro Grid-Scale Storage | Moderní | Publikováno |
| 5 | IEC 61850 pro softwarové inženýry: Smart Grid Communication | Moderní | Publikováno |
| 6 | EV Charging Load Balancing: Real-Time Algorithms | Moderní | Publikováno |
| 7 | Jste zde – Od MQTT k InfluxDB: Real-Time Energy IoT Platform | Moderní | Proud |
| 8 | Softwarová architektura uhlíkového účetnictví: Platformy ESG | Moderní | Další |
| 9 | Digitální dvojče pro energetickou infrastrukturu: Simulace v reálném čase | Moderní | Již brzy |
| 10 | Blockchain pro P2P obchodování s energií: Chytré smlouvy a omezení | Moderní | Již brzy |
End-to-End architektura: Od senzoru k řídicímu panelu
Architektura energetické platformy IoT v reálném čase je rozdělena do pěti různých úrovní, každý se specifickými povinnostmi a odlišnými požadavky na spolehlivost. Porozumění Toto oddělení je nezbytné před napsáním prvního konfiguračního řádku.
Úroveň 1: Pole (polní vrstva)
Na nejnižší úrovni najdeme fyzická zařízení: fotovoltaické střídače, anemometry, pyranometry, řetězcové měřiče, PMU (Phasor Measurement Unit), inteligentní měřiče a nabíjecí stanice pro elektromobily. Většina těchto zařízení mluví protokoly průmyslového pole: Modbus RTU na RS-485, Modbus TCP přes Ethernet, IEC 61850 v rozvodnách, DNP3 pro rozvodné sítě nebo proprietární protokoly jako SunSpec pro střídače.
Tyto protokoly nejsou nativně kompatibilní s moderní IP nebo cloudem. Jejich hlasování Typický cyklus se pohybuje od 100 ms (PMU) do 60 sekund (starší datalogger). Optimální frekvence pro fotovoltaický střídač a 1-5 sekund pro elektrické veličiny (napětí, proud, výkon) a 30-60 sekund pro tepelné veličiny (teplota modulu, teplota měniče).
Úroveň 2: Brána / Edge (Edge Layer)
Okrajová brána a překladač: čte protokoly a publikuje přes MQTT brokerovi centrální. Může to být embedded zařízení (Raspberry Pi 4, BeagleBone, Moxa ioThinx), PLC s integrovaným zásobníkem MQTT nebo lokální okrajový server (NUC, ODROID) pro více instalací velký. Brána plní tři důležité funkce:
- Překlad protokolu: Modbus → MQTT, IEC 61850 → MQTT, DNP3 → MQTT
- Místní ukládání do vyrovnávací paměti: Shromažďuje data během odpojení WAN (ukládání a předávání)
- Předzpracování hran: filtrování, agregace, detekce lokálních anomálií
Vrstva 3: Zprostředkovatel zpráv (přepravní vrstva)
Broker MQTT je srdcem dopravního systému. Přijímá zprávy z okrajových bran, li distribuuje předplatitelům (především Telegraf, ale i dalším spotřebitelům, jako jsou systémy SCADA nebo upozornění v reálném čase). Výběr brokera závisí na měřítku: Mosquitto pro jednotlivé nebo malé instalace (méně než 10 000 připojení), EMQX nebo HiveMQ na nasazení podnikové a vícemístné.
Úroveň 4: Příjem a ukládání (datová vrstva)
Telegraf se přihlásí k odběru témat MQTT a převádí zprávy na metriky InfluxDB pomocí konfigurovatelné procesorové potrubí. InfluxDB přijímá a indexuje časové řady s uchováváním diferencované zásady: nezpracovaná data ve vysokém rozlišení za 30 dní, agregovaná za hodinu 1 rok, denní úhrny za 5 let. Úlohy toku plánují downsampling automatické a vypočítané výstrahy.
Úroveň 5: Vizualizace a upozornění (prezentační vrstva)
Grafana se připojuje k InfluxDB prostřednictvím pluginu pro zdroje dat a vykresluje řídicí panely v reálném čase aktualizovat každých 5-30 sekund. Výstražný systém Grafany vyhodnocuje dotazy pravidelně a posílat upozornění na PagerDuty, Slack, e-mail nebo vlastní webhooky, když prahové hodnoty jsou překročeny.
Kompletní architektonický diagram
Datový tok probíhá touto cestou: Invertory/senzory (Modbus/SunSpec) → Hrana brány (Raspberry Pi/NUC s pymodbus) → Broker MQTT (Mosquitto/EMQX) → Telegrafovat (MQTT Consumer + JSON Parser) → InfluxDB (časová řada DB) → Grafana (palubní deska + upozornění). Souběžně s tím a Správce výstrah v InfluxDB vyhodnocuje úlohy Flux každou minutu a umí psát události ve vyhrazeném segmentu nebo volání koncového bodu HTTP.
MQTT Deep Dive: Protokol a design pro energii
Základy MQTT 5.0
MQTT (Message Queuing Telemetry Transport) je navržený protokol pro publikování a odběr pro sítě s omezenou šířkou pásma a vysokou latencí. Verze 5.0 (standard OASIS RFC 2019) přidává kritické funkce pro podnikové aplikace: interval vypršení relace, zpráva interval vypršení platnosti, rozšířené kódy příčiny, vlastnosti uživatele ve zprávách, řízení toku s přijímat maximum a aliasy témat, aby se zmenšila velikost záhlaví.
QoS 0, 1 a 2: Správná volba pro energetická data
Kvalita služeb je nejdůležitější volbou při návrhu energetického systému MQTT. Tyto tři možnosti mají velmi odlišné důsledky pro propustnost, režii sítě a garance doručení:
| QoS | Záruka | Nad hlavou | Use Case Energy |
|---|---|---|---|
| QoS 0 | Maximálně jednou (vypal a zapomeň) | Minimálně (1 RTT) | Vysokofrekvenční telemetrie (1Hz+), data, kde je přípustná příležitostná ztráta: ozáření, okolní teplota |
| QoS 1 | Alespoň jednou (s duplikáty) | Střední (2 RTT) | Výrobní metriky (kWh, kW), alarmy, stavy zařízení – duplikáty spravované InfluxDB prostřednictvím časového razítka |
| QoS 2 | Přesně unce | Vysoká (4 RTT) | Řídicí příkazy (nastavená hodnota střídače, otevření/zavření spínače), finanční údaje (energie prodaná v síti) |
Pro typický fotovoltaický systém je praktické doporučení: QoS 0 pro telemetrii meteorologické (ozáření, teplota, vítr), QoS 1 pro výrobu a alarmy, QoS 2 pouze pro řídicí příkazy. Tato volba vyvažuje spolehlivost a propustnost, což umožňuje zprostředkovateli spravovat 50-100 000 zpráv za sekundu bez nasycení šířky pásma WAN.
Uchovávejte zprávy a zprávy vůle
Dvě často podceňované, ale kritické funkce MQTT pro energetické systémy:
-
Uchovat zprávy: Broker uloží poslední zprávu s keep=true
pro každé téma. Když se připojí nový předplatitel, obdrží hodnotu okamžitě
aktuální bez čekání na další zveřejnění. Základ pro stavová témata:
plant/PV001/statuss uchovat zajišťuje, že každá nová palubní deska vidí stav systému okamžitě. -
Zprávy vůle (Poslední vůle a závěť): Brána konfiguruje zprávu
který broker automaticky zveřejní, pokud se klient abnormálně odpojí.
Používá se k signalizaci odpojení brány: pokud Raspberry Pi na vzdáleném místě spadne,
makléř zveřejňuje
plant/PV001/connectivity offlineokamžitě, bez čekání na časový limit relace.
Téma Návrh jmenného prostoru pro energetické systémy
Návrh jmenného prostoru tématu je architektonickým rozhodnutím, které ovlivňuje každý aspekt systém: směrování, filtrování, zabezpečení ACL a organizace v InfluxDB. Dobře navržená hierarchická struktura se řídí tímto vzorem:
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
Základní pravidla pro jmenný prostor: každá úroveň musí mít sémantický význam, ovládací témata (příkazy) musí být oddělena od témat (dat) telemetrie, použijte snake_case pro konzistenci, vyhněte se speciálním znakům a mezerám, omezte hloubku na maximálně 6 úrovních pro kompatibilitu s různými makléři a klienty.
Srovnání makléřů MQTT: Mosquitto vs EMQX vs HiveMQ
Volba brokera MQTT je zásadní pro škálovatelnost a spolehlivost systému. Tři nejpoužívanější makléři v energetickém sektoru mají velmi odlišné vlastnosti.
| Vlastnosti | Mosquitto 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| Max. spojení | 100 kB (jeden uzel) | 100 M (23-uzlový cluster) | 200 mil. (shluky) |
| Shlukování | Žádný domorodec | Ano (distribuováno Erlangem) | Ano (podnik) |
| Verze MQTT | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 |
| Pravidlo Engine | No | Ano (založené na SQL) | Ano (podnik) |
| Most | Si | Si | Si |
| Autentizace | Soubory, TLS, pluginy | JWT, OAuth2, LDAP, mTLS | JWT, OAuth2, podnik |
| Licence | EPL/EDL (open source) | Apache 2.0 / Enterprise | Komunita / Podnik |
| RAM (100K připojení) | ~500 MB | ~2 GB | ~3 GB |
| Use case energy | Jediný závod, okrajová brána, laboratoř | Multi-site, utility-scale, enterprise | Utility-scale, enterprise compliance |
Konfigurace Mosquitto pro výrobu
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
Gateway Edge: Bridge Modbus → MQTT s Pythonem
Okrajová brána je komponenta, která spojuje svět průmyslových protokolů s ekosystémem MQTT. Zde je kompletní implementace pro fotovoltaické střídače s Protokol Modbus TCP, včetně místního ukládání do vyrovnávací paměti pro řešení odpojení WAN.
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: MQTT Bridge → InfluxDB s Transformation Pipeline
Telegraf je sběrač dat společnosti InfluxData, napsaný v Go, s více než 300 pluginy. V našem zásobníku Telegraf odebírá témata MQTT, dekóduje užitečné zatížení JSON, aplikuje transformace přes procesor a zapisuje do InfluxDB. Verze 1.30+ nativně podporuje MQTT 5.0 a analýzu témat jako značky.
Kompletní konfigurace Telegrafu
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: Time-Series Storage and Query for Energy
InfluxDB 2.x / 3.x Architektura pro energetické systémy
InfluxDB je referenční databáze časových řad pro energetický IoT. Verze 2.x (stále široce používaný v produkci v roce 2025) používá Flux jako dotazovací jazyk a zavádí koncept vědro s integrovanou politikou uchovávání. Verze 3.x, přepsaný v Rustu pomocí Apache Arrow a DataFusion, zavádí SQL jako primární jazyk a výrazně zlepšuje škálovatelnost pro vysokou mohutnost.
U systému 10 MW s 200 střídačmi vzorkování každých 5 sekund, objem dat a značné: každý střídač generuje přibližně 15 polí, celkem tedy 3 000 polí každých 5 sekund, neboli 216 000 datových bodů za minutu. Za jeden den je jich přibližně 311 milionů datové body. 30denní uchovávání vyžaduje přibližně 9,3 miliardy nezpracovaných datových bodů. Strategie downsamplingu je zásadní.
Počáteční nastavení: Skupiny a organizace
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Úkoly toku: převzorkování a automatické upozornění
Flux Tasks jsou naplánované úlohy, které Telegraf pravidelně spouští v rámci InfluxDB. Používáme je ke třem účelům: automatické převzorkování nezpracovaných dat, výpočet KPI agregátů a detekce anomálií se zápisem do bucket výstrah.
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
Query Flux pro fotovoltaickou analýzu
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: Kompletní produkční zásobník
Následující Docker Compose je výchozím bodem pro produkční nasazení plný zásobník. Každá služba má kontrolu stavu, zásady restartování a konfigurace svazků trvalé a izolované sítě pro bezpečnost.
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
Automatické poskytování Grafana
Grafana podporuje automatické poskytování datových zdrojů a dashboardů prostřednictvím souborů YAML.
To je zásadní pro kontejnerová prostředí, kde jsou kontejnery znovu vytvářeny.
Soubor config/grafana/provisioning/datasources/influxdb.yml musí obsahovat
konfiguraci zdroje dat InfluxDB, zatímco do něj vstupují řídicí panely ve formátu JSON
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: Dashboard and Alerting for Photovoltaic System
Struktura palubní desky FV systému
Efektivní přístrojová deska pro fotovoltaický systém musí odpovídat na provozní otázky za méně než 3 sekundy vizuálního čtení. Doporučená struktura je uspořádána do řádků:
- Řádek 1 – Denní KPI: 4 "stat" panely s dnešní produkcí (kWh), aktuální výkon (kW), výkonový poměr (%), počet poruchových měničů. aktualizace z 30. let.
- Řádek 2 – Dočasná výroba: Plošný graf s celkovým střídavým výkonem za posledních 24 hodin, superponované na normalizované ozáření. Dotaz InfluxDB-Raw s 5minutové souhrnné okno.
- Řádek 3 – Tepelná mapa měniče: Heatmap s osou X = čas, osou Y = inverter_id, hodnota = AC výkon. Umožňuje vizuálně identifikovat měniče, které produkují méně. Aktualizace za 1 minutu.
- Řádek 4 – Teplota a alarmy: Graf měřidla pro maximální teplotu měniče, panel tabulky s posledními 50 alarmy z bucketu energy_alerts seřazených podle časového razítka.
- Řádek 5 – Efektivita a kvalita sítě: Účinnost rozptylového grafu vs teplota, časové řady pro síťovou frekvenci a fázová napětí.
Pokročilé upozornění: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
Zabezpečení: TLS, Autentizace a Segmentace sítě
Zabezpečení energetické a kritické platformy IoT: produkční data jsou citlivé obchodní informace a možnost posílat příkazy do střídačů musí být chráněny před neoprávněným přístupem. Strategie obrany do hloubky je rozdělena do čtyř úrovní.
Generování certifikátů TLS pomocí skriptu
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
Bezpečnost: Povinné postupy ve výrobě
- Vzájemné TLS vždy: Klient i server musí předložit certifikát. Zabraňuje připojení z neautorizovaných bran, i když znají uživatelské jméno/heslo.
- Tajemství prostřednictvím proměnných prostředí nebo správce tajemství: Nikdy pevný kód InfluxDB tokeny, hesla MQTT nebo klíče API ve vašem kódu nebo konfiguračních souborech oddaný git. Použijte Docker Secrets, HashiCorp Vault nebo AWS Secrets Manager.
- Segmentace sítě: MQTT broker, InfluxDB a Telegraf nemusí nikdy nezveřejňujte přímo na internetu. Grafana pouze přes HTTPS reverzní proxy. Zprostředkovatel MQTT pro okrajové brány lze zpřístupnit prostřednictvím vyhrazené VPN.
- Rotace certifikátu: Certifikáty brány jsou platné 1 rok (365 dní). Nastavte budíky Grafana tak, aby vypršely 30 dní předem.
- Protokoly auditu MQTT: Povolit protokolování všech připojení a témat v Mosquitto. Integrace se SIEM pro detekci anomálního přístupu.
Výkon: Srovnávání a optimalizace pro vysoké měřítko
Kapacita zásobníku pro systém 10 MW
Fotovoltaický park o výkonu 10 MW má obvykle 200 střídačů po 50 kW. Při dotazování každých 5 sekund je objem zpráv MQTT následující:
| Komponent | Metriky | Objem |
|---|---|---|
| 200 měničů x 5s | 15 polí/zpráva | 40 msg/s, 600 datových bodů/s |
| 10 meteostanic x 10s | 8 polí/zpráva | 1 msg/s, 8 datových bodů/s |
| Celkem rostlin x 15min | Tavidlové agregáty | Počítáno v DB |
| Celkový | - | ~650 datových bodů/s, ~56 milionů datových bodů/den |
| Skladování v surovém stavu (30 dní) | ~1,68 miliardy datových bodů | ~8-15 GB (s kompresí InfluxDB) |
Tento svazek lze v hojné míře spravovat pomocí jediného uzlu InfluxDB 2.7 8 GB RAM a NVMe SSD. EMQX na VM se 4 vCPU a 8 GB RAM zvládne 100 000 připojení konkurenti s propustností 1M zpráv/s. Pro jednu instalaci od 200 měničů, jediného Mosquitto brokera na Raspberry Pi 4 a více než dost (testy v reálném světě: 10K msg/s, 500 MB RAM).
Srovnávací skript MQTT
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
Případová studie: Monitorování fotovoltaického parku 10 MW
Tato případová studie popisuje reálnou implementaci energetické platformy IoT pro fotovoltaický park z 10 MW v jižní Itálii s 200 střídačů SMA Sunny Tripower o výkonu 50 kW a 8 stanic pro měření počasí.
Počáteční nastavení a výzvy
Stránky jsou na síti 4G WAN s garantovanou šířkou pásma 10 Mbps a proměnnou latencí mezi 30 a 200 ms. Každý střídač komunikuje přes Modbus TCP v místní síti LAN (192.168.x.x). průmyslový spínač. Okrajovou bránou je Raspberry Pi 4 (8 GB RAM) s 256 GB SSD pro místní ukládání do vyrovnávací paměti, připojený k WAN přes průmyslový 4G router s vyhrazenou datovou SIM kartou.
| Parametr | Hodnota |
|---|---|
| Napájení systému | Špičkový výkon 10,2 MW (SMA STP50-US-40) |
| Číslo měniče | 200 (SMA Sunny Tripower, 50 kW) |
| Meteorologické stanice | 8 (záření, T, HR, vítr) |
| Okrajové brány | Raspberry Pi 4 8GB + 256GB SSD |
| Interval dotazování měniče | 5 sekund |
| Interval dotazování počasí | 10 sekund |
| Objem zpráv MQTT | ~41 msg/s (200 inv x 1/5s + 8 počasí x 1/10s) |
| Použitá šířka pásma WAN | ~180 KB/s (2 % z dostupných 10 Mb/s) |
| Broker MQTT | EMQX 5.8 na cloudovém VM (4 vCPU, 8 GB) |
| InfluxDB | 2.7.10 na vyhrazeném virtuálním počítači (8 vCPU, 32 GB RAM, NVMe) |
Výsledky po 6 měsících provozu
- Dostupnost platformy: 99,94 % (celkový výpadek: 2,6 hodiny za 6 měsíců, vše pro plánovanou údržbu cloudového serveru)
- Data ztracená v důsledku odpojení WAN: 0,002 % (díky místnímu ukládání do vyrovnávací paměti brány a do trvalé relace MQTT)
- Automaticky detekovaná upozornění na poruchu měniče: 47 událostí za 6 měsíců, MTTR (Mean Time To Repair) snížený z 8 hodin na 2,3 hodiny díky okamžitému upozornění na PagerDuty (oproti ručnímu zjišťování další den)
- Průměrný poměr výkonu: 82,3 % oproti 79,1 % v předchozím období (bez monitorování v reálném čase): +3,2 procentního bodu = +320 dalších MWh/rok
- Úspory z prediktivní údržby: Detekce 12 střídačů s časná degradace (teplota chladiče +8°C oproti průměru) před poruchou prohlásil. Preventivní zásah odhadovaný na 45 000 EUR v úsporách oproti nouzové výměně.
- Úložiště InfluxDB po 6 měsících: 18,7 GB pro nezpracovaná data (po 30 dnech), 4,2 GB pro 1h agregáty (6 měsíců), 0,8 GB pro 1d agregáty (celkem 180 dní)
Ponaučení: O vyrovnávací paměti nelze vyjednávat
Během prvního měsíce provozu došlo ke 3 přerušením 4G připojení prodloužena (vždy 2-6 hodin) kvůli problémům telefonního operátora. Bez lokální vyrovnávací paměť na SSD Raspberry Pi, přišli bychom o všechna měření během těch oken. Díky vyrovnávací paměti 64 GB a logice ukládání a předávání pokaždé, když je připojení zpět, všechny nashromážděné zprávy byly publikováno na MQTT v chronologickém pořadí s původními časovými razítky a InfluxDB správně je přijal pro vložení mimo pořadí.
Osvědčené postupy a anti-vzorce
Nejlepší postupy
- Časové razítko v užitečné zátěži, nikoli v zprostředkovateli: Brána musí vždy obsahovat přesné časové razítko zachycení v užitečné zátěži JSON, nespoléhejte na časové razítko příjem brokera MQTT. V případě offline ukládání do vyrovnávací paměti přicházejí zprávy pozdě ale údaje jsou časově správné. InfluxDB přijímá vložení mimo pořadí.
-
Oddělte telemetrické kanály od příkazových kanálů: Témata telemetrie
(
plant/+/inverter/+/metrics) a příkazové (plant/+/commands/#) musí být v samostatných jmenných prostorech s odlišnými seznamy ACL. Příkazy vyžadují QoS 2e přísnější autentizace. - Agresivní downsampling pro historická data: Nezpracovaná data za 5 sekund jsou cenné pro analýzu posmrtných selhání, ale pro historické trendy agregáty stačí v 15 minutách nebo časech. Okamžitě implementujte úlohy downsampling a uchovávání Flux.
- Monitorování stavu brány: Použijte srdeční tep MQTT brány (publikovat každých 60 let na vyhrazené téma) a sledovat v Grafaně. Chybí-li tep na 3 období, kritické upozornění: může to být brána, která spadla, ne problém s makléřem.
- Systémové metriky brány v InfluxDB: Telegraf na okrajové bráně může shromažďovat metriky CPU, RAM, teplota CPU, dostupné místo na disku pro vyrovnávací paměť, a publikovat je na MQTT. Je důležité vědět, zda je Raspberry Pi ve stresu.
Anti-vzory, kterým je třeba se vyhnout
Kritické anti-vzory
-
Témata jsou příliš podrobná bez agregace: Uvolněte každý singl
Modbus registr na samostatné téma (např.
plant/PV001/INV001/register/40001) generuje u brokera desítky tisíc témat a nesmírnou režii metadat. Vždy agregujte související protokoly do jedné datové části JSON na zařízení. - QoS 2 pro vysokofrekvenční telemetrii: Čtyřstranné podání ruky QoS 2 zčtyřnásobuje počet řídicích zpráv. Pro data 1–5 Hz použijte QoS 1 (duplikáty zpracovává InfluxDB prostřednictvím idempotence časového razítka). QoS 2 pouze pro příkazy.
-
Zůstaňte v tématu telemetrie: Zachování je užitečné pro témata stavu
(poslední hodnota je významná), ale na témata vysokofrekvenční telemetrie
zprostředkovatel musí aktualizovat uchovávanou zprávu při každé publikaci s režií
úložiště a CPU. Používejte zachovat pouze pro
/statuse/config. -
Neomezená mohutnost v InfluxDB: Vyhněte se značkám s vysokými hodnotami
mohutnost (např. použití
session_idjako značku pro stanice EV vytvořte nový série pro každou relaci). Použijte místo toho jako pole, nikoli jako značku. - Žádné omezení sazby na InfluxDB: Bez omezení souběžných dotazů, jediný těžký dotaz Grafana (např. export 6 měsíců nezpracovaných dat) může zasytit DB paměti a způsobit zabijáka OOM. Vždy konfigurujte limity dotazů.
Zdroje a technické reference
- Specifikace MQTT 5.0 (OASIS): Oficiální specifikace protokolu MQTT 5.0 podrobně popisuje QoS, zachovejte, budou zprávy, správa relací a vlastnosti uživatele. Základ pro implementace podnik.
- Spotřebitelský plugin Telegraf MQTT: Oficiální dokumentace InfluxData popisuje všechny konfigurační parametry zásuvného modulu mqtt_consumer, včetně analýzy tématu a podporovaných formátů dat. URL: docs.influxdata.com/telegraf
- Dokumentace InfluxDB Flux: Jazyk Flux je mocný, ale má křivku učení. Dokumentace oficiální obsahuje reference kompletní se všemi funkcemi, včetně agregovaného okna, připojit, otočit a zmapovat.
- SunSpec Alliance: Norma SunSpec definuje mapu registru Modbus pro fotovoltaické střídače, baterie a měřiče. Dostupné na sunspec.org. Zajišťuje interoperabilitu mezi zařízení od různých výrobců.
- Dokumentace EMQX: Dokumentace EMQX obsahuje průvodce pro klastrování, moduly pravidel, zabezpečení a integrace s externími úložnými systémy včetně InfluxDB.
- Upozornění Grafana: Dokumentace Grafana pro jednotný výstražný systém (v9+) obsahuje příručky pro poskytování prostřednictvím YAML, konfigurace a směrování kontaktních bodů.
- Zabezpečení IIoT: IEC 62443: Norma IEC 62443 definuje bezpečnostní požadavky na automatizační systémy a průmyslové řízení, včetně energetických systémů. Povinné pro certifikace NIS2 (implementováno v Itálii legislativním nařízením 138/2024).
Závěry a další kroky
Vybudovali jsme kompletní energetickou platformu IoT připravenou k výrobě: z mostu Modbus-MQTT s lokálním ukládáním do vyrovnávací paměti až po vícekanálové upozornění přes Grafana, PagerDuty a Slack. Zásobník MQTT + Telegraf + InfluxDB + Grafana je testován, zdokumentován a škálovatelný: spravuje 650 datových bodů/sekundu pro 10 MW systém s bohatou rezervou as EMQX v clusteringu a více uzlech InfluxDB se lineárně škáluje podle nasazení na úrovni utility stovky MW.
Čísla případových studií hovoří sama za sebe: dostupnost 99,94 %, snížení MTTR od 8 za 2,3 hodiny, +3,2 bodů Performance Ratio a úspora 45 000 EUR na údržbu prediktivní za 6 měsíců. Monitorování v reálném čase není pro energetické systémy luxusem obnovitelné zdroje: a přímý multiplikátor výnosů a doby životnosti aktiv.
Další článek ze série EnergyTech se věnuje tématu uhlíku účetnictví: jak vytvořit platformy ESG, které počítají ušetřené emise, zelené certifikáty a uhlíkový kredit počínaje shromážděnými údaji o výrobě z platformy IoT, kterou jsme právě vytvořili.
Související články
- Řada EnergyTech: Tento článek je součástí vyhrazené série na softwarové inženýrství pro obnovitelné zdroje energie. Předchozí článek o Obrázek doplňuje EV Load Balancing a následující o Carbon Accounting softwarové infrastruktury pro energetický přechod.
- Řada MLOps: Implementováno převzorkování a detekce anomálií s Fluxem jsou pouze prvním krokem. Řada MLOps popisuje, jak integrovat modely ML (LSTM pro prognózování produkce, Isolation Forest pro detekci anomálií) s data pocházející ze stejné platformy InfluxDB.
- Data & AI Business Series: Architektura MQTT-InfluxDB je a Příklad datového jezera IoT. Série Data & AI Business se ponoří do toho, jak na to Přeneste tato data časových řad do kanálů podnikové analýzy pomocí DBT a Airflow.
- Řada PostgreSQL AI: Pro složitější analýzy vyžadující PŘIPOJTE se k relačním datům (např. smluvní data, kmenová data závodu, historie údržba), integrace InfluxDB-PostgreSQL prostřednictvím Foreign Data Wrapper a vystupoval v řadě PostgreSQL AI.







