De la MQTT la InfluxDB: Platformă IoT pentru energie în timp real
Piața globală a energiei industriale IoT depășește 22 de miliarde de dolari în 2025 și crește cu un CAGR de 19,8% până la 54 de miliarde până în 2030. Fiecare sistem fotovoltaic de 10 MW generează dincolo 4 milioane de măsurători pe zi de la invertoare, senzori de radiații, contoare și înregistratoare de date meteorologice. Fiecare parc eolian adaugă date vibraționale, de cuplul și temperatura de la nacele. Fiecare stație de încărcare EV publică starea conexiunii, puterea de ieșire și starea de sănătate a conectorului la fiecare 30 de secunde.
Provocarea este să nu colectați aceste date: și să o faceți în mod fiabil, scalabil și cu latență sub-al doilea, menținerea istoricizării de-a lungul anilor și capacitatea de a face interogări analitice în timp real. Protocolul MQTT, creat pentru rețelele industriale cu lățime de bandă limitată, a devenit limbajul comun al energiei IIoT. InfluxDB și soluția de stocare în serie de timp cele mai adoptate pentru acest domeniu. Împreună, cu Telegraf ca pod și Grafana ca strat vizualizare, formează o stivă care gestionează de la mici sisteme casnice până la utilități multi-gigawatt.
Acest articol construiește o platformă IoT energetică completă din arhitectură end-to-end, prin configurarea detaliată a fiecărei componente, până la un Docker Compoziția de lucru și un studiu de caz real pe un parc fotovoltaic de 10 MW cu 200 de invertoare. Fiecare secțiune include cod testat și strategii de producție.
Ce veți învăța în acest articol
- Arhitectură end-to-end: senzori → gateway → MQTT → Telegraf → InfluxDB → Grafana
- MQTT deep dive: QoS, retain, will messages, topic namespace design for energy
- Comparație broker: Mosquitto vs EMQX vs HiveMQ pentru scalabilitate și clustering
- Telegraf: configurație consumer MQTT, conductă procesor, ieșire InfluxDB
- InfluxDB 3.x: proiectare compartiment, politică de reținere, interogări Flux și programare a sarcinilor
- Modbus RTU/TCP → MQTT bridge cu Python și pymodbus
- Docker Compose complet pentru stiva de producție
- Securitate: TLS mutual, autentificare MQTT, token InfluxDB, segmentare a rețelei
- Studiu de caz: parc fotovoltaic de 10 MW cu 200 de invertoare, 100K msg/sec
- Alertă avansată: integrare Grafana, PagerDuty și Slack
Seria EnergyTech - Locația articolului
| # | Articol | Nivel | Stat |
|---|---|---|---|
| 1 | Protocol OCPP 2.x: Construirea sistemelor de încărcare a vehiculelor electrice | Avansat | Publicat |
| 2 | Arhitectura DERMS: agregarea a milioane de resurse distribuite | Avansat | Publicat |
| 3 | Prognoza energiei regenerabile cu ML: Python LSTM | Avansat | Publicat |
| 4 | Sistem de management al bateriei pentru stocarea la scară în rețea | Avansat | Publicat |
| 5 | IEC 61850 pentru ingineri software: Comunicare Smart Grid | Avansat | Publicat |
| 6 | Echilibrare încărcare EV: algoritmi în timp real | Avansat | Publicat |
| 7 | Sunteți aici - De la MQTT la InfluxDB: Platformă IoT pentru energie în timp real | Avansat | Actual |
| 8 | Arhitectura software de contabilitate a carbonului: Platforme ESG | Avansat | Următorul |
| 9 | Digital Twin pentru infrastructură energetică: simulare în timp real | Avansat | În curând |
| 10 | Blockchain pentru tranzacționarea energiei P2P: contracte inteligente și constrângeri | Avansat | În curând |
Arhitectură de la capăt la capăt: de la senzor la tabloul de bord
Arhitectura unei platforme IoT de energie în timp real este împărțită în cinci niveluri distincte, fiecare cu responsabilități specifice și cerințe diferite de fiabilitate. Înțelegerea Această separare este esențială înainte de a scrie prima linie de configurare.
Nivelul 1: Câmp (Stratul câmpului)
La cel mai de jos nivel găsim dispozitivele fizice: invertoare fotovoltaice, anemometre, piranometre, contoare, PMU (Phasor Measurement Units), contoare inteligente și stații de încărcare pentru vehicule electrice. Cele mai multe dintre aceste dispozitive vorbesc protocoale de câmp industrial: Modbus RTU pe RS-485, Modbus TCP prin Ethernet, IEC 61850 în substații, DNP3 pentru rețele de utilități sau protocoale proprietare precum SunSpec pentru invertoare.
Aceste protocoale nu sunt compatibile nativ cu IP-ul modern sau cloud-ul. Sondajul lor Ciclul tipic variază de la 100 ms (PMU) la 60 de secunde (logger de date vechi). Frecvența optimă pentru un invertor fotovoltaic și 1-5 secunde pentru cantități electrice (tensiune, curent, putere) si 30-60 secunde pentru marimi termice (temperatura modulului, temperatura invertorului).
Nivelul 2: Gateway/Edge (Edge Layer)
Gateway-ul și traducătorul edge: citește protocoalele de câmp și publică prin MQTT către broker centrală. Poate fi un dispozitiv încorporat (Raspberry Pi 4, BeagleBone, Moxa ioThinx), un PLC cu stivă MQTT integrată sau un server edge local (NUC, ODROID) pentru mai multe instalări mare. Gateway-ul îndeplinește trei funcții critice:
- Traducerea protocolului: Modbus → MQTT, IEC 61850 → MQTT, DNP3 → MQTT
- Buffering local: Acumulează date în timpul deconectărilor WAN (stocare și redirecționare)
- Preprocesarea marginilor: filtrare, agregare, detectarea anomaliilor locale
Stratul 3: Broker de mesaje (Strat de transport)
Brokerul MQTT este inima sistemului de transport. Primește mesaje de la edge gateway-uri, li distribuie către abonați (Telegraf în primul rând, dar și alți consumatori, cum ar fi sistemele SCADA sau notificări în timp real). Alegerea brokerului depinde de scară: Mosquitto pentru instalații simple sau mici (sub 10.000 conexiuni), EMQX sau HiveMQ per implementare întreprindere și multi-site.
Nivelul 4: Ingestie și stocare (stratul de date)
Telegraf se abonează la subiectele MQTT și transformă mesajele în metrici InfluxDB folosind conducte de procesor configurabile. InfluxDB primește și indexează serii temporale cu retenție politici diferențiate: date brute de înaltă rezoluție pentru 30 de zile, agregate pe oră pentru 1 an, agregate zilnice timp de 5 ani. Sarcinile de flux programează reducerea eșantionării alerte automate și calculate.
Nivelul 5: Vizualizare și alertare (Stratul de prezentare)
Grafana se conectează la InfluxDB prin intermediul pluginului sursă de date și redă tablouri de bord în timp real actualizați la fiecare 5-30 de secunde. Sistemul de alertă Grafana evaluează interogările periodic și trimite notificări către PagerDuty, Slack, e-mail sau webhook-uri personalizate când pragurile sunt depășite.
Diagrama arhitecturală completă
Fluxul de date urmează această cale: Invertoare/Senzori (Modbus/SunSpec) → Gateway Edge (Raspberry Pi/NUC cu pymodbus) → Broker MQTT (Tânțari/EMQX) → Telegraf (MQTT Consumer + JSON Parser) → InfluxDB (serie temporală DB) → Grafana (tabloul de bord + alertă). În paralel, a Manager de alerte în InfluxDB evaluează sarcinile Flux în fiecare minut și poate scrie evenimente pe o găleată dedicată sau un punct final HTTP de apel.
MQTT Deep Dive: Protocol și design pentru energie
Fundamentele MQTT 5.0
MQTT (Message Queuing Telemetry Transport) este un protocol proiectat de publicare-abonare pentru rețele cu lățime de bandă limitată și latență mare. Versiunea 5.0 (OASIS Standard RFC 2019) adaugă funcții critice pentru aplicațiile de întreprindere: interval de expirare a sesiunii, mesaj interval de expirare, coduri motiv extinse, proprietăți utilizator în mesaje, controlul fluxului cu primiți maximum și aliasuri de subiect pentru a reduce dimensiunea antetelor.
QoS 0, 1 și 2: alegerea corectă pentru datele energetice
Calitatea serviciului este cea mai critică alegere în proiectarea unui sistem energetic MQTT. Cele trei opțiuni au implicații foarte diferite pentru debit, supraîncărcarea rețelei și garanții de livrare:
| QoS | garanție | deasupra capului | Utilizați energia pentru caz |
|---|---|---|---|
| QoS 0 | Cel mult o dată (foc și uită) | Minimum (1 RTT) | Telemetrie de înaltă frecvență (1Hz+), date în care pierderea ocazională este acceptabilă: iradiere, temperatura ambiantă |
| QoS 1 | Cel puțin o dată (cu duplicate) | Medie (2 RTT-uri) | Măsuri de producție (kWh, kW), alarme, stări dispozitiv - duplicate gestionate de InfluxDB prin marcaj de timp |
| QoS 2 | Exact uncii | Ridicat (4 RTT) | Comenzi de control (setare invertor, deschidere/închidere întrerupător), date financiare (energie vândută în rețea) |
Pentru un sistem fotovoltaic tipic, recomandarea practică este: QoS 0 pentru telemetrie meteorologice (iradiere, temperatură, vânt), QoS 1 pentru producție și alarme, QoS 2 numai pentru comenzi de control. Această alegere echilibrează fiabilitatea și debitul, permițând brokerului să gestioneze 50-100K mesaje/secundă fără a satura lățimea de bandă WAN.
Reține mesajele și mesajele Will
Două caracteristici MQTT adesea subestimate, dar critice pentru sistemele energetice:
-
Păstrați mesajele: Brokerul stochează ultimul mesaj cu retain=true
pentru fiecare subiect. Când un nou abonat se conectează, acesta primește imediat valoarea
actual fără a aștepta următoarea publicație. Fundamental pentru subiectele de stare:
plant/PV001/statuscu retain asigură că fiecare nou tablou de bord vede starea sistemului imediat. -
Mesaje testamentului (Ultima voință și testament): Gateway-ul configurează un mesaj
pe care brokerul îl publică automat dacă clientul se deconectează anormal.
Folosit pentru a semnala deconectarea gateway-ului: dacă Raspberry Pi al site-ului la distanță cade,
brokerul publică
plant/PV001/connectivity offlineimediat, fără a aștepta expirarea sesiunii.
Subiect Design spațiu de nume pentru sisteme energetice
Designul spațiului de nume de subiect este o decizie arhitecturală care afectează fiecare aspect sistem: rutare, filtrare, securitate ACL și organizare în InfluxDB. O structură ierarhică bine concepută urmează acest model:
# Schema topic namespace per impianto energetico
# Struttura: {tipo_impianto}/{plant_id}/{subsystem}/{device_id}/{metric_type}
# === FOTOVOLTAICO ===
# Inverter string-level metrics
plant/PV001/inverter/INV001/metrics
plant/PV001/inverter/INV001/alarms
plant/PV001/inverter/INV001/status
# String-level measurements
plant/PV001/string/STR001/metrics
plant/PV001/string/STR001/metrics/voltage
plant/PV001/string/STR001/metrics/current
# Meteorological sensors
plant/PV001/weather/WS001/metrics
# Plant-level aggregates (calcolati dal gateway)
plant/PV001/totals/power_ac
plant/PV001/totals/energy_today
# === EOLICO ===
wind/WF001/turbine/T001/nacelle/metrics
wind/WF001/turbine/T001/blade/metrics
wind/WF001/turbine/T001/gearbox/temperature
wind/WF001/substation/SS001/metrics
# === STAZIONI DI RICARICA EV ===
ev/SITE001/charger/CHRG001/connector/1/metrics
ev/SITE001/charger/CHRG001/session/current
ev/SITE001/charger/CHRG001/status
ev/SITE001/totals/energy_dispensed
# === SISTEMA ===
# Heartbeat e connectivity
plant/PV001/gateway/GW001/heartbeat # ogni 60s
plant/PV001/gateway/GW001/status # retain=true
plant/PV001/gateway/GW001/version # retain=true
# === WILDCARD SUBSCRIPTIONS ===
# Telegraf sottoscrive tutti i metrics:
plant/+/inverter/+/metrics # tutti gli inverter di tutti gli impianti
plant/PV001/+/+/metrics # tutti i metrics dell'impianto PV001
plant/# # tutto il plant PV001 (usare con cautela)
Reguli fundamentale pentru spațiul de nume: fiecare nivel trebuie să aibă sens semantic, subiectele de control (comenzi) trebuie separate de subiectele de telemetrie (date), utilizați snake_case pentru consecvență, evitați caracterele speciale și spațiile, limitați adâncimea la maximum 6 niveluri pentru compatibilitate cu diferiți brokeri și clienți.
Comparație broker MQTT: Mosquitto vs EMQX vs HiveMQ
Alegerea brokerului MQTT este fundamentală pentru scalabilitatea și fiabilitatea sistemului. Cei mai folosiți trei brokeri din sectorul energetic au caracteristici foarte diferite.
| Caracteristici | Mosquitto 2.x | EMQX 5.x | HiveMQ 4.x |
|---|---|---|---|
| Max. conexiuni | 100K (un singur nod) | 100M (cluster cu 23 de noduri) | 200M (clustere) |
| Clustering | Nici un nativ | Da (distribuit Erlang) | Da (întreprindere) |
| Versiunea MQTT | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 | 3.1, 3.1.1, 5.0 |
| Rule Engine | No | Da (bazat pe SQL) | Da (întreprindere) |
| Pod | Si | Si | Si |
| Autentificare | Fișiere, TLS, pluginuri | JWT, OAuth2, LDAP, mTLS | JWT, OAuth2, întreprindere |
| Licenţă | EPL/EDL (sursă deschisă) | Apache 2.0 / Enterprise | Comunitate / Întreprindere |
| RAM (conexiune 100K) | ~500 MB | ~2 GB | ~3 GB |
| Utilizați energia de caz | O singură plantă, gateway margine, laborator | Multi-site, la scară de utilitate, întreprindere | Conformitate la scară de utilitate, întreprindere |
Configurație Mosquitto pentru producție
# /etc/mosquitto/mosquitto.conf - Produzione con TLS e ACL
# Network listeners
listener 1883 localhost
listener 8883
protocol mqtt
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true # mutual TLS
use_identity_as_username true
# WebSocket listener per dashboard web
listener 9001
protocol websockets
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# Autenticazione
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistenza
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 60
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S
# Performance tuning
max_queued_messages 10000
max_packet_size 1048576 # 1MB
message_size_limit 262144 # 256KB per messaggio
max_connections 10000
max_keepalive 300
# /etc/mosquitto/acl - Access Control List per ruoli
# Telegraf: legge tutto
user telegraf_reader
topic read plant/#
topic read wind/#
topic read ev/#
# Gateway PV001: scrive solo per il suo impianto
user gateway_PV001
topic write plant/PV001/#
topic read plant/PV001/commands/#
# Gateway EV SITE001: scrive solo per il suo sito
user gateway_SITE001
topic write ev/SITE001/#
# Dashboard/SCADA: solo lettura
user dashboard_user
topic read plant/#
topic read wind/#
topic read ev/#
# Admin: tutto (solo per maintenance)
user mqtt_admin
topic #
Gateway Edge: Bridge Modbus → MQTT cu Python
Gateway-ul edge este componenta care conectează lumea protocoalelor de câmp industrial cu ecosistemul MQTT. Iată o implementare completă pentru invertoarele fotovoltaice cu Protocol Modbus TCP, inclusiv buffering local pentru a gestiona deconexiunile WAN.
#!/usr/bin/env python3
"""
gateway_modbus_mqtt.py
Gateway edge per inverter fotovoltaici Modbus TCP → MQTT
Compatibile con registro SunSpec (standard de facto per inverter PV)
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional
import aiofiles
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
import aiomqtt
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('gateway')
@dataclass
class InverterMetrics:
"""Metriche SunSpec inverter (Model 103 - Three Phase Inverter)"""
timestamp: str
plant_id: str
inverter_id: str
# AC output
ac_power_w: float # Potenza AC totale (W)
ac_voltage_l1_v: float # Tensione fase L1 (V)
ac_voltage_l2_v: float # Tensione fase L2 (V)
ac_voltage_l3_v: float # Tensione fase L3 (V)
ac_current_a: float # Corrente totale AC (A)
ac_frequency_hz: float # Frequenza rete (Hz)
# DC input
dc_voltage_v: float # Tensione DC ingresso (V)
dc_current_a: float # Corrente DC ingresso (A)
dc_power_w: float # Potenza DC ingresso (W)
# Energy
energy_wh_total: float # Energia prodotta totale (Wh)
energy_wh_today: float # Energia prodotta oggi (Wh)
# Thermal
temp_cabinet_c: float # Temperatura cabinet inverter (C)
temp_heatsink_c: float # Temperatura heatsink (C)
# Status
operating_state: int # 1=Off, 2=Sleeping, 3=Starting, 4=MPPT, 5=Throttled
error_code: int # Codice errore (0=no error)
# Efficiency
efficiency_pct: float # Efficienza conversione (%)
class ModbusMqttGateway:
"""
Gateway che legge metriche Modbus da inverter SunSpec
e le pubblica su MQTT con buffering locale per disconnessioni WAN
"""
def __init__(self, config: dict):
self.plant_id = config['plant_id']
self.inverters = config['inverters'] # lista di {id, host, port, unit_id}
self.mqtt_host = config['mqtt']['host']
self.mqtt_port = config['mqtt']['port']
self.mqtt_user = config['mqtt']['username']
self.mqtt_password = config['mqtt']['password']
self.poll_interval = config.get('poll_interval_sec', 5)
self.buffer_file = config.get('buffer_file', '/var/lib/gateway/buffer.jsonl')
self.clients: dict[str, AsyncModbusTcpClient] = {}
self._buffer: list[dict] = []
self._mqtt_connected = False
async def connect_inverter(self, inverter_cfg: dict) -> AsyncModbusTcpClient:
"""Crea e connette client Modbus TCP"""
client = AsyncModbusTcpClient(
host=inverter_cfg['host'],
port=inverter_cfg.get('port', 502),
timeout=5,
retries=3,
retry_on_empty=True,
)
await client.connect()
if not client.connected:
raise ConnectionError(
f"Cannot connect to inverter {inverter_cfg['id']} "
f"at {inverter_cfg['host']}:{inverter_cfg.get('port', 502)}"
)
logger.info(f"Connected to inverter {inverter_cfg['id']}")
return client
async def read_sunspec_model103(
self,
client: AsyncModbusTcpClient,
unit_id: int,
plant_id: str,
inverter_id: str
) -> Optional[InverterMetrics]:
"""
Legge registri SunSpec Model 103 (Three Phase Inverter)
SunSpec base address: 40001 (Modbus address 40000)
Model 103 start: 40070 (tipico, dipende da inverter)
"""
try:
# Leggi blocco principale Model 103: 40069 - 40094 (26 registri)
result = await client.read_holding_registers(
address=40069,
count=26,
slave=unit_id
)
if result.isError():
logger.error(f"Modbus error reading INV {inverter_id}: {result}")
return None
regs = result.registers
# Scala con SF (scale factors) SunSpec
ac_current_sf = self._sf(regs[10])
ac_voltage_sf = self._sf(regs[14])
ac_power_sf = self._sf(regs[17])
ac_freq_sf = self._sf(regs[19])
dc_current_sf = self._sf(regs[21])
dc_voltage_sf = self._sf(regs[23])
dc_power_sf = self._sf(regs[25])
# Leggi energia prodotta (registri 40093-40096)
energy_result = await client.read_holding_registers(
address=40093, count=4, slave=unit_id
)
energy_regs = energy_result.registers
energy_total_wh = ((energy_regs[0] << 16) | energy_regs[1]) * 1.0
# Leggi temperatura (registri 40103-40108)
temp_result = await client.read_holding_registers(
address=40103, count=3, slave=unit_id
)
temp_regs = temp_result.registers
ac_power_raw = self._signed(regs[16])
dc_power_w = self._signed(regs[24]) * (10 ** dc_power_sf)
efficiency = (ac_power_raw / dc_power_w * 100) if dc_power_w > 0 else 0.0
return InverterMetrics(
timestamp=datetime.now(timezone.utc).isoformat(),
plant_id=plant_id,
inverter_id=inverter_id,
ac_power_w=ac_power_raw * (10 ** ac_power_sf),
ac_voltage_l1_v=regs[11] * (10 ** ac_voltage_sf),
ac_voltage_l2_v=regs[12] * (10 ** ac_voltage_sf),
ac_voltage_l3_v=regs[13] * (10 ** ac_voltage_sf),
ac_current_a=(regs[7] + regs[8] + regs[9]) * (10 ** ac_current_sf),
ac_frequency_hz=regs[18] * (10 ** ac_freq_sf),
dc_voltage_v=self._signed(regs[22]) * (10 ** dc_voltage_sf),
dc_current_a=self._signed(regs[20]) * (10 ** dc_current_sf),
dc_power_w=dc_power_w,
energy_wh_total=energy_total_wh,
energy_wh_today=0.0, # da registro specifico vendor
temp_cabinet_c=self._signed(temp_regs[0]) * 0.01,
temp_heatsink_c=self._signed(temp_regs[1]) * 0.01,
operating_state=regs[0],
error_code=regs[1],
efficiency_pct=round(efficiency, 2),
)
except ModbusException as e:
logger.error(f"Modbus exception for {inverter_id}: {e}")
return None
def _signed(self, val: int) -> int:
"""Converte uint16 in int16 (signed)"""
return val if val < 32768 else val - 65536
def _sf(self, val: int) -> int:
"""Scale factor SunSpec: int16 nel range -10..10"""
s = self._signed(val)
return s if -10 <= s <= 10 else 0
async def publish_metrics(
self,
mqtt_client: aiomqtt.Client,
metrics: InverterMetrics
) -> None:
"""Pubblica metriche su MQTT con QoS 1"""
topic = f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/metrics"
payload = json.dumps(asdict(metrics), default=str)
await mqtt_client.publish(
topic=topic,
payload=payload.encode(),
qos=1,
retain=False,
)
# Pubblica anche stato con retain
status = {
"timestamp": metrics.timestamp,
"operating_state": metrics.operating_state,
"error_code": metrics.error_code,
"online": True,
}
await mqtt_client.publish(
topic=f"plant/{metrics.plant_id}/inverter/{metrics.inverter_id}/status",
payload=json.dumps(status).encode(),
qos=1,
retain=True, # retain per status
)
async def run(self):
"""Loop principale del gateway"""
# Carica buffer da disco se esiste
await self._load_buffer()
# Will message: pubblica offline status se il gateway crolla
will = aiomqtt.Will(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": False, "reason": "unexpected_disconnect"}).encode(),
qos=1,
retain=True,
)
async with aiomqtt.Client(
hostname=self.mqtt_host,
port=self.mqtt_port,
username=self.mqtt_user,
password=self.mqtt_password,
will=will,
keepalive=60,
tls_params=aiomqtt.TLSParameters(
ca_certs='/etc/gateway/certs/ca.crt',
certfile='/etc/gateway/certs/gateway.crt',
keyfile='/etc/gateway/certs/gateway.key',
),
) as mqtt:
self._mqtt_connected = True
logger.info("MQTT connected")
# Pubblica online status con retain
await mqtt.publish(
topic=f"plant/{self.plant_id}/gateway/GW001/status",
payload=json.dumps({"online": True}).encode(),
qos=1, retain=True,
)
# Svuota buffer offline
await self._flush_buffer(mqtt)
# Connetti a tutti gli inverter
for inv_cfg in self.inverters:
try:
self.clients[inv_cfg['id']] = await self.connect_inverter(inv_cfg)
except ConnectionError as e:
logger.error(f"Failed to connect: {e}")
# Loop principale di polling
while True:
poll_start = time.monotonic()
tasks = [
self._poll_and_publish(inv_cfg, mqtt)
for inv_cfg in self.inverters
]
await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - poll_start
sleep_time = max(0, self.poll_interval - elapsed)
await asyncio.sleep(sleep_time)
async def _poll_and_publish(self, inv_cfg: dict, mqtt: aiomqtt.Client):
"""Polling singolo inverter e pubblicazione"""
inv_id = inv_cfg['id']
client = self.clients.get(inv_id)
if client is None or not client.connected:
try:
client = await self.connect_inverter(inv_cfg)
self.clients[inv_id] = client
except ConnectionError:
return
metrics = await self.read_sunspec_model103(
client, inv_cfg.get('unit_id', 1),
self.plant_id, inv_id
)
if metrics:
try:
await self.publish_metrics(mqtt, metrics)
except Exception as e:
logger.warning(f"MQTT publish failed, buffering: {e}")
self._buffer.append(asdict(metrics))
await self._save_buffer()
async def _load_buffer(self):
"""Carica buffer dal disco"""
try:
async with aiofiles.open(self.buffer_file) as f:
async for line in f:
self._buffer.append(json.loads(line))
logger.info(f"Loaded {len(self._buffer)} buffered messages")
except FileNotFoundError:
pass
async def _save_buffer(self):
"""Salva buffer su disco"""
async with aiofiles.open(self.buffer_file, 'w') as f:
for item in self._buffer:
await f.write(json.dumps(item) + '\n')
async def _flush_buffer(self, mqtt: aiomqtt.Client):
"""Pubblica messaggi bufferizzati"""
if not self._buffer:
return
logger.info(f"Flushing {len(self._buffer)} buffered messages")
published = []
for item in self._buffer:
try:
metrics = InverterMetrics(**item)
await self.publish_metrics(mqtt, metrics)
published.append(item)
except Exception as e:
logger.error(f"Failed to flush buffered message: {e}")
break
self._buffer = [i for i in self._buffer if i not in published]
await self._save_buffer()
if __name__ == '__main__':
config = {
'plant_id': 'PV001',
'poll_interval_sec': 5,
'mqtt': {
'host': '10.0.1.10',
'port': 8883,
'username': 'gateway_PV001',
'password': 'secret',
},
'inverters': [
{'id': 'INV001', 'host': '192.168.1.101', 'port': 502, 'unit_id': 1},
{'id': 'INV002', 'host': '192.168.1.102', 'port': 502, 'unit_id': 1},
# ... fino a INV200 per impianto da 10 MW
],
'buffer_file': '/var/lib/gateway/buffer.jsonl',
}
asyncio.run(ModbusMqttGateway(config).run())
Telegraf: MQTT Bridge → InfluxDB cu Transformation Pipeline
Telegraf este colectorul de date al InfluxData, scris în Go, cu peste 300 de plugin-uri. În stiva noastră, Telegraf se abonează la subiecte MQTT, decodifică sarcina utilă JSON, aplică transformări prin procesor și scrie în InfluxDB. Versiunea 1.30+ acceptă nativ MQTT 5.0 și analiza subiectelor ca etichete.
Configurare Telegraf completă
# telegraf.conf - Configurazione completa per piattaforma IoT energetica
# ============================================================
# AGENT SETTINGS
# ============================================================
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
collection_jitter = "1s"
flush_interval = "10s"
flush_jitter = "1s"
precision = "1ns"
debug = false
quiet = false
logtarget = "file"
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_interval = "24h"
logfile_rotation_max_size = "50MB"
logfile_rotation_max_archives = 5
# ============================================================
# OUTPUT: InfluxDB 2.x / 3.x
# ============================================================
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "plant_metrics_raw"
timeout = "5s"
# Tag per routing - usato per bucket separati per plant
[outputs.influxdb_v2.tagpass]
data_type = ["inverter_metrics", "weather_metrics", "string_metrics"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "ev_metrics_raw"
timeout = "5s"
[outputs.influxdb_v2.tagpass]
data_type = ["ev_metrics"]
# Output di backup su file locale (per audit e recovery)
[[outputs.file]]
files = ["/var/log/telegraf/metrics.jsonl"]
rotation_interval = "1h"
rotation_max_size = "500MB"
data_format = "json"
[outputs.file.tagpass]
error_code = ["*"] # solo metriche con errori
# ============================================================
# INPUT: MQTT Consumer - Inverter Fotovoltaici
# ============================================================
[[inputs.mqtt_consumer]]
name_override = "pv_inverter"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/inverter/+/metrics"]
qos = 1
connection_timeout = "30s"
max_undelivered_messages = 5000
# Autenticazione mTLS
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-energy-01"
persistent_session = true # QoS 1+ richiede sessione persistente
# Parsing topic come tag
# topic: plant/PV001/inverter/INV001/metrics
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/inverter/+/metrics"
measurement = "_/plant_id/_/inverter_id/_"
tags = "_/plant_id/_/inverter_id/_"
# Estrae plant_id=PV001, inverter_id=INV001 come tag
# Formato payload
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l1_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l2_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_voltage_l3_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "ac_frequency_hz"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "dc_power_w"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_wh_total"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_cabinet_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temp_heatsink_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "operating_state"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "error_code"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "efficiency_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "plant_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "inverter_id"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05.999999999Z07:00" # RFC3339Nano
# INPUT: MQTT Consumer - Stazioni Meteo
[[inputs.mqtt_consumer]]
name_override = "pv_weather"
servers = ["ssl://mosquitto:8883"]
topics = ["plant/+/weather/+/metrics"]
qos = 0 # QoS 0 per dati meteo (perdita occasionale accettabile)
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-weather-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "plant/+/weather/+/metrics"
tags = "_/plant_id/_/station_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "irradiance_wm2"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "temperature_c"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "wind_speed_ms"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "humidity_pct"
type = "float"
[[inputs.mqtt_consumer.json_v2.timestamp]]
path = "timestamp"
format = "2006-01-02T15:04:05Z07:00"
# INPUT: MQTT Consumer - Stazioni EV
[[inputs.mqtt_consumer]]
name_override = "ev_charger"
servers = ["ssl://mosquitto:8883"]
topics = ["ev/+/charger/+/connector/+/metrics"]
qos = 1
tls_ca = "/etc/telegraf/certs/ca.crt"
tls_cert = "/etc/telegraf/certs/telegraf.crt"
tls_key = "/etc/telegraf/certs/telegraf.key"
username = "telegraf_reader"
password = "$MQTT_PASSWORD"
client_id = "telegraf-ev-01"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "ev/+/charger/+/connector/+/metrics"
tags = "_/site_id/_/charger_id/_/connector_id/_"
data_format = "json_v2"
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.field]]
path = "power_kw"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "energy_kwh"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "current_a"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "voltage_v"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "status"
type = "string"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "site_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "charger_id"
[[inputs.mqtt_consumer.json_v2.tag]]
path = "connector_id"
# ============================================================
# PROCESSORS: Trasformazione dati
# ============================================================
# Aggiungi tag data_type per routing output
[[processors.converter]]
namepass = ["pv_inverter"]
[processors.converter.tags]
string = ["data_type"]
[[processors.override]]
namepass = ["pv_inverter"]
[processors.override.tags]
data_type = "inverter_metrics"
[[processors.override]]
namepass = ["ev_charger"]
[processors.override.tags]
data_type = "ev_metrics"
# Filtra metriche con valori anomali (sanity check)
[[processors.dedup]]
dedup_interval = "1s" # evita duplicati QoS 1
[[processors.enum]]
namepass = ["pv_inverter"]
[[processors.enum.mapping]]
field = "operating_state"
dest = "operating_state_str"
[processors.enum.mapping.value_mappings]
1 = "off"
2 = "sleeping"
3 = "starting"
4 = "mppt"
5 = "throttled"
6 = "shutting_down"
7 = "fault"
8 = "standby"
# ============================================================
# MONITORING: Telegraf self-monitoring
# ============================================================
[[inputs.internal]]
collect_memstats = true
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "$INFLUXDB_TOKEN"
organization = "energy-corp"
bucket = "telegraf_monitoring"
[outputs.influxdb_v2.namepass]
internal = ["*"]
InfluxDB: Stocare în serie de timp și interogare pentru energie
Arhitectură InfluxDB 2.x / 3.x pentru sisteme energetice
InfluxDB este baza de date de referință în serie de timp pentru energie IoT. Versiunea 2.x (folosit încă pe scară largă în producție în 2025) folosește Flux ca limbaj de interogare și introduce conceptul de găleată cu politică integrată de reținere. Versiunea 3.x, rescris în Rust cu Apache Arrow și DataFusion, introduce SQL ca limbaj principal și îmbunătățește drastic scalabilitatea pentru cardinalitate ridicată.
Pentru un sistem de 10 MW cu 200 de invertoare prelevate la fiecare 5 secunde, volumul de date și considerabil: fiecare invertor generează aproximativ 15 câmpuri, pentru un total de 3.000 de câmpuri la fiecare 5 secunde sau 216.000 de puncte de date pe minut. Într-o singură zi sunt aproximativ 311 milioane puncte de date. O păstrare de 30 de zile necesită aproximativ 9,3 miliarde de puncte de date brute. Strategia de eșantionare este esențială.
Configurare inițială: găleți și organizare
#!/bin/bash
# setup_influxdb.sh - Configurazione iniziale InfluxDB per piattaforma energetica
INFLUX_HOST="http://localhost:8086"
INFLUX_TOKEN="my-super-secret-admin-token"
INFLUX_ORG="energy-corp"
# Setup iniziale (solo prima volta)
influx setup \
--host "$INFLUX_HOST" \
--username admin \
--password "SecurePassword123!" \
--org "$INFLUX_ORG" \
--bucket plant_metrics_raw \
--retention 720h \ # 30 giorni dati grezzi
--token "$INFLUX_TOKEN" \
--force
# Bucket dati grezzi PV (alta risoluzione: 30 giorni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_raw" \
--retention "720h" \
--shard-group-duration "24h" # shard giornalieri per query efficienti
# Bucket downsampled 1 ora (retention 1 anno)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1h" \
--retention "8760h" \
--shard-group-duration "168h" # shard settimanali
# Bucket downsampled 1 giorno (retention 5 anni)
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "plant_metrics_1d" \
--retention "43800h" \ # ~5 anni
--shard-group-duration "720h" # shard mensili
# Bucket per alerting e eventi
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "energy_alerts" \
--retention "8760h" # 1 anno per audit
# Bucket per stazioni EV
influx bucket create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--name "ev_metrics_raw" \
--retention "720h"
# Token con permessi limitati per Telegraf (write only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Telegraf write-only token" \
--write-bucket plant_metrics_raw \
--write-bucket ev_metrics_raw \
--write-bucket telegraf_monitoring
# Token per Grafana (read only)
influx auth create \
--host "$INFLUX_HOST" \
--token "$INFLUX_TOKEN" \
--org "$INFLUX_ORG" \
--description "Grafana read-only token" \
--read-bucket plant_metrics_raw \
--read-bucket plant_metrics_1h \
--read-bucket plant_metrics_1d \
--read-bucket ev_metrics_raw \
--read-bucket energy_alerts
echo "InfluxDB setup completato"
Sarcini de flux: eșantionare și alertare automată
Sarcinile Flux sunt joburi programate pe care Telegraf le execută periodic în InfluxDB. Le folosim în trei scopuri: eșantionarea automată a datelor brute, calculul KPI agregate și detectarea anomaliilor cu alerte de scriere în compartiment.
// task_downsample_1h.flux
// Task: Downsampling dati inverter da 5s a 1h
// Schedulato ogni ora, processa ultima ora
option task = {
name: "downsample_pv_inverter_1h",
every: 1h,
offset: 5m, // aspetta 5 min per assicurarsi che tutti i dati siano arrivati
}
from(bucket: "plant_metrics_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) =>
r._field == "ac_power_w" or
r._field == "dc_power_w" or
r._field == "energy_wh_total" or
r._field == "ac_voltage_l1_v" or
r._field == "temp_cabinet_c" or
r._field == "efficiency_pct"
)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) => tables
|> mean(column: column), // media oraria
createEmpty: false
)
// Aggiungi statistiche aggiuntive per power
|> map(fn: (r) => ({r with _measurement: "pv_inverter_1h"}))
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_plant_kpi.flux
// Task: Calcolo KPI giornalieri per impianto
// Produccion, efficienza, performance ratio
option task = {
name: "calculate_plant_daily_kpi",
every: 15m,
}
// Potenza attiva totale per impianto (somma tutti inverter)
totalPower = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 15m, fn: sum, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "total_ac_power_w",
}))
// Energia prodotta totale (max energy_wh_total - min energy_wh_total per periodo)
energyDelta = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "energy_wh_total")
|> difference()
|> group(columns: ["plant_id", "_time"])
|> sum()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "energy_delta_wh",
}))
// Temperatura media inverter
avgTemp = from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> aggregateWindow(every: 15m, fn: mean, createEmpty: false)
|> group(columns: ["plant_id", "_time"])
|> mean()
|> map(fn: (r) => ({r with
_measurement: "plant_totals",
_field: "avg_inverter_temp_c",
}))
union(tables: [totalPower, energyDelta, avgTemp])
|> to(bucket: "plant_metrics_1h", org: "energy-corp")
// task_thermal_alert.flux
// Task: Rilevamento surriscaldamento inverter
// Alert se temperatura > 75°C per più di 5 minuti
option task = {
name: "thermal_alert_inverter",
every: 1m,
}
threshold = 75.0
from(bucket: "plant_metrics_raw")
|> range(start: -6m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> filter(fn: (r) => r._value > threshold)
// Conta quanti valori sopra soglia negli ultimi 6 min (= almeno 5 min)
|> aggregateWindow(every: 6m, fn: count, createEmpty: false)
|> filter(fn: (r) => r._value >= 60) // 60 campioni * 5sec = 5 min
|> map(fn: (r) => ({r with
_measurement: "energy_alert",
_field: "alert_type",
_value: "thermal_overtemperature",
severity: "high",
description: "Inverter temperature exceeds " + string(v: threshold) + "°C for > 5 minutes",
}))
|> to(bucket: "energy_alerts", org: "energy-corp")
Flux de interogări pentru analiză fotovoltaică
// query_production_analysis.flux
// Analisi produzione giornaliera per impianto PV001
// Confronto con irraggiamento (Performance Ratio)
import "math"
// Produzione AC ogni 5 minuti
production = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
|> group(columns: ["plant_id"])
|> sum() // somma tutti gli inverter per potenza totale impianto
// Irraggiamento dalla stazione meteo
irradiance = from(bucket: "plant_metrics_raw")
|> range(start: today(), stop: now())
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: true)
// Join per calcolo Performance Ratio
// PR = (energia prodotta / (irraggiamento * superficie)) * 100
join(
tables: {production: production, irradiance: irradiance},
on: ["_time", "plant_id"]
)
// PV001: 10 MW peak, superficie pannelli ~50.000 m2
|> map(fn: (r) => ({r with
performance_ratio: if r.irradiance_irradiance_wm2 > 50.0 then
(r.production__value / (r.irradiance_irradiance_wm2 * 50000.0)) * 100.0
else
0.0,
}))
|> keep(columns: ["_time", "production__value", "irradiance_irradiance_wm2", "performance_ratio"])
Docker Compose: Stack complet de producție
Următorul Docker Compose este punctul de plecare pentru o implementare de producție a teanc complet. Fiecare serviciu are configurații de verificare a stării de sănătate, politică de repornire și volum rețele persistente și izolate pentru securitate.
# docker-compose.yml
# Stack IoT Energetico: Mosquitto + Telegraf + InfluxDB + Grafana
# Produzione-ready con TLS, health checks e persistent volumes
version: '3.9'
networks:
iot_network:
driver: bridge
ipam:
config:
- subnet: 172.20.0.0/24
# Network separata per accesso esterno (solo Grafana e Mosquitto esposti)
external_network:
driver: bridge
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
influxdb_config:
grafana_data:
grafana_provisioning:
telegraf_buffer:
services:
# ============================================================
# MOSQUITTO MQTT BROKER
# ============================================================
mosquitto:
image: eclipse-mosquitto:2.0.20
container_name: mosquitto
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "1883:1883" # MQTT plain (solo per sviluppo/testing locale)
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket over TLS
volumes:
- ./config/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- ./config/mosquitto/passwd:/mosquitto/config/passwd:ro
- ./config/mosquitto/acl:/mosquitto/config/acl:ro
- ./certs:/etc/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
healthcheck:
test: ["CMD", "mosquitto_pub", "-h", "localhost", "-p", "1883",
"-t", "health", "-m", "ping", "--quiet"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 512m
cpus: '1.0'
# ============================================================
# INFLUXDB 2.7 (stable production)
# ============================================================
influxdb:
image: influxdb:2.7-alpine
container_name: influxdb
restart: unless-stopped
networks:
- iot_network
ports:
- "8086:8086" # solo su rete interna, proxy via Nginx se necessario
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: {{ "$INFLUXDB_ADMIN_PASSWORD" }}
DOCKER_INFLUXDB_INIT_ORG: energy-corp
DOCKER_INFLUXDB_INIT_BUCKET: plant_metrics_raw
DOCKER_INFLUXDB_INIT_RETENTION: 720h
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: {{ "$INFLUXDB_TOKEN" }}
# Performance tuning
INFLUXD_STORAGE_CACHE_MAX_MEMORY_SIZE: 1073741824 # 1GB cache
INFLUXD_STORAGE_COMPACT_THROUGHPUT_BURST: 50331648 # 48MB/s compaction
INFLUXD_HTTP_MAX_BODY_SIZE: 104857600 # 100MB max body
INFLUXD_QUERY_MEMORY_BYTES: 2147483648 # 2GB query memory limit
volumes:
- influxdb_data:/var/lib/influxdb2
- influxdb_config:/etc/influxdb2
healthcheck:
test: ["CMD", "influx", "ping"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
limits:
memory: 4g
cpus: '2.0'
# ============================================================
# TELEGRAF
# ============================================================
telegraf:
image: telegraf:1.34-alpine
container_name: telegraf
restart: unless-stopped
networks:
- iot_network
- external_network # per raggiungere MQTT su rete esterna
depends_on:
influxdb:
condition: service_healthy
mosquitto:
condition: service_healthy
environment:
INFLUXDB_TOKEN: {{ "$INFLUXDB_TOKEN" }}
MQTT_PASSWORD: {{ "$MQTT_PASSWORD" }}
HOSTNAME: telegraf-energy-01
volumes:
- ./config/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
- ./certs:/etc/telegraf/certs:ro
- telegraf_buffer:/var/lib/telegraf
- /var/log/telegraf:/var/log/telegraf
healthcheck:
test: ["CMD", "telegraf", "--test", "--config", "/etc/telegraf/telegraf.conf"]
interval: 60s
timeout: 30s
retries: 3
deploy:
resources:
limits:
memory: 1g
cpus: '1.0'
# ============================================================
# GRAFANA
# ============================================================
grafana:
image: grafana/grafana-oss:11.4.0
container_name: grafana
restart: unless-stopped
networks:
- iot_network
- external_network
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: {{ "$GRAFANA_ADMIN_PASSWORD" }}
GF_SECURITY_SECRET_KEY: {{ "$GRAFANA_SECRET_KEY" }}
GF_SERVER_ROOT_URL: "https://grafana.energy-corp.it"
GF_SERVER_DOMAIN: "grafana.energy-corp.it"
# Analytics disabilitate per privacy
GF_ANALYTICS_REPORTING_ENABLED: "false"
GF_ANALYTICS_CHECK_FOR_UPDATES: "false"
# SMTP per alerting
GF_SMTP_ENABLED: "true"
GF_SMTP_HOST: {{ "$SMTP_HOST" }}
GF_SMTP_USER: {{ "$SMTP_USER" }}
GF_SMTP_PASSWORD: {{ "$SMTP_PASSWORD" }}
GF_SMTP_FROM_ADDRESS: "alerts@energy-corp.it"
# InfluxDB datasource
GF_DATASOURCES_DEFAULT: InfluxDB
INFLUXDB_TOKEN_GRAFANA: {{ "$INFLUXDB_TOKEN_GRAFANA" }}
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning:ro
- ./config/grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
influxdb:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:3000/api/health || exit 1"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
memory: 512m
cpus: '0.5'
# ============================================================
# NGINX REVERSE PROXY (opzionale ma raccomandato)
# ============================================================
nginx:
image: nginx:1.27-alpine
container_name: nginx
restart: unless-stopped
networks:
- external_network
ports:
- "80:80"
- "443:443"
volumes:
- ./config/nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
- /var/log/nginx:/var/log/nginx
depends_on:
- grafana
- mosquitto
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
Aprovizionare automată Grafana
Grafana acceptă furnizarea automată a surselor de date și a tablourilor de bord prin fișiere YAML.
Acest lucru este crucial pentru mediile containerizate în care containerele sunt recreate.
Dosarul config/grafana/provisioning/datasources/influxdb.yml trebuie să conţină
configurația sursei de date InfluxDB, în timp ce tablourile de bord în format JSON intră
config/grafana/dashboards/.
# config/grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
- name: InfluxDB-Raw
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_raw
tlsSkipVerify: false
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-1h
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: plant_metrics_1h
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
- name: InfluxDB-Alerts
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: energy-corp
defaultBucket: energy_alerts
secureJsonData:
token: $INFLUXDB_TOKEN_GRAFANA
Grafana: tablou de bord și alerte pentru sistem fotovoltaic
Structura tabloului de bord al sistemului fotovoltaic
Un tablou de bord eficient pentru un sistem fotovoltaic trebuie să răspundă la întrebări operaționale în mai puțin de 3 secunde de citire vizuală. Structura recomandată este organizată pe rânduri:
- Rândul 1 - KPI-uri zilnice: 4 panouri „stat” cu producția de astăzi (kWh), puterea curentă (kW), raportul de performanță (%), numărul de invertoare defect. Actualizare anii 30.
- Linia 2 - Producție temporală: Diagrama zonei cu putere totală de curent alternativ in ultimele 24 de ore, suprapus iradiantei normalizate. Interogați InfluxDB-Raw cu Fereastra agregată de 5 minute.
- Linia 3 - Harta termică a invertorului: Hartă termică cu axa X = timp, axa Y = inverter_id, valoare = putere AC. Vă permite să identificați vizual invertoarele care produc mai puțin. Actualizare de 1 minut.
- Linia 4 - Temperatură și alarme: Diagrama de măsurare pentru temperatura maximă a invertorului, panou de tabel cu ultimele 50 de alarme din grupul energy_alerts sortate după marcaj de timp.
- Linia 5 - Eficiența și calitatea rețelei: Eficiența diagramei de dispersie față de temperatură, serie de timp pentru frecvența rețelei și tensiunile de fază.
Alertă avansată: Grafana + PagerDuty + Slack
# config/grafana/provisioning/alerting/rules.yml
apiVersion: 1
groups:
- name: "PV Plant Alerts"
folder: "Energy Alerts"
interval: 1m
rules:
# Alert: Produzione impianto drasticamente ridotta
- uid: pv001-production-drop
title: "PV001 - Produzione ridotta >30%"
condition: C
data:
# Query A: Produzione attuale (15 min rolling mean)
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "ac_power_w")
|> mean()
|> sum()
# Query B: Irraggiamento attuale (per normalizzare)
- refId: B
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "pv_weather")
|> filter(fn: (r) => r.plant_id == "PV001")
|> filter(fn: (r) => r._field == "irradiance_wm2")
|> mean()
# Condition C: se irraggiamento > 200 W/m2 (giorno chiaro)
# e produzione normalizzata < 70% del teorico
- refId: C
datasourceUid: __expr__
model:
type: math
expression: "$A / ($B * 50000) < 0.70 and $B > 200"
for: 10m # alert solo se condizione persiste 10 min
labels:
severity: "warning"
plant_id: "PV001"
category: "production"
annotations:
summary: "PV001: produzione sotto il 70% del teorico"
description: |
Produzione attuale: {{ $A }} W
Irraggiamento: {{ $B }} W/m2
Performance attesa: > 70%
runbook_url: "https://wiki.energy-corp.it/runbook/pv-production-drop"
# Alert: Inverter in fault
- uid: pv001-inverter-fault
title: "PV001 - Inverter Fault Rilevato"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "operating_state")
|> filter(fn: (r) => r._value == 7) // stato fault
|> count()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [0]
type: gt
for: 1m
labels:
severity: "critical"
plant_id: "PV001"
annotations:
summary: "Inverter in stato FAULT rilevato"
# Alert: Temperatura critica
- uid: pv001-thermal-critical
title: "PV001 - Temperatura Inverter Critica"
condition: C
data:
- refId: A
datasourceUid: influxdb-raw-uid
model:
query: |
from(bucket: "plant_metrics_raw")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "pv_inverter")
|> filter(fn: (r) => r._field == "temp_cabinet_c")
|> max()
- refId: C
datasourceUid: __expr__
model:
type: threshold
expression: "$A"
conditions:
- evaluator:
params: [85]
type: gt
for: 5m
labels:
severity: "critical"
category: "thermal"
# Contact points
contactPoints:
- name: "PagerDuty Critical"
receivers:
- uid: pagerduty-critical
type: pagerduty
settings:
integrationKey: $PAGERDUTY_INTEGRATION_KEY
severity: critical
class: "EnergyAlerts"
component: "PV Plant"
- name: "Slack Operations"
receivers:
- uid: slack-ops
type: slack
settings:
url: $SLACK_WEBHOOK_URL
channel: "#energy-ops-alerts"
username: "Grafana Alert Bot"
iconEmoji: ":zap:"
title: "{{ template \"slack.title\" . }}"
text: "{{ template \"slack.message\" . }}"
# Routing
policies:
- receiver: "Slack Operations"
group_by: ["plant_id", "severity"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- receiver: "PagerDuty Critical"
matchers:
- "severity = critical"
continue: true # invia anche a Slack
Securitate: TLS, autentificare și segmentare a rețelei
Securitatea unei platforme energetice și critice IoT: datele de producție sunt informații comerciale sensibile și capacitatea de a trimite comenzi către invertoare trebuie protejat împotriva accesului neautorizat. Strategia de apărare în profunzime este împărțit în patru niveluri.
Generarea de certificate TLS cu Script
#!/bin/bash
# generate_certs.sh - Genera CA e certificati per mutual TLS
set -euo pipefail
CERTS_DIR="./certs"
mkdir -p "$CERTS_DIR"
# 1. CA (Certificate Authority) - Root of trust
openssl genrsa -out "$CERTS_DIR/ca.key" 4096
openssl req -x509 -new -nodes \
-key "$CERTS_DIR/ca.key" \
-sha256 -days 3650 \
-out "$CERTS_DIR/ca.crt" \
-subj "/C=IT/ST=Puglia/L=Bari/O=EnergyCorp/CN=Energy IoT CA"
# 2. Server certificate (Mosquitto)
openssl genrsa -out "$CERTS_DIR/server.key" 2048
openssl req -new \
-key "$CERTS_DIR/server.key" \
-out "$CERTS_DIR/server.csr" \
-subj "/C=IT/O=EnergyCorp/CN=mosquitto"
# SAN per tutti i possibili hostname
cat > "$CERTS_DIR/server-ext.cnf" <<EOF
[req]
req_extensions = v3_req
[v3_req]
subjectAltName = @alt_names
[alt_names]
DNS.1 = mosquitto
DNS.2 = mqtt.energy-corp.it
DNS.3 = localhost
IP.1 = 127.0.0.1
IP.2 = 172.20.0.10
EOF
openssl x509 -req \
-in "$CERTS_DIR/server.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/server.crt" \
-days 825 \
-sha256 \
-extfile "$CERTS_DIR/server-ext.cnf" \
-extensions v3_req
# 3. Client certificate per Telegraf
openssl genrsa -out "$CERTS_DIR/telegraf.key" 2048
openssl req -new \
-key "$CERTS_DIR/telegraf.key" \
-out "$CERTS_DIR/telegraf.csr" \
-subj "/C=IT/O=EnergyCorp/CN=telegraf"
openssl x509 -req \
-in "$CERTS_DIR/telegraf.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/telegraf.crt" \
-days 825 -sha256
# 4. Client certificate per gateway PV001
openssl genrsa -out "$CERTS_DIR/gateway_PV001.key" 2048
openssl req -new \
-key "$CERTS_DIR/gateway_PV001.key" \
-out "$CERTS_DIR/gateway_PV001.csr" \
-subj "/C=IT/O=EnergyCorp/CN=gateway_PV001"
openssl x509 -req \
-in "$CERTS_DIR/gateway_PV001.csr" \
-CA "$CERTS_DIR/ca.crt" \
-CAkey "$CERTS_DIR/ca.key" \
-CAcreateserial \
-out "$CERTS_DIR/gateway_PV001.crt" \
-days 365 -sha256 # 1 anno per gateway (rotazione annuale)
echo "Certificati generati in $CERTS_DIR/"
echo "CA: ca.crt"
echo "Server (Mosquitto): server.crt + server.key"
echo "Client Telegraf: telegraf.crt + telegraf.key"
echo "Client Gateway PV001: gateway_PV001.crt + gateway_PV001.key"
Siguranță: practici obligatorii în producție
- TLS reciproc întotdeauna: Atât clientul, cât și serverul trebuie să prezinte certificat. Împiedică conexiunile de la gateway-uri neautorizate chiar dacă cunosc nume de utilizator/parolă.
- Secrete prin variabile de mediu sau manager secret: Niciodată un cod dur Jetoane InfluxDB, parole MQTT sau chei API din codul sau fișierele de configurare angajat la git. Utilizați secretele Docker, HashiCorp Vault sau AWS Secrets Manager.
- Segmentarea rețelei: Brokerul MQTT, InfluxDB și Telegraf nu trebuie nu fi niciodată expus direct pe internet. Grafana numai prin proxy invers HTTPS. Brokerul MQTT pentru gateway-uri edge poate fi expus printr-un VPN dedicat.
- Rotația certificatelor: Certificatele de gateway sunt valabile 1 an (365 de zile). Setați alarmele Grafana să expire cu 30 de zile înainte.
- Jurnalele de audit MQTT: Activați înregistrarea tuturor conexiunilor și subiectelor în Mosquitto. Integrați cu un SIEM pentru detectarea accesului anormal.
Performanță: Benchmarking și optimizare pentru scară ridicată
Capacitate stiva pentru sistem de 10 MW
Un parc fotovoltaic de 10 MW are de obicei 200 de invertoare de 50 kW fiecare. Cu sondajul la fiecare 5 secunde, volumul mesajelor MQTT este după cum urmează:
| Componentă | Metrici | Volum |
|---|---|---|
| 200 invertoare x 5s | 15 câmpuri/mesaj | 40 msg/sec, 600 puncte de date/sec |
| 10 stații meteo x 10s | 8 câmpuri/mesaj | 1 msg/sec, 8 puncte de date/sec |
| Total plante x 15 min | Agregate de flux | Calculat în DB |
| Total | - | ~650 de puncte de date/sec, ~56 de milioane de puncte de date/zi |
| Depozitare brută (30 de zile) | ~1,68 miliarde de puncte de date | ~8-15 GB (cu compresie InfluxDB) |
Acest volum este gestionabil abundent de un singur nod InfluxDB 2.7 cu 8 GB RAM și SSD NVMe. EMQX pe VM cu 4 vCPU și 8 GB RAM gestionează 100.000 conexiuni concurenți cu un debit de 1 milion de mesaje/secundă. Pentru instalare unică de la 200 de invertoare, un singur broker Mosquitto pe Raspberry Pi 4 și mai mult decât suficient (teste din lumea reală: 10K msg/sec, 500 MB RAM).
Script de evaluare comparativă MQTT
#!/usr/bin/env python3
"""
mqtt_benchmark.py
Benchmark del broker MQTT per simulare 200 inverter
Verifica throughput, latenza e affidabilità
"""
import asyncio
import json
import time
import random
import statistics
from datetime import datetime, timezone
import aiomqtt
async def simulate_inverter(
plant_id: str,
inverter_id: str,
broker_host: str,
broker_port: int,
duration_sec: int,
poll_interval: float = 5.0
) -> dict:
"""Simula un inverter che pubblica metriche su MQTT"""
messages_sent = 0
latencies = []
errors = 0
start_time = time.monotonic()
async with aiomqtt.Client(
hostname=broker_host,
port=broker_port,
client_id=f"bench_{plant_id}_{inverter_id}",
keepalive=30,
) as client:
while time.monotonic() - start_time < duration_sec:
# Simula metriche realistiche
base_power = 50000 * random.uniform(0.3, 1.0) # 0-50 kW
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"plant_id": plant_id,
"inverter_id": inverter_id,
"ac_power_w": base_power,
"ac_voltage_l1_v": random.uniform(225, 235),
"dc_voltage_v": random.uniform(550, 650),
"dc_current_a": base_power / random.uniform(550, 650),
"temp_cabinet_c": random.uniform(35, 65),
"efficiency_pct": random.uniform(94, 98.5),
"operating_state": 4, # MPPT
"error_code": 0,
"energy_wh_total": messages_sent * base_power * poll_interval / 3600,
}
topic = f"plant/{plant_id}/inverter/{inverter_id}/metrics"
t_send = time.monotonic()
try:
await client.publish(
topic=topic,
payload=json.dumps(payload).encode(),
qos=1,
)
latencies.append((time.monotonic() - t_send) * 1000) # ms
messages_sent += 1
except Exception:
errors += 1
await asyncio.sleep(poll_interval)
return {
"inverter_id": inverter_id,
"messages_sent": messages_sent,
"errors": errors,
"avg_latency_ms": statistics.mean(latencies) if latencies else 0,
"p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
}
async def run_benchmark(
num_inverters: int = 200,
duration_sec: int = 60,
broker_host: str = "localhost",
broker_port: int = 1883,
) -> None:
"""Esegui benchmark con N inverter simulati"""
print(f"Avvio benchmark: {num_inverters} inverter per {duration_sec}s")
print(f"Broker: {broker_host}:{broker_port}")
print("-" * 60)
start = time.monotonic()
tasks = [
simulate_inverter(
plant_id="BENCH001",
inverter_id=f"INV{i:03d}",
broker_host=broker_host,
broker_port=broker_port,
duration_sec=duration_sec,
poll_interval=5.0,
)
for i in range(1, num_inverters + 1)
]
results = await asyncio.gather(*tasks)
elapsed = time.monotonic() - start
# Aggregazione risultati
total_messages = sum(r["messages_sent"] for r in results)
total_errors = sum(r["errors"] for r in results)
all_latencies_avg = [r["avg_latency_ms"] for r in results]
all_latencies_p99 = [r["p99_latency_ms"] for r in results]
print(f"Risultati benchmark:")
print(f" Durata: {elapsed:.1f}s")
print(f" Messaggi inviati: {total_messages:,}")
print(f" Errori: {total_errors} ({total_errors/total_messages*100:.2f}%)")
print(f" Throughput: {total_messages/elapsed:.1f} msg/sec")
print(f" Latenza media: {statistics.mean(all_latencies_avg):.2f} ms")
print(f" Latenza P99: {statistics.mean(all_latencies_p99):.2f} ms")
print(f" Inverter senza errori: {sum(1 for r in results if r['errors'] == 0)}/{num_inverters}")
if __name__ == "__main__":
asyncio.run(run_benchmark(
num_inverters=200,
duration_sec=120,
broker_host="localhost",
broker_port=1883,
))
Studiu de caz: Monitorizare parc fotovoltaic de 10 MW
Acest studiu de caz descrie implementarea în lumea reală a unei platforme energetice IoT pentru un parc fotovoltaic din 10 MW în sudul Italiei cu 200 de invertoare SMA Sunny Tripower de 50 kW și 8 stații de măsurare a vremii.
Configurare inițială și provocări
Site-ul se află pe o rețea 4G WAN cu lățime de bandă garantată de 10 Mbps și latență variabilă între 30 și 200 ms. Fiecare invertor vorbește Modbus TCP pe rețeaua LAN locală (192.168.x.x) prin comutator industrial. Gateway-ul edge este un Raspberry Pi 4 (8 GB RAM) cu 256 GB SSD pentru buffering local, conectat la WAN printr-un router industrial 4G cu SIM de date dedicat.
| Parametru | Valoare |
|---|---|
| Puterea sistemului | 10,2 MW vârf (SMA STP50-US-40) |
| Numărul invertorului | 200 (SMA Sunny Tripower, 50 kW) |
| Stații meteo | 8 (Iradianta, T, HR, vant) |
| Gateway-uri Edge | Raspberry Pi 4 8GB + 256GB SSD |
| Invertor de interval de sondare | 5 secunde |
| Interval de sondaj meteo | 10 secunde |
| Volumul mesajului MQTT | ~41 msg/sec (200 inv x 1/5s + 8 vreme x 1/10s) |
| Lățimea de bandă WAN utilizată | ~180 KB/s (2% din cei 10 Mbps disponibile) |
| Broker MQTT | EMQX 5.8 pe cloud VM (4 vCPU, 8 GB) |
| InfluxDB | 2.7.10 pe VM dedicat (8 vCPU, 32 GB RAM, NVMe) |
Rezultate dupa 6 luni de operare
- Disponibilitatea platformei: 99,94% (timp total de nefuncționare: 2,6 ore în 6 luni, toate pentru întreținerea programată a serverului cloud)
- Date pierdute din cauza deconectarii WAN: 0,002% (mulțumită tamponării locale al gateway-ului și al sesiunii persistente MQTT)
- Alerte de eroare a invertorului detectate automat: 47 de evenimente în 6 luni, MTTR (Timpul mediu de reparare) redus de la 8 ore la 2,3 ore datorită alertării imediate pe PagerDuty (comparativ cu descoperirea manuală a doua zi)
- Raportul mediu de performanță: 82,3% față de 79,1% în perioada anterioară (fără monitorizare în timp real): +3,2 puncte procentuale = +320 MWh suplimentari/an
- Economii de la întreținerea predictivă: Detectarea a 12 invertoare cu degradare timpurie (temperatura radiatorului +8°C față de medie) înainte de defecțiune proclamat. Intervenție preventivă estimată la 45.000 EUR în economii față de înlocuirea de urgență.
- Stocare InfluxDB după 6 luni: 18,7 GB pentru date brute (30 de zile rulante), 4,2 GB pentru agregate de 1 oră (6 luni), 0,8 GB pentru agregate de 1 zi (total 180 de zile)
Lecția învățată: marginea tamponării nu este negociabilă
În prima lună de funcționare, conexiunea 4G a suferit 3 întreruperi prelungit (2-6 ore fiecare) din cauza problemelor operatorului de telefonie. Fără tamponare locală pe SSD-ul Raspberry Pi, am fi pierdut toate măsurătorile în timpul acelor ferestre. Datorită memoriei tampon de 64 GB și logicii de stocare și redirecționare, de fiecare dată când conexiunea este din nou, toate mesajele acumulate au fost publicat pe MQTT în ordine cronologică cu marcajele de timp originale și InfluxDB le-a acceptat corect pentru inserare în afara ordinului.
Cele mai bune practici și anti-modele
Cele mai bune practici
- Marca temporală în sarcina utilă, nu în broker: Poarta de acces trebuie să includă întotdeauna marcajul de timp exact al capturii în sarcina utilă JSON, nu vă bazați pe marcajul de timp al recepția brokerului MQTT. În cazul tamponării offline, mesajele ajung cu întârziere dar datele sunt corecte din punct de vedere temporal. InfluxDB acceptă inserții necomandate.
-
Separați canalele de telemetrie de canalele de comandă: Subiecte de telemetrie
(
plant/+/inverter/+/metrics) și cele de comandă (plant/+/commands/#) acestea trebuie să fie pe spații de nume separate cu ACL-uri distincte. Comenzile necesită QoS 2 e autentificare mai strictă. - Eșantionare agresivă pentru datele istorice: Datele brute la 5 secunde sunt valoroase pentru analiza defecțiunilor post-mortem, dar pentru tendințele istorice agregatele sunt suficiente la 15 minute sau ori. Implementați imediat sarcinile de subeșantionare și reținere Flux.
- Monitorizarea sănătății Gateway: Utilizați ritmul cardiac MQTT al gateway-ului (publicați la fiecare 60 de ani pe un subiect dedicat) și monitorizați în Grafana. Dacă bătăile inimii lipsesc timp de 3 perioade, alertă critică: ar putea fi poarta care a căzut, nu o problemă de broker.
- Valorile sistemului de gateway în InfluxDB: Telegraf pe gateway margine poate colecta valori CPU, RAM, temperatura CPU, spațiu disponibil pe disc pentru buffer, și publică-le pe MQTT. Esențial pentru a ști dacă Raspberry Pi este sub stres.
Anti-modele de evitat
Anti-modele critice
-
Subiecte prea granulare fără agregare: Eliberați fiecare
Registrul Modbus pe subiect separat (de ex.
plant/PV001/INV001/register/40001) generează zeci de mii de subiecte și metadate imense în broker. Agregați întotdeauna jurnalele asociate într-o singură sarcină utilă JSON pe dispozitiv. - QoS 2 pentru telemetrie de înaltă frecvență: Strângerea de mână în patru căi a QoS 2 multiplică de patru ori numărul de mesaje de control. Pentru date de 1-5 Hz, utilizați QoS 1 (duplicatele gestionate de InfluxDB prin timestamp idempote). QoS 2 numai pentru comenzi.
-
Păstrați subiectul de telemetrie: Reținerea este utilă pentru subiectele de stare
(ultima valoare este semnificativă), dar pe subiecte de telemetrie de înaltă frecvență
brokerul trebuie să actualizeze mesajul reținut la fiecare publicație, cu overhead
de stocare și CPU. Utilizați reținere numai pentru
/statuse/config. -
Cardinalitate nelimitată în InfluxDB: Evitați etichetele cu valori mari
cardinalitatea (de exemplu, utilizarea
session_idca etichetă pentru stațiile EV creați un nou serie pentru fiecare sesiune). Utilizați în schimb ca un câmp, nu o etichetă. - Fără limitare a ratei pentru InfluxDB: Fără limită de interogări concurente, o singură interogare grea Grafana (de exemplu, exportul de 6 luni de date brute) se poate satura Memoria DB și cauza OOM ucigaș. Configurați întotdeauna limitele de interogare.
Resurse și referințe tehnice
- Specificație MQTT 5.0 (OASIS): Specificația oficială a protocolului MQTT 5.0 descrie în detaliu QoS, retain, mesajele vor, gestionarea sesiunii și proprietățile utilizatorului. Fundamental pentru implementări întreprindere.
- Plugin pentru consumatori Telegraf MQTT: Documentația oficială InfluxData descrie toți parametrii de configurare a pluginului mqtt_consumer, inclusiv analizarea subiectelor și formatele de date acceptate. Adresa URL: docs.influxdata.com/telegraf
- Documentația fluxului InfluxDB: Limbajul Flux este puternic, dar are o curbă de învățare. Documentația oficial include referință completă cu toate funcțiile, inclusiv aggregateWindow, alăturați, pivotați și hărțiți.
- Alianța SunSpec: Standardul SunSpec definește harta registrului Modbus pentru invertoarele fotovoltaice, baterii și contoare. Disponibil pe sunspec.org. Asigură interoperabilitatea între dispozitive de la diferiți producători.
- Documentația EMQX: Documentația EMQX include ghiduri pentru clustering, motoare de reguli, securitate și integrare cu sisteme de stocare externe, inclusiv InfluxDB.
- Alerta Grafana: Documentația Grafana pentru sistemul unificat de alertă (v9+) include ghiduri pentru furnizarea prin YAML, configurarea punctelor de contact și rutare.
- Securitate IIoT: IEC 62443: Standardul IEC 62443 definește cerințele de siguranță pentru sistemele de automatizare și control industrial, inclusiv sisteme energetice. Obligatoriu pentru certificari NIS2 (implementat în Italia cu Decretul Legislativ 138/2024).
Concluzii și pașii următori
Am construit o platformă IoT energetică completă și pregătită pentru producție: de la punte Modbus-MQTT cu tamponare locală până la alerte multicanal prin Grafana, PagerDuty și Slack. Stack-ul MQTT + Telegraf + InfluxDB + Grafana este testat, documentat și scalabil: gestionează 650 de puncte de date/secundă pentru un sistem de 10 MW cu marjă abundentă, și cu EMQX în clustering și noduri multiple InfluxDB se scalează liniar la implementări la scară de utilitate sute de MW.
Cifrele studiului de caz vorbesc de la sine: disponibilitate de 99,94%, reducere MTTR de la 8 la 2,3 ore, +3,2 puncte de raport de performanță și economii de întreținere de 45.000 EUR predictiv în 6 luni. Monitorizarea în timp real nu este un lux pentru sistemele energetice surse regenerabile: și un multiplicator direct al veniturilor și al duratei de viață utilă a activelor.
Următorul articol din seria EnergyTech abordează subiectul carbon contabilitate: cum să construiți platforme ESG care calculează emisiile evitate, certificate verzi și credit de carbon pornind de la datele de producție colectate de pe platforma IoT pe care tocmai am construit-o.
Articole înrudite
- Seria EnergyTech: Acest articol face parte din seria dedicată la inginerie software pentru energie regenerabilă. Articolul anterior despre Echilibrarea sarcinii EV și cea ulterioară privind Contabilitatea carbonului completează imaginea a infrastructurii software pentru tranziția energetică.
- Seria MLOps: Eșantionarea și detectarea anomaliilor implementate cu Flux sunt doar primul pas. Seria MLOps descrie modul de integrare a modelelor ML (LSTM pentru prognoza producției, Pădure de izolare pentru detectarea anomaliilor) cu date care provin de la aceeași platformă InfluxDB.
- Seria de afaceri de date și IA: Arhitectura MQTT-InfluxDB este a Exemplu de data lakehouse IoT. Seria Data & AI Business analizează cum Aduceți aceste date din seria temporală în conductele de analiză ale întreprinderii cu DBT și Airflow.
- Seria PostgreSQL AI: Pentru analize mai complexe care necesită JOIN cu date relaționale (de exemplu, date contractuale, date principale ale fabricii, istoric întreținere), integrarea InfluxDB-PostgreSQL prin Foreign Data Wrapper și prezentat în seria PostgreSQL AI.







