Od MQTT do InfluxDB: platforma IoT energii w czasie rzeczywistym
Globalny rynek IoT energii przemysłowej przekracza 22 miliardy dolarów w 2025 roku i rośnie z CAGR na poziomie 19,8% do 54 miliardów do 2030 r. Każdy system fotowoltaiczny o mocy 10 MW generuje poza 4 miliony pomiarów dziennie z falowników, czujników promieniowania, mierniki ciągowe i rejestratory danych meteorologicznych. Każda farma wiatrowa dodaje dane wibracyjne, m.in moment obrotowy i temperatura z gondoli. Każda stacja ładowania pojazdów elektrycznych publikuje status połączenia, moc wyjściową i stan złącza co 30 sekund.
Wyzwaniem nie jest gromadzenie tych danych, lecz robienie tego w sposób niezawodny, skalowalny i bez opóźnień subsekundowe, zachowanie historyzacji na przestrzeni lat i możliwość stawiania zapytań analitycznych w czasie rzeczywistym. Protokół MQTT, stworzony dla sieci przemysłowych o ograniczonej przepustowości, stał się wspólny język energetycznego IIoT. InfluxDB i rozwiązanie do przechowywania szeregów czasowych najbardziej przyjęte dla tej domeny. Razem z Telegrafem jako pomostem i Grafaną jako warstwą wizualizacji, tworzą stos, który zarządza od małych systemów domowych po media wielogigawatowe.
W tym artykule zbudowano kompletną platformę energetyczną IoT na podstawie architektury kompleksowo, poprzez szczegółową konfigurację każdego komponentu, aż do Dockera Skład roboczy i studium rzeczywistego przypadku dotyczące parku fotowoltaicznego o mocy 10 MW z 200 inwerterami. Każda sekcja zawiera przetestowany kod i strategie produkcyjne.
Czego dowiesz się w tym artykule
- Architektura typu end-to-end: czujniki → bramka → MQTT → Telegraf → InfluxDB → Grafana
- Głębokie nurkowanie MQTT: QoS, zachowanie, komunikaty woli, projektowanie przestrzeni nazw tematów dla energii
- Porównanie brokerów: Mosquitto vs EMQX vs HiveMQ pod kątem skalowalności i klastrowania
- Telegraf: konfiguracja konsumencka MQTT, potok procesora, dane wyjściowe InfluxDB
- InfluxDB 3.x: projektowanie segmentów, polityka przechowywania, zapytania Flux i planowanie zadań
- Modbus RTU/TCP → Most MQTT z Pythonem i pymodbus
- Ukończ Docker Compose dla stosu produkcyjnego
- Bezpieczeństwo: wzajemne TLS, uwierzytelnianie MQTT, token InfluxDB, segmentacja sieci
- Studium przypadku: Park fotowoltaiczny o mocy 10 MW z 200 falownikami, 100 tys. msg/s
- Zaawansowane alerty: integracja Grafana, PagerDuty i Slack
Seria EnergyTech — lokalizacja artykułu
| # | Przedmiot | Poziom | Państwo |
|---|---|---|---|
| 1 | Protokół OCPP 2.x: Budowa systemów ładowania pojazdów elektrycznych | Zaawansowany | Opublikowany |
| 2 | Architektura DERMS: agregacja milionów rozproszonych zasobów | Zaawansowany | Opublikowany |
| 3 | Prognozowanie energii odnawialnej za pomocą ML: Python LSTM | Zaawansowany | Opublikowany |
| 4 | System zarządzania baterią do przechowywania w skali sieciowej | Zaawansowany | Opublikowany |
| 5 | IEC 61850 dla inżynierów oprogramowania: komunikacja w inteligentnych sieciach | Zaawansowany | Opublikowany |
| 6 | Równoważenie obciążenia ładowania pojazdów elektrycznych: algorytmy czasu rzeczywistego | Zaawansowany | Opublikowany |
| 7 | Jesteś tutaj - Od MQTT do InfluxDB: Platforma Energy IoT w czasie rzeczywistym | Zaawansowany | Aktualny |
| 8 | Architektura oprogramowania do rozliczania emisji dwutlenku węgla: platformy ESG | Zaawansowany | Następny |
| 9 | Cyfrowy bliźniak dla infrastruktury energetycznej: symulacja w czasie rzeczywistym | Zaawansowany | Już wkrótce |
| 10 | Blockchain dla handlu energią P2P: inteligentne kontrakty i ograniczenia | Zaawansowany | Już wkrótce |
Kompleksowa architektura: od czujnika do deski rozdzielczej
Architektura platformy IoT energii działającej w czasie rzeczywistym jest podzielona na pięć odrębnych poziomów, każdy z określonymi obowiązkami i różnymi wymaganiami dotyczącymi niezawodności. Zrozumienie To oddzielenie jest niezbędne przed zapisaniem pierwszej linii konfiguracyjnej.
Poziom 1: Pole (Warstwa pola)
Na najniższym poziomie znajdują się urządzenia fizyczne: falowniki fotowoltaiczne, anemometry, piranometry, mierniki sznurkowe, PMU (jednostki pomiaru fazorów), inteligentne liczniki i stacje ładowania pojazdów elektrycznych. Większość tych urządzeń obsługuje protokoły przemysłowe: Modbus RTU na RS-485, Modbus TCP przez Ethernet, IEC 61850 w podstacjach, DNP3 dla sieci elektroenergetycznych lub zastrzeżone protokoły, takie jak SunSpec dla falowników.
Protokoły te nie są natywnie kompatybilne z nowoczesnym IP lub chmurą. Ich sondaż Typowy cykl waha się od 100 ms (PMU) do 60 sekund (starszy rejestrator danych). Optymalna częstotliwość dla falownik fotowoltaiczny i 1-5 sekund na wielkości elektryczne (napięcie, prąd, moc) i 30-60 sekund dla wielkości termicznych (temperatura modułu, temperatura falownika).
Poziom 2: Brama / Krawędź (Warstwa Krawędziowa)
Brama brzegowa i tłumacz: odczytuje protokoły terenowe i publikuje je za pośrednictwem MQTT dla brokera centralny. Może to być urządzenie wbudowane (Raspberry Pi 4, BeagleBone, Moxa ioThinx), sterownik PLC ze zintegrowanym stosem MQTT lub lokalny serwer brzegowy (NUC, ODROID) w przypadku instalacji wielokrotnych duży. Bramka spełnia trzy krytyczne funkcje:
- Tłumaczenie protokołu: Modbus → MQTT, IEC 61850 → MQTT, DNP3 → MQTT
- Buforowanie lokalne: Gromadzi dane podczas rozłączeń sieci WAN (przechowywanie i przesyłanie)
- Wstępne przetwarzanie krawędzi: filtrowanie, agregacja, wykrywanie lokalnych anomalii
Warstwa 3: Broker wiadomości (warstwa transportowa)
Broker MQTT jest sercem systemu transportowego. Odbiera wiadomości z bramek brzegowych, li dystrybuuje do abonentów (przede wszystkim Telegraf, ale także innych konsumentów, takich jak systemy SCADA lub powiadomienia w czasie rzeczywistym). Wybór brokera zależy od skali: Mosquitto for pojedyncze lub małe instalacje (poniżej 10 tys. połączeń), EMQX lub HiveMQ na wdrożenie korporacyjne i wielooddziałowe.
Poziom 4: Pozyskiwanie i przechowywanie (warstwa danych)
Telegraf subskrybuje tematy MQTT i przekształca wiadomości w metryki InfluxDB za pomocą konfigurowalne potoki procesorów. InfluxDB odbiera i indeksuje szeregi czasowe z zachowaniem zróżnicowane zasady: surowe dane o wysokiej rozdzielczości z 30 dni, zagregowane co godzinę 1 rok, agregaty dzienne przez 5 lat. Próbkowanie w dół według harmonogramu zadań Flux automatyczne i obliczone alerty.
Poziom 5: Wizualizacja i ostrzeganie (warstwa prezentacji)
Grafana łączy się z InfluxDB poprzez wtyczkę źródła danych i renderuje dashboardy w czasie rzeczywistym aktualizacja co 5-30 sekund. System ostrzegania Grafana ocenia zapytania okresowo i wysyłaj powiadomienia do PagerDuty, Slack, poczty elektronicznej lub niestandardowych webhooków, kiedy progi zostały przekroczone.
Kompletny schemat architektoniczny
Przepływ danych przebiega następującą ścieżką: Falowniki/czujniki (Modbus/SunSpec) → Krawędź bramy (Raspberry Pi/NUC z pymodbus) → Broker MQTT (Komar/EMQX) → Telegraf (Konsument MQTT + Parser JSON) → NapływDB (szereg czasowy DB) → Grafana (pulpit + alerty). Równolegle A Menedżer alertów w InfluxDB ocenia zadania Flux co minutę i potrafi pisać zdarzenia w dedykowanym zasobniku lub wywołaj punkt końcowy HTTP.
Głębokie nurkowanie MQTT: protokół i projekt dotyczący energii
Podstawy MQTT 5.0
MQTT (Message Queuing Telemetry Transport) to zaprojektowany protokół publikowania i subskrybowania dla sieci o ograniczonej przepustowości i dużych opóźnieniach. Wersja 5.0 (standard OASIS RFC 2019) dodaje krytyczne funkcje dla aplikacji korporacyjnych: interwał wygaśnięcia sesji, komunikat okres ważności, rozszerzone kody przyczyn, właściwości użytkownika w komunikatach, kontrola przepływu za pomocą otrzymać maksimum i aliasy tematów, aby zmniejszyć rozmiar nagłówków.
QoS 0, 1 i 2: prawidłowy wybór danych dotyczących energii
Jakość usług jest najważniejszym wyborem przy projektowaniu energetycznego systemu MQTT. Te trzy opcje mają bardzo różne konsekwencje dla przepustowości, narzutu sieci i gwarancje dostawy:
| Jakość usług | Gwarancja | Nad głową | Użyj energii przypadku |
|---|---|---|---|
| Jakość usług 0 | Co najwyżej raz (odpal i zapomnij) | Minimalna (1 RTT) | Telemetria wysokiej częstotliwości (1 Hz+), dane, w przypadku których dopuszczalna jest sporadyczna utrata: natężenie promieniowania, temperatura otoczenia |
| Jakość usług 1 | Przynajmniej raz (z duplikatami) | Średni (2 RTT) | Metryki produkcji (kWh, kW), alarmy, stany urządzeń - duplikaty zarządzane przez InfluxDB poprzez znacznik czasu |
| Jakość usług 2 | Dokładnie uncje | Wysoka (4 RTT) | Polecenia sterujące (nastawa falownika, otwarcie/zamknięcie wyłącznika), dane finansowe (energia sprzedana w sieci) |
W przypadku typowego systemu fotowoltaicznego praktyczne zalecenie jest następujące: QoS 0 dla telemetrii meteorologiczne (napromieniowanie, temperatura, wiatr), QoS 1 dla produkcji i alarmów, QoS 2 tylko dla poleceń sterujących. Wybór ten równoważy niezawodność i przepustowość, umożliwiając brokerowi zarządzanie wiadomościami o szybkości 50–100 tys. na sekundę bez obciążania przepustowości sieci WAN.
Zachowaj wiadomości i wiadomości woli
Dwie często niedoceniane, ale krytyczne funkcje MQTT dla systemów energetycznych:
-
Zachowaj wiadomości: Broker przechowuje ostatnią wiadomość z parametrem keep=true
dla każdego tematu. Kiedy nowy abonent połączy się, natychmiast otrzyma wartość
aktualne bez czekania na kolejną publikację. Podstawowe tematy dotyczące statusu:
plant/PV001/statusz zachowaniem zapewnia, że każdy nowy pulpit nawigacyjny będzie widział natychmiast sprawdzić stan systemu. -
Przesłanie woli (ostatnia wola i testament): Brama konfiguruje wiadomość
które broker automatycznie publikuje, jeśli klient rozłączy się w sposób nienormalny.
Służy do sygnalizowania rozłączenia bramy: jeśli Raspberry Pi w zdalnej lokalizacji ulegnie awarii,
broker publikuje
plant/PV001/connectivity offlinenatychmiast, bez czekania na przekroczenie limitu czasu sesji.
Temat Projektowanie przestrzeni nazw dla systemów energetycznych
Projekt przestrzeni nazw tematu to decyzja architektoniczna, która ma wpływ na każdy aspekt system: routing, filtrowanie, bezpieczeństwo ACL i organizacja w InfluxDB. Dobrze zaprojektowana struktura hierarchiczna jest zgodna z następującym wzorcem:
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
Podstawowe zasady dotyczące przestrzeni nazw: każdy poziom musi mieć znaczenie semantyczne, tematy sterujące (polecenia) muszą być oddzielone od tematów telemetrycznych (dane), używaj Snake_case dla spójności, unikaj znaków specjalnych i spacji, ograniczaj głębokość na maksymalnie 6 poziomach dla kompatybilności z różnymi brokerami i klientami.
Porównanie brokerów MQTT: Mosquitto vs EMQX vs HiveMQ
Wybór brokera MQTT ma fundamentalne znaczenie dla skalowalności i niezawodności systemu. Trzej najczęściej wykorzystywani brokerzy w sektorze energetycznym mają bardzo różne cechy.
| Cechy | Komar 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| Maks. znajomości | 100 tys. (pojedynczy węzeł) | 100M (klaster 23-węzłowy) | 200M (klastry) |
| Klastrowanie | Żaden rodzimy | Tak (dystrybucja Erlang) | Tak (korporacja) |
| Wersja MQTT | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 |
| Silnik reguł | No | Tak (w oparciu o SQL) | Tak (korporacja) |
| Most | Si | Si | Si |
| Uwierzytelnianie | Pliki, TLS, wtyczki | JWT, OAuth2, LDAP, mTLS | JWT, OAuth2, przedsiębiorstwo |
| Licencja | EPL/EDL (otwarte oprogramowanie) | Apache 2.0 / Enterprise | Społeczność / Przedsiębiorstwo |
| RAM (100 tys. połączeń) | ~500MB | ~2 GB | ~3 GB |
| Użyj energii przypadku | Pojedynczy zakład, bramka brzegowa, laboratorium | Wiele lokalizacji, skala użyteczności publicznej, przedsiębiorstwo | Skala użytkowa, zgodność z przepisami korporacyjnymi |
Konfiguracja Mosquitto do produkcji
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
Krawędź bramy: Bridge Modbus → MQTT z Pythonem
Bramka brzegowa to komponent łączący świat przemysłowych protokołów terenowych z ekosystemem MQTT. Oto kompletna realizacja dla falowników fotowoltaicznych z Protokół Modbus TCP, obejmujący lokalne buforowanie w celu obsługi rozłączeń WAN.
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: Most MQTT → InfluxDB z rurociągiem transformacji
Telegraf to moduł do gromadzenia danych InfluxData napisany w Go i zawierający ponad 300 wtyczek. Na naszym stosie Telegraf subskrybuje tematy MQTT, dekoduje ładunek JSON, stosuje transformacje poprzez procesor i zapisuje do InfluxDB. Wersja 1.30+ natywnie obsługuje MQTT 5.0 i analizowanie tematów jako tagi.
Pełna konfiguracja Telegrafu
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: przechowywanie szeregów czasowych i zapytania o energię
Architektura InfluxDB 2.x / 3.x dla systemów energetycznych
InfluxDB to referencyjna baza danych szeregów czasowych dla energetycznego Internetu Rzeczy. Wersja 2.x (w 2025 r. nadal szeroko stosowany w produkcji) używa Flux jako języka zapytań i wprowadza koncepcja wiaderko ze zintegrowaną polityką przechowywania. Wersja 3.x, przepisany w Rust z Apache Arrow i DataFusion, wprowadza SQL jako język podstawowy i drastycznie poprawia skalowalność w celu uzyskania wysokiej kardynalności.
Dla systemu o mocy 10 MW z 200 falownikami próbkującymi co 5 sekund, objętość danych i znaczne: każdy falownik generuje około 15 pól, w sumie 3000 pól na każde 5 sekund lub 216 000 punktów danych na minutę. W ciągu jednego dnia jest ich około 311 milionów punkty danych. Przechowywanie przez 30 dni wymaga około 9,3 miliarda surowych punktów danych. Strategia downsamplingu jest niezbędna.
Konfiguracja wstępna: Wiadra i organizacja
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Zadania strumieniowe: próbkowanie w dół i automatyczne ostrzeganie
Zadania Flux to zaplanowane zadania, które Telegraf wykonuje okresowo w InfluxDB. Używamy ich w trzech celach: automatycznego downsamplingu surowych danych, obliczenia KPI agregaty i wykrywanie anomalii z zapisem do alertów segmentu.
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
Strumień zapytań dla analityki fotowoltaicznej
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: kompletny stos produkcyjny
Poniższy Docker Compose jest punktem wyjścia do wdrożenia produkcyjnego pełny stos. Każda usługa ma kontrolę stanu, zasady ponownego uruchamiania i konfiguracje woluminów trwała i izolowana sieć zapewniająca bezpieczeństwo.
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
Automatyczne udostępnianie Grafana
Grafana obsługuje automatyczne udostępnianie źródeł danych i dashboardów za pośrednictwem plików YAML.
Ma to kluczowe znaczenie w przypadku środowisk kontenerowych, w których kontenery są odtwarzane.
Plik config/grafana/provisioning/datasources/influxdb.yml musi zawierać
konfiguracja źródła danych InfluxDB, natomiast wprowadzane są dashboardy w formacie JSON
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: Pulpit nawigacyjny i alerty dla systemu fotowoltaicznego
Struktura panelu sterowania systemu fotowoltaicznego
Efektywny dashboard dla systemu fotowoltaicznego musi odpowiadać na pytania operacyjne w mniej niż 3 sekundy wizualnego odczytu. Zalecana struktura jest podzielona na wiersze:
- Wiersz 1 – Dzienne KPI: 4 panele „stat” z dzisiejszą produkcją (kWh), aktualna moc (kW), współczynnik wydajności (%), liczba uszkodzonych falowników. Aktualizacja z lat 30.
- Linia 2 – Produkcja tymczasowa: Wykres warstwowy z całkowitą mocą prądu przemiennego w ciągu ostatnich 24 godzin, nałożony na znormalizowane natężenie promieniowania. Zapytanie InfluxDB-Raw za pomocą 5-minutowe okno zbiorcze.
- Wiersz 3 – Mapa cieplna falownika: Mapa termiczna z osią X = czas, oś Y = identyfikator_inwertera, wartość = moc prądu przemiennego. Pozwala wizualnie zidentyfikować falowniki, które produkują mniej. Aktualizacja 1 minuta.
- Linia 4 – Temperatura i alarmy: Wykres wskazań maksymalnej temperatury falownika, panel tabeli z 50 ostatnimi alarmami z segmentu Energy_alerts posortowanymi według znacznika czasu.
- Wiersz 5 – Wydajność i jakość sieci: Wykres punktowy wydajności w funkcji temperatury, szeregi czasowe dla częstotliwości sieci i napięć fazowych.
Zaawansowane alerty: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
Bezpieczeństwo: TLS, uwierzytelnianie i segmentacja sieci
Bezpieczeństwo platformy energetycznej i krytycznej IoT: dane produkcyjne są wrażliwe informacje handlowe oraz możliwość wysyłania poleceń do falowników należy chronić przed nieupoważnionym dostępem. Strategia głębokiej obrony jest podzielony na cztery poziomy.
Generowanie certyfikatów TLS za pomocą skryptu
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
Bezpieczeństwo: obowiązkowe praktyki w produkcji
- Wzajemne TLS zawsze: Zarówno klient, jak i serwer muszą przedstawić certyfikat. Zapobiega połączeniom z nieautoryzowanych bram, nawet jeśli znają one nazwę użytkownika/hasło.
- Sekrety poprzez zmienne środowiskowe lub menedżera sekretów: Nigdy twardy kod Tokeny InfluxDB, hasła MQTT lub klucze API w kodzie lub plikach konfiguracyjnych zaangażowany w git. Użyj sekretów Dockera, HashiCorp Vault lub Menedżera sekretów AWS.
- Segmentacja sieci: Broker MQTT, InfluxDB i Telegraf nie muszą nigdy nie ujawniaj się bezpośrednio w Internecie. Grafana tylko poprzez odwrotne proxy HTTPS. Broker MQTT dla bram brzegowych może być udostępniony poprzez dedykowaną sieć VPN.
- Rotacja certyfikatów: Certyfikaty Gateway są ważne przez 1 rok (365 dni). Ustaw alarmy Grafana na wygaśnięcie z 30-dniowym wyprzedzeniem.
- Dzienniki audytu MQTT: Włącz rejestrowanie wszystkich połączeń i tematów w Mosquicie. Integracja z SIEM w celu wykrywania anomalnego dostępu.
Wydajność: testy porównawcze i optymalizacja na dużą skalę
Pojemność stosu dla systemu o mocy 10 MW
Park fotowoltaiczny o mocy 10 MW składa się zazwyczaj z 200 falowników o mocy 50 kW każdy. Przy odpytywaniu co 5 sekund głośność komunikatów MQTT jest następująca:
| Część | Metryka | Tom |
|---|---|---|
| 200 falowników x 5s | 15 pól/wiadomość | 40 msg/s, 600 punktów danych/s |
| 10 stacji pogodowych x 10 s | 8 pól/wiadomość | 1 msg/s, 8 punktów danych/s |
| Suma roślin x 15 min | Agregaty strumieniowe | Obliczane w bazie danych |
| Całkowity | - | ~650 punktów danych/s, ~56M punktów danych/dzień |
| Przechowywanie surowca (30 dni) | ~1,68 miliarda punktów danych | ~8-15 GB (z kompresją InfluxDB) |
Tym wolumenem można w dużym stopniu zarządzać za pomocą jednego węzła InfluxDB 2.7 8 GB RAM i dysk SSD NVMe. EMQX na maszynie wirtualnej z 4 vCPU i 8 GB RAM obsługuje 100 tys. połączeń konkurenci z przepustowością 1 mln wiadomości/sekundę. Do pojedynczej instalacji z 200 inwerterów, jeden broker Mosquitto na Raspberry Pi 4 i więcej niż potrzeba (testy w świecie rzeczywistym: 10 tys. msg/s, 500 MB RAM).
Skrypt porównawczy MQTT
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
Studium przypadku: Monitoring parku fotowoltaicznego o mocy 10 MW
W tym studium przypadku opisano rzeczywiste wdrożenie platformy energetycznej IoT dla parku fotowoltaicznego od 10 MW w południowych Włoszech z 200 falowników SMA Sunny Tripower o mocy 50 kW oraz 8 stacji meteorologicznych.
Konfiguracja wstępna i wyzwania
Strona znajduje się w sieci WAN 4G z gwarantowaną przepustowością 10 Mb/s i zmiennym opóźnieniem pomiędzy 30 i 200 ms. Każdy falownik porozumiewa się za pośrednictwem protokołu Modbus TCP w lokalnej sieci LAN (192.168.xx). przełącznik przemysłowy. Bramką brzegową jest Raspberry Pi 4 (8 GB RAM) z dyskiem SSD 256 GB do lokalnego buforowania, podłączony do sieci WAN poprzez przemysłowy router 4G z dedykowaną kartą SIM do transmisji danych.
| Parametr | Wartość |
|---|---|
| Moc systemu | Szczyt 10,2 MW (SMA STP50-US-40) |
| Numer falownika | 200 (SMA Sunny Tripower, 50 kW) |
| Stacje pogodowe | 8 (napromienienie, T, HR, wiatr) |
| Bramy brzegowe | Raspberry Pi 4 8 GB + dysk SSD 256 GB |
| Falownik interwału odpytywania | 5 sekund |
| Interwał sondowania pogody | 10 sekund |
| Głośność komunikatów MQTT | ~41 msg/s (200 inw. x 1/5 s + 8 pogody x 1/10 s) |
| Wykorzystana przepustowość sieci WAN | ~180 KB/s (2% dostępnych 10 Mb/s) |
| Broker MQTT | EMQX 5.8 na maszynie wirtualnej w chmurze (4 vCPU, 8 GB) |
| NapływDB | 2.7.10 na dedykowanej maszynie wirtualnej (8 vCPU, 32 GB RAM, NVMe) |
Wyniki po 6 miesiącach działania
- Dostępność platformy: 99,94% (całkowity czas przestoju: 2,6 godziny w ciągu 6 miesięcy, wszystko w celu zaplanowanej konserwacji serwera w chmurze)
- Dane utracone z powodu rozłączenia sieci WAN: 0,002% (dzięki lokalnemu buforowaniu bramy i sesji trwałej MQTT)
- Automatycznie wykryte alerty o usterkach falownika: 47 wydarzeń w 6 miesięcy, Dzięki natychmiastowemu ostrzeganiu MTTR (średni czas naprawy) skrócono z 8 godzin do 2,3 godziny na PagerDuty (w porównaniu z ręcznym wykrywaniem następnego dnia)
- Średni współczynnik wydajności: 82,3% wobec 79,1% w poprzednim okresie (bez monitoring w czasie rzeczywistym): +3,2 punktu procentowego = +320 dodatkowych MWh/rok
- Oszczędności wynikające z konserwacji predykcyjnej: Wykrywanie 12 falowników za pomocą wczesna degradacja (temperatura radiatora +8°C w porównaniu do średniej) przed awarią ogłoszony. Interwencja zapobiegawcza szacowana na 45 tys. EUR oszczędności w porównaniu z awaryjną wymianą.
- Przechowywanie InfluxDB po 6 miesiącach: 18,7 GB na surowe dane (30 dni kroczące), 4,2 GB dla agregatów 1h (6 miesięcy), 0,8 GB dla agregatów 1d (łącznie 180 dni)
Wyciągnięta lekcja: Przewaga buforowania nie podlega negocjacjom
W pierwszym miesiącu działania połączenie 4G doznało 3 przerw przedłużony (po 2-6 godzin) ze względu na problemy operatora telefonicznego. Bez lokalnego buforowania na dysku SSD Raspberry Pi, utracilibyśmy wszystkie pomiary podczas tych okien. Dzięki buforowi 64 GB i logice przechowywania i przekazywania, za każdym razem, gdy połączenie zostanie przywrócone, wszystkie zgromadzone wiadomości zostaną usunięte opublikowane na MQTT w porządku chronologicznym z oryginalnymi znacznikami czasu i InfluxDB zaakceptował je poprawnie do wstawienia poza kolejnością.
Najlepsze praktyki i anty-wzorce
Najlepsze praktyki
- Znacznik czasu w ładunku, a nie w brokerze: Brama musi zawsze zawierać dokładnego znacznika czasu przechwycenia w ładunku JSON, nie polegaj na znaczniku czasu odbiór brokera MQTT. W przypadku buforowania offline wiadomości docierają z opóźnieniem ale dane są tymczasowe. InfluxDB akceptuje wstawki poza kolejnością.
-
Oddziel kanały telemetryczne od kanałów poleceń: Tematy telemetrii
(
plant/+/inverter/+/metrics) i rozkazowe (plant/+/commands/#) muszą znajdować się w oddzielnych przestrzeniach nazw z odrębnymi listami ACL. Polecenia wymagają QoS 2 e bardziej rygorystyczne uwierzytelnianie. - Agresywne próbkowanie w dół dla danych historycznych: Surowe dane po 5 sekundach są one cenne dla analizy pośmiertnej awarii, ale w przypadku trendów historycznych wystarczą agregaty co 15 minut lub razy. Natychmiast realizuj zadania próbkowania i przechowywania strumienia.
- Monitorowanie stanu bramy: Użyj pulsu MQTT bramy (publikuje co 60 lat na dedykowany temat) i monitoruje w Grafanie. Jeśli brakuje bicia serca przez 3 okresy, alert krytyczny: przyczyną może być awaria bramy, a nie problem z brokerem.
- Metryki systemu Gateway w InfluxDB: Telegraf na bramce brzegowej może zbierać dane dotyczące procesora, pamięci RAM, temperatury procesora, dostępnego miejsca na dysku dla bufora, i publikuj je na MQTT. Niezbędne jest, aby wiedzieć, czy Raspberry Pi jest pod obciążeniem.
Anty-wzorce, których należy unikać
Krytyczne anty-wzorce
-
Tematy zbyt szczegółowe bez agregacji: Wydaj każdy
Rejestr Modbus na osobny temat (np.
plant/PV001/INV001/register/40001) generuje dziesiątki tysięcy tematów i ogromny narzut metadanych w brokerze. Zawsze agreguj powiązane dzienniki w jeden ładunek JSON na urządzenie. - QoS 2 dla telemetrii wysokiej częstotliwości: Czterostronny uścisk dłoni QoS 2 czterokrotnie zwiększa liczbę komunikatów kontrolnych. W przypadku danych 1–5 Hz użyj QoS 1 (duplikaty obsługiwane przez InfluxDB poprzez idempotencję znacznika czasu). QoS 2 tylko dla poleceń.
-
Zachowaj temat telemetrii: Zachowanie jest przydatne w przypadku tematów statusu
(ostatnia wartość jest znacząca), ale w tematyce telemetrii wysokiej częstotliwości
broker musi zaktualizować zachowaną wiadomość przy każdej publikacji, co wiąże się z narzutem
pamięci i procesora. Użyj zachowania tylko dla
/statuse/config. -
Nieograniczona kardynalność w InfluxDB: Unikaj tagów z wysokimi wartościami
liczność (np. użycie
session_idjako znacznik dla stacji EV utwórz nowy seria dla każdej sesji). Zamiast tego użyj jako pola, a nie tagu. - Brak ograniczeń stawek w InfluxDB: Bez limitu jednoczesnych zapytań, pojedyncze, ciężkie zapytanie Grafana (np. eksport surowych danych z 6 miesięcy) może się nasycić Pamięć DB i powoduje zabójczy OOM. Zawsze konfiguruj limity zapytań.
Zasoby i referencje techniczne
- Specyfikacja MQTT 5.0 (OASIS): Oficjalna specyfikacja protokołu MQTT 5.0 szczegółowo opisuje QoS, zachowanie, komunikaty Will, zarządzanie sesją i właściwości użytkownika. Podstawa wdrożeń przedsiębiorstwo.
- Wtyczka konsumencka Telegraf MQTT: Oficjalna dokumentacja InfluxData opisuje wszystkie parametry konfiguracyjne wtyczki mqtt_consumer, w tym analizowanie tematów i obsługiwane formaty danych. Adres URL: docs.influxdata.com/telegraf
- Dokumentacja strumienia InfluxDB: Język Flux jest potężny, ale wymaga nauki. Dokumentacja oficjalny zawiera kompletną referencję ze wszystkimi funkcjami, w tym agregatemWindow, dołącz, obracaj i mapuj.
- Sojusz SunSpec: Standard SunSpec definiuje mapę rejestrów Modbus dla falowników fotowoltaicznych, akumulatory i mierniki. Dostępne na sunspec.org. Zapewnia interoperacyjność pomiędzy urządzeń różnych producentów.
- Dokumentacja EMQX: Dokumentacja EMQX zawiera przewodniki dotyczące klastrowania, silników reguł, zabezpieczeń i integracja z zewnętrznymi systemami pamięci masowej, w tym InfluxDB.
- Alarm Grafany: Dokumentacja Grafana dla ujednoliconego systemu ostrzegania (v9+) zawiera przewodniki dotyczące udostępniania poprzez YAML, konfiguracji i routingu punktów kontaktowych.
- Bezpieczeństwo IIoT: IEC 62443: Norma IEC 62443 określa wymagania bezpieczeństwa dla systemów automatyki i sterowania przemysłowego, w tym systemów energetycznych. Obowiązkowe dla certyfikatów NIS2 (wdrożony we Włoszech dekretem legislacyjnym 138/2024).
Wnioski i dalsze kroki
Zbudowaliśmy kompletną i gotową do produkcji platformę energetyczną IoT: od mostu Modbus-MQTT z lokalnym buforowaniem aż do wielokanałowego powiadamiania poprzez Grafana, PagerDuty i Luz. Stos MQTT + Telegraf + InfluxDB + Grafana jest przetestowany, udokumentowany i skalowalny: zarządza 650 punktami danych na sekundę dla systemu o mocy 10 MW z dużym marginesem i przy EMQX w klastrach i wielu węzłach InfluxDB skaluje się liniowo do wdrożeń na skalę użyteczności publicznej setki MW.
Liczby ze studiów przypadków mówią same za siebie: dostępność na poziomie 99,94%, redukcja MTTR od 8 po 2,3 godziny, +3,2 punktu współczynnika wydajności i oszczędności w zakresie konserwacji wynoszące 45 000 EUR przewidywalne za 6 miesięcy. Monitoring w czasie rzeczywistym nie jest luksusem dla systemów energetycznych odnawialne źródła energii: oraz bezpośredni mnożnik przychodów i okresu użytkowania aktywów.
Kolejny artykuł z serii EnergyTech porusza temat węgiel księgowość: jak budować platformy ESG, które obliczają uniknięte emisje, zielone certyfikaty i kredyty węglowe, począwszy od zebranych danych produkcyjnych z platformy IoT, którą właśnie zbudowaliśmy.
Powiązane artykuły
- Seria EnergyTech: Artykuł ten jest częścią dedykowanej serii do inżynierii oprogramowania dla energii odnawialnej. Poprzedni artykuł nt Obrazu dopełniają EV Load Balancing i kolejne dotyczące Carbon Accounting infrastruktury oprogramowania na potrzeby transformacji energetycznej.
- Seria MLOps: Zaimplementowano downsampling i wykrywanie anomalii z Fluxem to dopiero pierwszy krok. Seria MLOps opisuje, jak integrować modele ML (LSTM do prognozowania produkcji, las izolacyjny do wykrywania anomalii) z dane pochodzące z tej samej platformy InfluxDB.
- Seria biznesowa dotycząca danych i sztucznej inteligencji: Architektura MQTT-InfluxDB to: Przykład Lakehouse danych IoT. Seria Data & AI Business omawia, jak to zrobić Wprowadź te dane szeregów czasowych do potoków analityki korporacyjnej za pomocą DBT i Airflow.
- Seria PostgreSQL AI: Do bardziej skomplikowanych analiz wymagających POŁĄCZ z danymi relacyjnymi (np. danymi kontraktowymi, danymi podstawowymi zakładu, historią konserwacja), integracja InfluxDB-PostgreSQL poprzez Foreign Data Wrapper i opisywany w serii PostgreSQL AI.







