Digitální dvojče pro energetickou infrastrukturu: Simulace v reálném čase
Globální trh digitálního dvojčete pro energetický sektor dosáhl 34,1 miliardy dolarů v roce 2025 a do roku 2034 poroste s CAGR 34,7 %, podle Fortune Business Insights. Toto není budoucí příslib: veřejné služby Evropské země již provozují digitální dvojčata VN/VN rozvoden a větrných elektráren offshore a distribuční sítě, které synchronizují tisíce měřicích bodů za sekundu. V Itálii spustily společnosti Terna a Enel pilotní programy digitálního dvojčete pro rozvodnou síť vnitrostátní přenos jako součást iniciativ PNRR pro energetickou transformaci.
Un energetické digitální dvojče není to pouze simulační model statické. Je to obousměrný systém, který odráží fyzický stav rostliny v čase reálné, umožňuje spouštět prediktivní simulace, vyhodnocovat scénáře typu what-if a příkazy fyzický majetek prostřednictvím zpětné vazby. Rozdíl oproti jednoduchému SCADA je hluboký: kde SCADA monitoruje a řídí, digitální dvojče rozumí, simulovat e předpokládá.
V tomto článku zkoumáme praktickou implementaci digitálních dvojčat pro infrastrukturu energie: z fyzikálního modelování elektrických sítí s síla panda e PyPSA, integrace se SCADA/IoT, prediktivní údržba na transformátorech s ML až po reálné případy rozvoden VN/VN a pobřežních větrných elektráren.
Co se naučíte
- Taxonomie digitálního dvojčete: deskriptivní, informativní, prediktivní, preskriptivní
- Architektura vrstvy: fyzická entita, datová vrstva, modelová vrstva, vizualizace
- Fyzikální modely: Newton-Raphsonův výkonový tok, transformátory tepelného modelu, degradační křivky
- Příjem dat ze SCADA, IoT senzorů, počasí API a chytrých měřičů
- Simulace sítě s pandapower a PyPSA v Pythonu
- Synchronizace v reálném čase: digitální stín vs obousměrné digitální dvojče
- Prediktivní údržba: Random Forest a XGBoost pro RUL na transformátorech
- Co když scénáře: N-1 nepředvídané události a plánování rozšíření sítě
- CIM IEC 61970/61968 a CGMES pro modelovou interoperabilitu
- Azure Digital Twins vs AWS IoT TwinMaker: výběr cloudové platformy
- Případová studie: VN/VN rozvodna a pobřežní větrná farma
- Digitální dvojče Edge pro minimální latenci na průmyslových branách
Řada EnergyTech: 10 článků o digitální energii
Toto je devátý článek ze série EnergyTech, věnované protokolům, na softwarové architektury a algoritmy, které transformují energetický management elektřiny v éře energetické transformace.
| # | Položka | Technologie | Úroveň |
|---|---|---|---|
| 1 | Protokol OCPP 2.x: Budování nabíjecích systémů EV | OCPP, WebSocket, Python, ISO 15118 | Moderní |
| 2 | Architektura DERMS: Agregace milionů distribuovaných zdrojů | DERMS, REST, MQTT, PostgreSQL | Moderní |
| 3 | Prognóza obnovitelné energie s ML: LSTM pro solární a větrnou energii | Python, LSTM, Prophet, FastAPI | Moderní |
| 4 | Battery Management System pro Grid-Scale Storage | BMS, SoC/SoH, Python, sběrnice CAN | Moderní |
| 5 | IEC 61850 pro softwarové inženýry: Smart Grid Communication | IEC 61850, GOOSE, MMS, SCADA | Moderní |
| 6 | EV Charging Load Balancing: Real-Time Algorithms | Algoritmy, Python, OCPP, WebSocket | Moderní |
| 7 | Od MQTT k InfluxDB: Energetická platforma IoT v reálném čase | MQTT, InfluxDB, Telegraf, Grafana | Střední |
| 8 | Softwarová architektura uhlíkového účetnictví: Platformy ESG | Protokol GHG, Python, API, reporting | Střední |
| 9 | Digitální dvojče pro energetickou infrastrukturu (jste zde) | pandapower, PyPSA, ML, Azure DT, Python | Moderní |
| 10 | Blockchain pro P2P obchodování s energií: Chytré smlouvy a omezení | Solidita, Ethereum, chytré smlouvy | Moderní |
Taxonomie energetických digitálních dvojčat
Ne všechna „digitální dvojčata“ jsou si rovni. Přijatý rámec Grieten-Kritzinger podle IEC a mnoha průmyslových prodejců, definuje čtyři úrovně vyspělosti v na základě směru toku dat a kapacity zpracování:
| Úroveň | Typ | Datový tok | kapacita | Příklad energie |
|---|---|---|---|---|
| L1 | Digitální stín (popisný) | Fyzické → Digitální | Sledování aktuálního stavu | Řídicí panel SCADA v reálném čase |
| L2 | Informační digitální dvojče | Fyzické ↔ Digitální (historické) | Historická analýza, KPI, trendy | Transformátor pro sledování stavu majetku |
| L3 | Prediktivní digitální dvojče | Fyzické ↔ Digitální + ML | Predikce poruch, RUL, simulace | Předpověď životnosti VN kabelu |
| L4 | Předpis digitálního dvojčete | Fyzický ↔ Digitální (obousměrný aktivní) | Doporučení/provedení optimálních akcí | Automatické přeplánování údržby |
Digitální stín vs digitální dvojče
Un digitální stín sbírá data z těla, ale neovlivňuje je (jednosměrné proudění). A digitální dvojče skutečný má a kanál zpětné vazby: může posílat nastavené hodnoty, příkazy nebo parametry do fyzického majetku, uzavření regulační smyčky. V praxi většina průmyslových systémů dnes funguje na L1-L2 s cílem dosáhnout L3-L4 do roku 2027.
Architektura vrstev energetického digitálního dvojčete
Digitální dvojče pro energetickou infrastrukturu na podnikové úrovni se skládá ze čtyř různé vrstvy, z nichž každá má přesně definované odpovědnosti:
Vrstva 1: Fyzické entity
Instrumentovaná fyzická aktiva: VN/VN transformátory se senzory DGA (Dissolved Gas). Analýza), přípojnice s proudovými transformátory, venkovní vedení s teplotními čidly a větru, generátory s monitorováním vibrací, baterie se senzory SoC/SoH. Vrstva fyzik produkuje nezpracovaná data s proměnnými frekvencemi: od 1 vzorku za hodinu pro teplotu prostředí při 1 vzorku/ms pro ochranná opatření IEC 61850.
Vrstva 2: Datová vrstva
Vrstva příjmu dat, normalizace a ukládání. Zahrnuje:
- SCADA/EMS: provozní data v reálném čase přes DNP3, Modbus, IEC 61850
- Platforma IoT: bezdrátových senzorů přes MQTT/AMQP do cloud brokera
- Time-Series DB: InfluxDB nebo TimescaleDB pro vysokorychlostní telemetrii
- Datové jezero: historické úložiště pro školení ML (S3, ADLS)
- Počasí API: meteorologická data pro tepelné modely a předpovědi slunce/větru
- Chytré měřiče: profily zatížení agregované prostřednictvím DLMS/COSEM
Vrstva 3: Vrstva modelu
Počítačové srdce: fyzické a ML modely, které replikují chování aktiv. Zahrnuje řešení toku energie, tepelné modely, modely degradace a ML pro prediktivní údržbu.
Vrstva 4: Vizualizace a integrace
Provozní dashboardy (Grafana, Power BI), 3D vykreslování rozvoden (Three.js, Unity, Siemens NX), REST/WebSocket API pro systémy třetích stran (ERP, CMMS, EMS) a rozhraní pro normativní kontrolu.
Fyzikální modely: Tok energie s Newton-Raphsonem
Srdcem každého digitálního dvojčete elektrické sítě je řešič toku energie. Newton-Raphsonův algoritmus řeší nelineární rovnice, které popisují rovnováhu činného a jalového výkonu na každém uzlu (sběrnici) sítě. S síla panda můžeme postavit kompletní model rozvodny a simulovat její chování během několika milisekund.
Instalace a nastavení
# Installazione dependencies
pip install pandapower pypsa numpy pandas scikit-learn xgboost
pip install influxdb-client paho-mqtt asyncio aiohttp
pip install plotly dash # per visualizzazione
# Versioni consigliate (2025)
# pandapower==2.14.x
# pypsa==0.31.x
# scikit-learn==1.5.x
# xgboost==2.1.x
Síťové modelování pomocí pandapower
Stavíme kompletní model VN/VN rozvodny s rozvody, transformátory, zátěže a distribuované generátory:
import pandapower as pp
import pandapower.networks as pn
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
import json
@dataclass
class SubstationConfig:
"""Configurazione sottostazione AT/MT"""
name: str
voltage_hv_kv: float = 132.0 # Alta Tensione kV
voltage_mv_kv: float = 20.0 # Media Tensione kV
transformer_mva: float = 40.0 # Potenza trasformatore MVA
n_feeders: int = 6 # Numero feeder MT
peak_load_mw: float = 28.0 # Carico picco MW
pv_capacity_mw: float = 4.5 # Fotovoltaico connesso MW
class EnergyDigitalTwin:
"""
Digital Twin per sottostazione AT/MT con simulazione power flow real-time.
Implementa aggiornamento continuo dallo stato SCADA.
"""
def __init__(self, config: SubstationConfig):
self.config = config
self.net = pp.create_empty_network(name=config.name)
self._build_network_model()
self._state_cache: dict = {}
def _build_network_model(self) -> None:
"""Costruisce il modello di rete dalla configurazione."""
cfg = self.config
# Bus AT (slack bus = riferimento di tensione)
self.bus_ht = pp.create_bus(
self.net,
vn_kv=cfg.voltage_hv_kv,
name="Bus_AT_Main",
type="b"
)
# Bus MT primario (lato secondario trasformatore)
self.bus_mt_primary = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name="Bus_MT_Sbarra",
type="b"
)
# Trasformatore AT/MT principale
pp.create_transformer_from_parameters(
self.net,
hv_bus=self.bus_ht,
lv_bus=self.bus_mt_primary,
sn_mva=cfg.transformer_mva,
vn_hv_kv=cfg.voltage_hv_kv,
vn_lv_kv=cfg.voltage_mv_kv,
vkr_percent=0.3, # perdite rame %
vk_percent=8.5, # tensione di cortocircuito %
pfe_kw=35.0, # perdite ferro kW
i0_percent=0.08, # corrente a vuoto %
name="TR_AT_MT_Main",
tap_pos=0, # posizione tap changer
tap_neutral=0,
tap_min=-8,
tap_max=8,
tap_step_percent=1.25
)
# Rete esterna AT (equivalente di Thevenin della rete)
pp.create_ext_grid(
self.net,
bus=self.bus_ht,
vm_pu=1.0,
va_degree=0.0,
name="Rete_Trasmissione",
s_sc_max_mva=3500.0, # potenza di cortocircuito rete
rx_max=0.1
)
# Crea feeder MT con carichi e generazione
self.feeder_buses = []
self.load_ids = []
self.pv_ids = []
for i in range(cfg.n_feeders):
# Bus feeder
bus_feeder = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name=f"Bus_Feeder_{i+1:02d}"
)
self.feeder_buses.append(bus_feeder)
# Linea MT dal bus principale al feeder (cavo XLPE 150mm2)
pp.create_line_from_parameters(
self.net,
from_bus=self.bus_mt_primary,
to_bus=bus_feeder,
length_km=2.5 + i * 0.8,
r_ohm_per_km=0.206, # resistenza XLPE 150mm2
x_ohm_per_km=0.083,
c_nf_per_km=320.0,
max_i_ka=0.261,
name=f"Linea_Feeder_{i+1:02d}"
)
# Carico per feeder (distribuzione uniforme + varianza)
load_mw = cfg.peak_load_mw / cfg.n_feeders * (0.8 + 0.4 * np.random.rand())
load_id = pp.create_load(
self.net,
bus=bus_feeder,
p_mw=load_mw,
q_mvar=load_mw * 0.3, # power factor ~ 0.958
name=f"Carico_Feeder_{i+1:02d}",
controllable=False
)
self.load_ids.append(load_id)
# Generatore fotovoltaico (bus MT primario)
pv_id = pp.create_sgen(
self.net,
bus=self.bus_mt_primary,
p_mw=cfg.pv_capacity_mw,
q_mvar=0.0,
name="FV_Rooftop",
type="PV",
controllable=True
)
self.pv_ids.append(pv_id)
def update_from_scada(self, scada_data: dict) -> None:
"""
Aggiorna il modello con dati real-time da SCADA/IoT.
Args:
scada_data: dizionario con misure correnti
- load_mw_per_feeder: lista potenze attive
- pv_mw: generazione fotovoltaica attuale
- tap_position: posizione tap changer
- ambient_temp_c: temperatura ambiente
"""
# Aggiorna carichi per feeder
loads = scada_data.get("load_mw_per_feeder", [])
for i, (load_id, p_mw) in enumerate(zip(self.load_ids, loads)):
q_mvar = p_mw * 0.30 # power factor fisso (da modello)
self.net.load.at[load_id, "p_mw"] = p_mw
self.net.load.at[load_id, "q_mvar"] = q_mvar
# Aggiorna generazione PV
pv_mw = scada_data.get("pv_mw", 0.0)
if self.pv_ids:
self.net.sgen.at[self.pv_ids[0], "p_mw"] = pv_mw
# Aggiorna posizione tap changer
tap = scada_data.get("tap_position", 0)
self.net.trafo.at[0, "tap_pos"] = np.clip(tap, -8, 8)
# Cache stato per confronto
self._state_cache = scada_data.copy()
def run_power_flow(self) -> dict:
"""
Esegue simulazione Newton-Raphson e restituisce risultati.
Returns:
Dizionario con tensioni, correnti, perdite e stato componenti
"""
try:
pp.runpp(
self.net,
algorithm="nr", # Newton-Raphson
calculate_voltage_angles=True,
max_iteration=50,
tolerance_mva=1e-8,
enforce_q_lims=True
)
results = {
"converged": True,
"buses": self._extract_bus_results(),
"lines": self._extract_line_results(),
"transformer": self._extract_trafo_results(),
"losses_mw": float(self.net.res_ext_grid["p_mw"].sum() -
self.net.res_load["p_mw"].sum() +
self.net.res_sgen["p_mw"].sum()),
"timestamp": pd.Timestamp.now().isoformat()
}
return results
except pp.LoadflowNotConverged:
return {
"converged": False,
"error": "Power flow non convergente - possibile violazione limiti",
"timestamp": pd.Timestamp.now().isoformat()
}
def _extract_bus_results(self) -> list:
results = []
for idx, row in self.net.res_bus.iterrows():
bus_name = self.net.bus.at[idx, "name"]
vn_kv = self.net.bus.at[idx, "vn_kv"]
results.append({
"bus_id": int(idx),
"name": bus_name,
"vm_pu": float(row["vm_pu"]),
"vm_kv": float(row["vm_pu"] * vn_kv),
"va_degree": float(row["va_degree"]),
"within_limits": 0.95 <= float(row["vm_pu"]) <= 1.05
})
return results
def _extract_line_results(self) -> list:
results = []
for idx, row in self.net.res_line.iterrows():
results.append({
"line_id": int(idx),
"name": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"]),
"i_ka": float(row["i_ka"]),
"pl_mw": float(row["pl_mw"]),
"overloaded": float(row["loading_percent"]) > 100.0
})
return results
def _extract_trafo_results(self) -> dict:
row = self.net.res_trafo.iloc[0]
return {
"loading_percent": float(row["loading_percent"]),
"pl_mw": float(row["pl_mw"]),
"i_hv_ka": float(row["i_hv_ka"]),
"i_lv_ka": float(row["i_lv_ka"]),
"overloaded": float(row["loading_percent"]) > 100.0
}
def check_n1_contingency(self, line_to_remove: int) -> dict:
"""
Simulazione contingenza N-1: rimuove una linea e verifica la tenuta della rete.
Args:
line_to_remove: ID della linea da togliere fuori servizio
Returns:
Risultati power flow post-contingenza con violazioni
"""
# Salva stato originale
original_in_service = self.net.line.at[line_to_remove, "in_service"]
# Applica contingenza
self.net.line.at[line_to_remove, "in_service"] = False
try:
pp.runpp(self.net, algorithm="nr", max_iteration=50)
violations = []
for idx, row in self.net.res_bus.iterrows():
if not (0.90 <= float(row["vm_pu"]) <= 1.10):
violations.append({
"type": "voltage",
"bus": self.net.bus.at[idx, "name"],
"vm_pu": float(row["vm_pu"])
})
for idx, row in self.net.res_line.iterrows():
if float(row["loading_percent"]) > 120.0:
violations.append({
"type": "overload",
"line": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"])
})
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": True,
"n_violations": len(violations),
"violations": violations,
"secure": len(violations) == 0
}
except pp.LoadflowNotConverged:
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": False,
"secure": False,
"error": "Rete non sicura: power flow non convergente post-contingenza"
}
finally:
# Ripristina stato originale
self.net.line.at[line_to_remove, "in_service"] = original_in_service
return result
Tepelný model transformátoru
Transformátor je nejkritičtějším aktivem rozvodny. Jeho zdravotní stav závisí přímo na teplotě oleje a vinutí. Standard IEC 60076-7 definuje základní analytický tepelný model z většiny digitálních dvojčat průmyslových transformátorů.
import numpy as np
from dataclasses import dataclass
@dataclass
class TransformerThermalModel:
"""
Modello termico trasformatore secondo IEC 60076-7.
Implementa il modello esponenziale per calcolo temperatura olio e avvolgimenti.
"""
# Parametri costruttivi (da nameplate / misure FAT)
rated_power_mva: float = 40.0
theta_amb_ref: float = 20.0 # temperatura ambiente riferimento
delta_theta_or: float = 55.0 # sopratemperatura olio a pieno carico
delta_theta_hr: float = 23.0 # gradiente avvolgimento-olio a pieno carico
tau_o: float = 3.0 * 3600 # costante di tempo termica olio [s]
tau_w: float = 10 * 60 # costante di tempo avvolgimenti [s]
n_exp: float = 0.9 # esponente di carico olio (ONAN)
m_exp: float = 0.8 # esponente di carico avvolgimenti
r_load: float = 6.0 # rapporto perdite carico/vuoto
# Stato iniziale
theta_oil: float = 20.0 # temperatura olio corrente
theta_winding: float = 20.0 # temperatura avvolgimento corrente
def step(
self,
load_fraction: float,
ambient_temp: float,
dt_seconds: float = 60.0
) -> dict:
"""
Aggiornamento modello con passo temporale dt.
Args:
load_fraction: carico normalizzato (0=vuoto, 1=pieno carico)
ambient_temp: temperatura ambiente in gradi C
dt_seconds: passo temporale in secondi
Returns:
Stato termico aggiornato con temperatura olio, avvolgimenti e HST
"""
# Sopratemperatura olio a regime per questo carico
k = load_fraction # fattore di carico
delta_theta_o_steady = self.delta_theta_or * (
(1 + self.r_load * k**2) / (1 + self.r_load)
) ** self.n_exp
# Dinamica olio (prima equazione differenziale)
d_theta_oil = (
(ambient_temp + delta_theta_o_steady - self.theta_oil) / self.tau_o
) * dt_seconds
self.theta_oil = self.theta_oil + d_theta_oil
# Gradiente avvolgimento a regime
delta_theta_h_steady = self.delta_theta_hr * (k ** (2 * self.m_exp))
# Hot Spot Temperature (HST) - temperatura punto caldo avvolgimento
# Dinamica più rapida degli avvolgimenti
d_theta_winding = (
(self.theta_oil + delta_theta_h_steady - self.theta_winding) / self.tau_w
) * dt_seconds
self.theta_winding = self.theta_winding + d_theta_winding
# Aging Acceleration Factor (AAF) per calcolo consumo vita isolante
# Riferimento: theta_ref = 98°C per isolante classe A
theta_ref = 98.0
aaf = np.exp(
15000.0 / (theta_ref + 273.0) - 15000.0 / (self.theta_winding + 273.0)
)
return {
"theta_oil_c": round(self.theta_oil, 2),
"theta_winding_hst_c": round(self.theta_winding, 2),
"theta_ambient_c": ambient_temp,
"load_fraction": load_fraction,
"aaf": round(float(aaf), 4),
"thermal_limit_exceeded": self.theta_winding > 120.0, # limite continuo
"emergency_limit_exceeded": self.theta_winding > 140.0 # limite emergenza
}
def calculate_rul_hours(self, avg_hst: float) -> float:
"""
Stima Remaining Useful Life (RUL) dell'isolante cellulosico.
Basato sul modello di Arrhenius (IEC 60076-7 Section 8).
Args:
avg_hst: temperatura media Hot Spot in gradi C
Returns:
Ore di vita residua stimate
"""
# Vita nominale isolante a 98°C = 65.000 ore (IEC 60076-7)
nominal_life_hours = 65_000.0
theta_ref = 98.0
# Aging rate normalizzato
per_unit_life_remaining = 1.0 # assume vita residua 100%
aaf_avg = np.exp(
15000.0 / (theta_ref + 273.15) - 15000.0 / (avg_hst + 273.15)
)
# Ore residue
rul_hours = nominal_life_hours * per_unit_life_remaining / aaf_avg
return round(rul_hours, 0)
Příjem dat: Od SCADA a IoT po digitální dvojče
Kvalita digitálního dvojčete přímo závisí na kvalitě a latenci dat u vchodu. Ve skutečném energetickém systému musíme řídit heterogenní zdroje: RTU/IED přes IEC 61850/DNP3, IoT senzory přes MQTT, předpověď počasí přes REST API, inteligentní měřič přes DLMS/COSEM.
import asyncio
import aiohttp
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
from datetime import datetime, timezone
from typing import Callable
class EnergyDataIngestion:
"""
Gestione ingestione dati multi-sorgente per digital twin energetico.
Supporta MQTT (IoT sensors), REST API (weather, market), SCADA (simulato).
"""
def __init__(
self,
influx_url: str,
influx_token: str,
influx_org: str,
influx_bucket: str,
mqtt_broker: str,
mqtt_port: int = 1883
):
# InfluxDB client per time-series storage
self._influx = InfluxDBClient(
url=influx_url,
token=influx_token,
org=influx_org
)
self._write_api = self._influx.write_api(write_options=SYNCHRONOUS)
self._bucket = influx_bucket
self._org = influx_org
# MQTT client per sensori IoT
self._mqtt_client = mqtt.Client(client_id="dt-ingestion-01")
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_message = self._on_mqtt_message
self._mqtt_broker = mqtt_broker
self._mqtt_port = mqtt_port
# Callback per aggiornare il modello in tempo reale
self._model_update_callbacks: list[Callable] = []
def register_model_callback(self, cb: Callable) -> None:
"""Registra callback chiamata a ogni aggiornamento dati."""
self._model_update_callbacks.append(cb)
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
# Sottoscrizione ai topic energetici
topics = [
"substation/+/transformer/+/measurements",
"substation/+/feeder/+/measurements",
"substation/+/weather/measurements",
"substation/+/pv/measurements"
]
for topic in topics:
client.subscribe(topic, qos=1)
print(f"MQTT connesso al broker - subscribed a {len(topics)} topic")
def _on_mqtt_message(self, client, userdata, msg):
"""Elaborazione messaggio MQTT con parsing e storage."""
try:
payload = json.loads(msg.payload.decode())
topic_parts = msg.topic.split("/")
# Estrai metadati dal topic
substation_id = topic_parts[1]
asset_type = topic_parts[2]
asset_id = topic_parts[3]
# Crea punto InfluxDB
point = (
Point(asset_type)
.tag("substation", substation_id)
.tag("asset_id", asset_id)
.time(
datetime.fromisoformat(
payload.get("timestamp", datetime.now(timezone.utc).isoformat())
)
)
)
# Aggiungi misure come fields
for key, value in payload.get("measurements", {}).items():
if isinstance(value, (int, float)):
point = point.field(key, float(value))
# Scrivi su InfluxDB
self._write_api.write(
bucket=self._bucket,
org=self._org,
record=point
)
# Notifica callbacks (aggiornamento modello)
for cb in self._model_update_callbacks:
cb(asset_type, asset_id, payload)
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"Errore parsing messaggio MQTT: {e} - topic: {msg.topic}")
async def fetch_weather_forecast(
self,
lat: float,
lon: float,
session: aiohttp.ClientSession
) -> dict:
"""
Recupera previsioni meteo da Open-Meteo API (gratuita, no API key).
Usata per correzione temperatura ambiente nel modello termico trasformatore.
"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,windspeed_10m,direct_radiation",
"forecast_days": 3,
"timezone": "Europe/Rome"
}
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.json()
return {
"hourly_temp_c": data["hourly"]["temperature_2m"],
"hourly_wind_ms": data["hourly"]["windspeed_10m"],
"hourly_radiation_wm2": data["hourly"]["direct_radiation"],
"times": data["hourly"]["time"]
}
else:
raise ValueError(f"Weather API error: HTTP {resp.status}")
def start_mqtt(self) -> None:
"""Avvia connessione MQTT in thread separato."""
self._mqtt_client.connect(self._mqtt_broker, self._mqtt_port, keepalive=60)
self._mqtt_client.loop_start()
def stop(self) -> None:
"""Cleanup risorse."""
self._mqtt_client.loop_stop()
self._mqtt_client.disconnect()
self._influx.close()
Synchronizace v reálném čase a digitální dvojitá smyčka
„Digitální dvojitá smyčka“ je nepřetržitá smyčka, která udržuje digitální model synchronizovaný s tělem. Frekvence synchronizace závisí na dynamice procesu: pro měření stavu SCADA obvykle stačí 30-60 sekund, pro ochrany IEC 61850 klesá na milisekundy.
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger(__name__)
class DigitalTwinOrchestrator:
"""
Orchestratore principale del digital twin energetico.
Gestisce il loop di sincronizzazione e coordina i diversi modelli.
"""
def __init__(
self,
twin: EnergyDigitalTwin,
ingestion: EnergyDataIngestion,
thermal_model: TransformerThermalModel,
sync_interval_s: float = 30.0
):
self.twin = twin
self.ingestion = ingestion
self.thermal_model = thermal_model
self.sync_interval_s = sync_interval_s
self._running = False
self._latest_scada: dict = {}
self._latest_weather: dict = {}
# Registra callback per aggiornamenti IoT
self.ingestion.register_model_callback(self._handle_sensor_update)
def _handle_sensor_update(
self,
asset_type: str,
asset_id: str,
payload: dict
) -> None:
"""Callback IoT: aggiorna cache dati SCADA."""
measurements = payload.get("measurements", {})
if asset_type == "transformer":
self._latest_scada["transformer"] = measurements
elif asset_type == "feeder":
feeder_data = self._latest_scada.get("feeders", {})
feeder_data[asset_id] = measurements
self._latest_scada["feeders"] = feeder_data
async def run(self) -> None:
"""Loop principale del digital twin - esegue in continuo."""
self._running = True
self.ingestion.start_mqtt()
logger.info(f"Digital Twin '{self.twin.config.name}' avviato")
async with aiohttp.ClientSession() as session:
while self._running:
cycle_start = asyncio.get_event_loop().time()
try:
await self._sync_cycle(session)
except Exception as e:
logger.error(f"Errore nel ciclo di sync: {e}", exc_info=True)
# Mantieni frequenza di sync
elapsed = asyncio.get_event_loop().time() - cycle_start
sleep_time = max(0, self.sync_interval_s - elapsed)
await asyncio.sleep(sleep_time)
async def _sync_cycle(self, session: aiohttp.ClientSession) -> None:
"""Singolo ciclo di sincronizzazione."""
# 1. Aggiorna weather ogni 30 minuti
now = datetime.now(timezone.utc)
if now.minute % 30 == 0:
try:
self._latest_weather = await self.ingestion.fetch_weather_forecast(
lat=40.85, # Bari, esempio
lon=16.55,
session=session
)
except Exception as e:
logger.warning(f"Weather fetch fallito: {e}")
# 2. Assembla dati SCADA per aggiornamento modello
scada_snapshot = self._build_scada_snapshot()
# 3. Aggiorna rete elettrica nel digital twin
self.twin.update_from_scada(scada_snapshot)
# 4. Esegui power flow
pf_result = self.twin.run_power_flow()
# 5. Aggiorna modello termico trasformatore
ambient_temp = self._get_ambient_temperature()
trafo_loading = pf_result.get("transformer", {}).get("loading_percent", 0) / 100.0
thermal_state = self.thermal_model.step(
load_fraction=trafo_loading,
ambient_temp=ambient_temp,
dt_seconds=self.sync_interval_s
)
# 6. Verifica allarmi e genera alert
self._check_alerts(pf_result, thermal_state)
# 7. Pubblica stato su InfluxDB (per Grafana dashboard)
self._publish_twin_state(pf_result, thermal_state)
logger.debug(
f"Sync OK | Conv={pf_result['converged']} | "
f"HST={thermal_state['theta_winding_hst_c']}°C | "
f"Trafo={pf_result.get('transformer', {}).get('loading_percent', 0):.1f}%"
)
def _build_scada_snapshot(self) -> dict:
"""Costruisce snapshot SCADA normalizzato per il modello."""
feeders = self._latest_scada.get("feeders", {})
n_feeders = self.twin.config.n_feeders
# Estrai potenze per feeder (con fallback a ultimo valore noto)
load_per_feeder = []
for i in range(n_feeders):
feeder_key = f"feeder_{i+1:02d}"
feeder_data = feeders.get(feeder_key, {})
p_mw = feeder_data.get("active_power_mw",
self.twin.config.peak_load_mw / n_feeders * 0.7)
load_per_feeder.append(float(p_mw))
trafo_data = self._latest_scada.get("transformer", {})
return {
"load_mw_per_feeder": load_per_feeder,
"pv_mw": trafo_data.get("pv_injection_mw", 0.0),
"tap_position": int(trafo_data.get("tap_position", 0)),
"ambient_temp_c": self._get_ambient_temperature()
}
def _get_ambient_temperature(self) -> float:
"""Restituisce temperatura ambiente corrente (da weather API o default)."""
weather = self._latest_weather
if weather and "hourly_temp_c" in weather:
# Prendi la temperatura dell'ora corrente
hour_idx = datetime.now().hour
temps = weather["hourly_temp_c"]
if 0 <= hour_idx < len(temps):
return float(temps[hour_idx])
return 20.0 # default
def _check_alerts(self, pf_result: dict, thermal_state: dict) -> None:
"""Genera alert per condizioni anomale."""
alerts = []
if not pf_result.get("converged", True):
alerts.append({"severity": "CRITICAL", "msg": "Power flow non convergente"})
if thermal_state.get("thermal_limit_exceeded"):
hst = thermal_state["theta_winding_hst_c"]
alerts.append({
"severity": "WARNING",
"msg": f"HST trasformatore {hst}°C supera limite continuo 120°C"
})
trafo = pf_result.get("transformer", {})
if trafo.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Trasformatore sovraccarico: {trafo['loading_percent']:.1f}%"
})
for line in pf_result.get("lines", []):
if line.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Linea {line['name']} sovraccarica: {line['loading_percent']:.1f}%"
})
for alert in alerts:
logger.warning(f"[ALERT {alert['severity']}] {alert['msg']}")
def _publish_twin_state(self, pf_result: dict, thermal_state: dict) -> None:
"""Pubblica stato corrente su InfluxDB per dashboard Grafana."""
from influxdb_client import Point
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
points = []
# Punto power flow
trafo = pf_result.get("transformer", {})
pf_point = (
Point("digital_twin_powerflow")
.tag("substation", self.twin.config.name)
.field("converged", int(pf_result.get("converged", 0)))
.field("trafo_loading_pct", trafo.get("loading_percent", 0.0))
.field("trafo_losses_mw", trafo.get("pl_mw", 0.0))
.field("total_losses_mw", pf_result.get("losses_mw", 0.0))
.time(now)
)
points.append(pf_point)
# Punto thermal
th_point = (
Point("digital_twin_thermal")
.tag("substation", self.twin.config.name)
.field("theta_oil_c", thermal_state["theta_oil_c"])
.field("theta_hst_c", thermal_state["theta_winding_hst_c"])
.field("aaf", thermal_state["aaf"])
.field("ambient_c", thermal_state["theta_ambient_c"])
.time(now)
)
points.append(th_point)
self.ingestion._write_api.write(
bucket=self.ingestion._bucket,
org=self.ingestion._org,
record=points
)
def stop(self) -> None:
self._running = False
self.ingestion.stop()
Prediktivní údržba s ML: RUL na transformátorech
Prediktivní údržba je jedním z nejvyspělejších případů použití digitálních dvojčat energie. Transformátor VN/VN stojí mezi 500 000 a 5 000 000 eur: a neočekávané selhání v důsledku výpadku proudu, mimořádných nákladů a regulačních sankcí. S dobře trénovaným modelem ML můžete předvídat selhání do 3-6 týdnů předem, snížení o 35 % neočekávaných poruch podle údajů z roku 2025.
Feature Engineering pro transformátory
Nejvíce prediktivní vlastnosti pro zdraví transformátoru kombinují opatření elektrická, tepelná a chemická analýza oleje (DGA - Analýza rozpuštěných plynů):
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, mean_absolute_error
import xgboost as xgb
import joblib
from pathlib import Path
class TransformerHealthPredictor:
"""
Modello ML per predizione stato di salute e RUL trasformatore.
Usa Random Forest per classificazione anomalia + XGBoost per regressione RUL.
"""
# Feature set secondo IEC 60599 (interpretazione DGA) + misure elettriche/termiche
FEATURE_COLUMNS = [
# DGA - Dissolved Gas Analysis (ppm)
"h2_ppm", # idrogeno - indice scariche parziali
"ch4_ppm", # metano - surriscaldamento olio
"c2h2_ppm", # acetilene - archi elettrici (critico)
"c2h4_ppm", # etilene - surriscaldamento grave
"c2h6_ppm", # etano - surriscaldamento lieve
"co_ppm", # monossido di carbonio - degradazione cellulosa
"co2_ppm", # biossido di carbonio - degradazione cellulosa
# Misure elettriche
"load_factor", # fattore di carico medio (0-1)
"load_factor_peak", # picco fattore di carico
"tap_operations_30d", # operazioni tap changer negli ultimi 30 giorni
"voltage_thd_pct", # distorsione armonica tensione %
# Misure termiche
"avg_hst_30d", # temperatura media hot spot 30 giorni
"max_hst_30d", # picco temperatura hot spot 30 giorni
"ambient_temp_avg", # temperatura ambiente media
# Misure olio
"moisture_ppm", # umidita olio (degrado isolante)
"acidity_mg_koh_g", # acidita olio
"breakdown_voltage_kv", # rigidita dielettrica olio
# Feature ingegnerizzate
"total_dissolved_gas_ppm", # somma gas combustibili
"methane_ratio", # CH4/(CH4+C2H4) ratio Duval
"age_years", # eta trasformatore
"cumulative_aaf", # AAF cumulato (consumo vita isolante)
]
def __init__(self, model_dir: str = "models/transformer_health"):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(parents=True, exist_ok=True)
self.scaler = StandardScaler()
# Classificatore anomalia: normal / warning / critical
self.anomaly_classifier = RandomForestClassifier(
n_estimators=200,
max_depth=12,
min_samples_leaf=5,
class_weight="balanced", # gestisce sbilanciamento classi
n_jobs=-1,
random_state=42
)
# Regressore RUL: ore residue di vita isolante
self.rul_regressor = xgb.XGBRegressor(
n_estimators=300,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective="reg:squarederror",
eval_metric="mae",
random_state=42
)
self._is_fitted = False
def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calcola feature derivate da misure grezze."""
result = df.copy()
# Gas combustibili totali (TDCG secondo IEEE C57.104)
gas_cols = ["h2_ppm", "ch4_ppm", "c2h2_ppm", "c2h4_ppm", "c2h6_ppm"]
available_gas = [c for c in gas_cols if c in result.columns]
result["total_dissolved_gas_ppm"] = result[available_gas].sum(axis=1)
# Ratio di Duval per diagnosi tipo di guasto
if "ch4_ppm" in result.columns and "c2h4_ppm" in result.columns:
denom = result["ch4_ppm"] + result["c2h4_ppm"]
result["methane_ratio"] = np.where(
denom > 0,
result["ch4_ppm"] / denom,
0.5
)
# Carico cumulato normalizzato (proxy degradazione)
if "avg_hst_30d" in result.columns and "age_years" in result.columns:
theta_ref = 98.0
result["cumulative_aaf"] = np.exp(
15000.0 / (theta_ref + 273.15) -
15000.0 / (result["avg_hst_30d"] + 273.15)
) * result["age_years"]
# Colonne mancanti riempite con mediana
for col in self.FEATURE_COLUMNS:
if col not in result.columns:
result[col] = result.get(col, 0.0)
return result[self.FEATURE_COLUMNS].fillna(0.0)
def fit(
self,
df_train: pd.DataFrame,
y_anomaly: pd.Series,
y_rul_hours: pd.Series
) -> dict:
"""
Addestramento entrambi i modelli.
Args:
df_train: DataFrame con misure storiche
y_anomaly: etichette {0=normal, 1=warning, 2=critical}
y_rul_hours: ore di vita residua reali (da storico guasti)
Returns:
Metriche di training
"""
X = self.engineer_features(df_train)
X_scaled = self.scaler.fit_transform(X)
# Split train/validation
X_tr, X_val, y_a_tr, y_a_val, y_r_tr, y_r_val = train_test_split(
X_scaled, y_anomaly, y_rul_hours,
test_size=0.2, random_state=42, stratify=y_anomaly
)
# Training classificatore
self.anomaly_classifier.fit(X_tr, y_a_tr)
y_pred_class = self.anomaly_classifier.predict(X_val)
cv_scores = cross_val_score(
self.anomaly_classifier, X_scaled, y_anomaly,
cv=5, scoring="f1_weighted"
)
# Training regressore RUL
self.rul_regressor.fit(
X_tr, y_r_tr,
eval_set=[(X_val, y_r_val)],
verbose=False
)
y_pred_rul = self.rul_regressor.predict(X_val)
mae_rul = mean_absolute_error(y_r_val, y_pred_rul)
self._is_fitted = True
# Salva modelli
joblib.dump(self.scaler, self.model_dir / "scaler.pkl")
joblib.dump(self.anomaly_classifier, self.model_dir / "anomaly_rf.pkl")
self.rul_regressor.save_model(str(self.model_dir / "rul_xgb.json"))
return {
"anomaly_f1_cv": float(cv_scores.mean()),
"anomaly_f1_std": float(cv_scores.std()),
"rul_mae_hours": float(mae_rul),
"n_train_samples": len(X_tr),
"feature_importances": dict(zip(
self.FEATURE_COLUMNS,
self.anomaly_classifier.feature_importances_.tolist()
))
}
def predict(self, measurements: dict) -> dict:
"""
Predizione stato salute per un trasformatore.
Args:
measurements: dizionario con misure correnti
Returns:
Stato salute, probabilità e RUL stimato
"""
if not self._is_fitted:
raise RuntimeError("Modello non addestrato. Chiama fit() prima.")
df = pd.DataFrame([measurements])
X = self.engineer_features(df)
X_scaled = self.scaler.transform(X)
anomaly_label = int(self.anomaly_classifier.predict(X_scaled)[0])
anomaly_proba = self.anomaly_classifier.predict_proba(X_scaled)[0].tolist()
rul_hours = max(0.0, float(self.rul_regressor.predict(X_scaled)[0]))
status_map = {0: "NORMAL", 1: "WARNING", 2: "CRITICAL"}
return {
"health_status": status_map[anomaly_label],
"probability_normal": round(anomaly_proba[0], 3),
"probability_warning": round(anomaly_proba[1], 3),
"probability_critical": round(anomaly_proba[2], 3),
"rul_hours": round(rul_hours, 0),
"rul_days": round(rul_hours / 24, 1),
"maintenance_urgency": "IMMEDIATE" if anomaly_label == 2 else
"PLANNED" if anomaly_label == 1 else "ROUTINE",
"top_risk_features": self._get_top_risk_features(X)
}
def _get_top_risk_features(self, X: pd.DataFrame) -> list:
"""Restituisce top-3 feature che contribuiscono al rischio."""
importances = self.anomaly_classifier.feature_importances_
feature_values = X.iloc[0].values
risk_scores = importances * np.abs(feature_values)
top_indices = np.argsort(risk_scores)[::-1][:3]
return [
{
"feature": self.FEATURE_COLUMNS[i],
"importance": round(float(importances[i]), 4),
"value": round(float(feature_values[i]), 3)
}
for i in top_indices
]
Optimalizace sítě s PyPSA: scénáře co kdyby
PyPSA (Python pro analýzu energetického systému) a referenční nástroj pro optimalizaci energetických sítí v národním měřítku. Umožňuje simulovat komplexní scénáře typu „co kdyby“: přidání obnovitelné výroby, rozšíření sítě, integrace skladů, ekonomická optimalizace expedice.
import pypsa
import pandas as pd
import numpy as np
class GridExpansionTwin:
"""
Digital twin per pianificazione espansione rete con PyPSA.
Usa ottimizzazione lineare per trovare mix ottimale di generazione + storage.
"""
def __init__(self, network_name: str = "Rete_Puglia_Sud"):
self.network = pypsa.Network()
self.network.name = network_name
# Risoluzione temporale: 8760 ore (un anno)
self.network.set_snapshots(
pd.date_range("2025-01-01", periods=8760, freq="h")
)
def build_regional_network(
self,
buses_df: pd.DataFrame,
lines_df: pd.DataFrame,
generators_df: pd.DataFrame,
loads_df: pd.DataFrame
) -> None:
"""
Costruisce modello di rete da dati CIM/CSV.
Args:
buses_df: nodi con voltaggio nominale
lines_df: linee con impedenza e capacità
generators_df: generatori con curve di costo
loads_df: profili di carico orari annui
"""
# Aggiungi bus
for _, bus in buses_df.iterrows():
self.network.add(
"Bus",
name=bus["name"],
v_nom=bus["v_nom_kv"],
x=bus.get("longitude", 0.0),
y=bus.get("latitude", 0.0)
)
# Aggiungi linee
for _, line in lines_df.iterrows():
self.network.add(
"Line",
name=line["name"],
bus0=line["bus0"],
bus1=line["bus1"],
r=line["r_ohm"],
x=line["x_ohm"],
s_nom=line["s_nom_mva"],
s_nom_extendable=bool(line.get("extendable", False)),
s_nom_max=line.get("s_nom_max_mva", float("inf"))
)
# Aggiungi generatori con costi marginali
for _, gen in generators_df.iterrows():
self.network.add(
"Generator",
name=gen["name"],
bus=gen["bus"],
p_nom=gen["p_nom_mw"],
p_nom_extendable=bool(gen.get("extendable", False)),
p_nom_max=gen.get("p_nom_max_mw", float("inf")),
marginal_cost=gen.get("marginal_cost_eur_mwh", 0.0),
capital_cost=gen.get("capital_cost_eur_mw", 0.0),
carrier=gen.get("carrier", "gas"),
efficiency=gen.get("efficiency", 0.45),
committable=False
)
# Aggiungi carichi con profili orari
for _, load in loads_df.iterrows():
p_set = load.get("hourly_profile") # serie temporale 8760 valori
self.network.add(
"Load",
name=load["name"],
bus=load["bus"],
p_set=p_set if p_set is not None else load["p_mw"]
)
def add_storage(
self,
bus: str,
p_nom_mw: float,
energy_capacity_mwh: float,
capital_cost_eur_mw: float = 800_000
) -> None:
"""Aggiunge sistema di accumulo (BESS) alla rete."""
self.network.add(
"StorageUnit",
name=f"BESS_{bus}",
bus=bus,
p_nom=p_nom_mw,
p_nom_extendable=True,
p_nom_max=p_nom_mw * 3,
max_hours=energy_capacity_mwh / p_nom_mw,
capital_cost=capital_cost_eur_mw,
marginal_cost=0.0,
efficiency_store=0.92,
efficiency_dispatch=0.92,
cyclic_state_of_charge=True
)
def run_capacity_expansion(self) -> dict:
"""
Ottimizzazione espansione rete: trova investimenti ottimali
minimizzando costo totale (CAPEX + OPEX) su orizzonte annuo.
"""
self.network.optimize(
solver_name="highs", # HiGHS solver open source
solver_options={
"time_limit": 300, # max 5 minuti
"mip_gap": 0.01 # gap ottimalita 1%
}
)
# Estrai risultati
total_cost = float(self.network.objective)
# Generazione ottimale per vettore energetico
gen_by_carrier = (
self.network.generators_t.p
.multiply(self.network.snapshot_weightings.generators, axis=0)
.groupby(self.network.generators.carrier, axis=1)
.sum()
.sum()
/ 1e6 # MWh -> TWh
)
# Utilizzo linee (congestion analysis)
line_loading = (
self.network.lines_t.p0.abs() /
self.network.lines.s_nom
).max()
# capacità ottimale per nuovi impianti
new_capacity = self.network.generators[
self.network.generators.p_nom_extendable
]["p_nom_opt"]
return {
"total_system_cost_meur": round(total_cost / 1e6, 2),
"generation_twh": gen_by_carrier.to_dict(),
"congested_lines": line_loading[line_loading > 0.9].to_dict(),
"optimal_new_capacity_mw": new_capacity.to_dict(),
"co2_emissions_kt": self._calculate_emissions()
}
def simulate_n1_contingencies(self) -> pd.DataFrame:
"""
Simula tutte le contingenze N-1 e restituisce report violazioni.
Utile per security assessment della rete pianificata.
"""
results = []
lines = self.network.lines.index.tolist()
for line_name in lines:
# Clona rete e rimuovi linea
n_contingency = self.network.copy()
n_contingency.remove("Line", line_name)
try:
n_contingency.lpf() # Linearized Power Flow (più veloce)
max_loading = (
n_contingency.lines_t.p0.abs() /
n_contingency.lines.s_nom
).max().max()
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float(max_loading),
"secure": bool(max_loading <= 1.0),
"severity": "OK" if max_loading <= 1.0 else
"WARNING" if max_loading <= 1.2 else "CRITICAL"
})
except Exception as e:
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float("inf"),
"secure": False,
"severity": "CRITICAL",
"error": str(e)
})
return pd.DataFrame(results).sort_values("max_line_loading", ascending=False)
def _calculate_emissions(self) -> float:
"""Calcola emissioni CO2 totali in kt."""
emission_factors = {
"gas": 0.202, # tCO2/MWh gas CCGT
"coal": 0.820, # tCO2/MWh carbone
"nuclear": 0.012, # tCO2/MWh nucleare (lifecycle)
"wind": 0.007, # tCO2/MWh eolico
"solar": 0.040, # tCO2/MWh fotovoltaico
"hydro": 0.004 # tCO2/MWh idroelettrico
}
total_kt = 0.0
for carrier, factor in emission_factors.items():
gen_col = [
c for c in self.network.generators_t.p.columns
if self.network.generators.at[c, "carrier"] == carrier
]
if gen_col:
gen_mwh = self.network.generators_t.p[gen_col].sum().sum()
total_kt += gen_mwh * factor / 1000.0
return round(total_kt, 2)
Společný informační model (CIM): IEC 61970/61968
Il CIM (Common Information Model) a mezinárodní standard pro sémantická reprezentace aktiv elektrické sítě. Vyvinutý IEC e spravuje ENTSO-E prostřednictvím CGMES (Common Grid Model Exchange specifikace), CIM definuje sdílenou ontologii, která umožňuje systémy od různých dodavatelů pro výměnu síťových modelů bez ztráty informací.
| Norma | Rozsah | Klíčový balíček | Používám Digital Twin |
|---|---|---|---|
| IEC 61970-301 | Model hlavní sítě | TopologyPackage, WiresPackage | Topologie sítě, impedance, transformátory |
| IEC 61970-456 | Stavové proměnné | StateVariablesPackage | Stav v reálném čase: napětí, průtoky, žádané hodnoty |
| IEC 61968-4 | Správa aktiv | AssetPackage, WorkPackage | Údaje o majetku, pracovní příkazy, údržba |
| CGMES 3.0 | Formát výměny | EQ, TP, SV, SSH, DY | Modelová výměna mezi panevropskými TSO/DSO |
from dataclasses import dataclass, field
import uuid
from enum import Enum
from typing import Optional
class WindingConnection(Enum):
"""IEC CIM - Tipo connessione avvolgimento trasformatore."""
D = "D" # Delta
Y = "Y" # Wye (stella)
Z = "Z" # Zigzag
Yn = "Yn" # Wye con neutro
@dataclass
class CIMIdentifiedObject:
"""Classe base CIM per oggetti identificati."""
mrid: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
alias_name: str = ""
@dataclass
class CIMSubstation(CIMIdentifiedObject):
"""CIM Substation - rappresenta una sottostazione elettrica."""
region_name: str = ""
voltage_levels: list = field(default_factory=list)
equipment: list = field(default_factory=list)
@dataclass
class CIMPowerTransformer(CIMIdentifiedObject):
"""CIM PowerTransformer - trasformatore di potenza."""
vector_group: str = "Dyn11"
is_phase_shifting: bool = False
winding_ends: list = field(default_factory=list)
@dataclass
class CIMPowerTransformerEnd:
"""CIM PowerTransformerEnd - avvolgimento trasformatore."""
end_number: int = 1
rated_s_mva: float = 0.0
rated_u_kv: float = 0.0
r_ohm: float = 0.0
x_ohm: float = 0.0
connection_kind: WindingConnection = WindingConnection.Y
grounded: bool = False
class CIMModelBuilder:
"""
Costruisce modello CIM da dati strutturati per export CGMES.
Permette interoperabilità con EMS/SCADA di vendor diversi.
"""
def __init__(self):
self._substations: dict[str, CIMSubstation] = {}
self._transformers: dict[str, CIMPowerTransformer] = {}
def add_substation(self, name: str, region: str = "") -> CIMSubstation:
"""Crea e registra una sottostazione nel modello CIM."""
sub = CIMSubstation(name=name, region_name=region)
self._substations[sub.mrid] = sub
return sub
def add_transformer(
self,
name: str,
substation: CIMSubstation,
hv_kv: float,
lv_kv: float,
rated_mva: float,
uk_percent: float = 8.5,
vector_group: str = "Dyn11"
) -> CIMPowerTransformer:
"""Crea trasformatore con entrambi gli avvolgimenti."""
trafo = CIMPowerTransformer(name=name, vector_group=vector_group)
# Calcolo impedenze in ohm (riferite al lato HV)
z_base = (hv_kv ** 2) / rated_mva # ohm base lato HV
z_total = (uk_percent / 100.0) * z_base
r_hv = z_total * 0.1 # approssimazione: R/Z ~ 0.1 per trafo AT/MT
x_hv = z_total * 0.995
# Avvolgimento primario (HV)
end_hv = CIMPowerTransformerEnd(
end_number=1,
rated_s_mva=rated_mva,
rated_u_kv=hv_kv,
r_ohm=r_hv,
x_ohm=x_hv,
connection_kind=WindingConnection.D,
grounded=False
)
# Avvolgimento secondario (LV)
end_lv = CIMPowerTransformerEnd(
end_number=2,
rated_s_mva=rated_mva,
rated_u_kv=lv_kv,
r_ohm=0.0,
x_ohm=0.0,
connection_kind=WindingConnection.Yn,
grounded=True
)
trafo.winding_ends = [end_hv, end_lv]
self._transformers[trafo.mrid] = trafo
substation.equipment.append(trafo)
return trafo
def to_cgmes_dict(self) -> dict:
"""
Esporta modello in formato CGMES-compatible (JSON-LD semplificato).
In produzione si usa uno serializer RDF/XML conforme CGMES 3.0.
"""
return {
"@context": "https://iec.ch/TC57/CIM100",
"Substation": [
{
"@id": f"_{sub.mrid}",
"@type": "cim:Substation",
"cim:IdentifiedObject.name": sub.name,
"cim:Substation.Region": sub.region_name
}
for sub in self._substations.values()
],
"PowerTransformer": [
{
"@id": f"_{trafo.mrid}",
"@type": "cim:PowerTransformer",
"cim:IdentifiedObject.name": trafo.name,
"cim:PowerTransformer.vectorGroup": trafo.vector_group,
"cim:PowerTransformer.PowerTransformerEnd": [
{
"cim:PowerTransformerEnd.endNumber": end.end_number,
"cim:PowerTransformerEnd.ratedS":
{"value": end.rated_s_mva, "unit": "MVA"},
"cim:PowerTransformerEnd.ratedU":
{"value": end.rated_u_kv, "unit": "kV"}
}
for end in trafo.winding_ends
]
}
for trafo in self._transformers.values()
]
}
Cloudové platformy: Azure Digital Twins vs AWS IoT TwinMaker
Výběr správné cloudové platformy je kritickým krokem při škálování digitálních technologií dvojče od pilota po výrobu. Dvě přední platformy mají různé přístupy:
| Kritérium | Azure Digital Twins | AWS IoT TwinMaker |
|---|---|---|
| Datový model | DTDL (Digital Twin Definition Language), přizpůsobitelné ontologie | Model komponenty scény, založený na pracovním prostoru |
| SCADA/OT integrace | Vynikající prostřednictvím Azure IoT Hub + Industrial IoT OPC-UA | Hodně štěstí AWS IoT Core + Greengrass edge |
| 3D vizualizace | HoloLens 2, Power BI, partnerský ekosystém | TwinMaker Scene Composer (Babylon.js), nativní plugin Grafana |
| Analytics | Azure Data Explorer, Synapse, Power BI | Amazon Timestream, Grafana, Athena |
| Integrace ML | Azure ML, kognitivní služby | Amazon SageMaker |
| Cena (základní) | 0,10 $/1000 dotazů + 0,50 $/GB úložiště/měsíc | 0,05 $/1000 přečtení nemovitosti + 0,50 $/GB/měsíc |
| standardy CIM/CGMES | Azure Data Manager for Energy (OSDU) | Částečná podpora prostřednictvím vlastních konektorů |
| Energetický sektor | Nejlepší: Azure Energy Data Manager, OSDU | Silné pro generické OT, méně energeticky specifické |
| Ideální případ použití | Utility, TSO/DSO, velké SCADA systémy | Větrné elektrárny, fotovoltaika, chytré budovy |
Doporučení pro energetický sektor
Pro energetické společnosti a provozovatele sítí (TSO/DSO) se systémy SCADA stávající požadavky a požadavky na shodu s NERC/ENTSO-E, Azure Digital Twins s Azure Data Manager for Energy (založené na OSDU) nabízí nejlepší integraci s průmyslovými standardy (CIM/CGMES). Pro provozovatele obnovitelných zdrojů (vítr, slunce) s potřebami 3D vizualizace a integrací Grafany, AWS IoT TwinMaker má plynulejší vývojářskou zkušenost.
Edge Digital Twin: Lehké modely na Gateway
Ne všechny výpočty mohou čekat na zpáteční mrak. Pro scénáře, kde latence a kritické (ochrany, odezva na poptávku, nabíjení EV v reálném čase), ano implementuje a digitální dvojče edge na průmyslové bráně (Raspberry Pi 4, Siemens IPC, Beckhoff CX) běžící v oříznuté verzi modelu s latencí pod 100 ms.
import numpy as np
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class EdgeTwinConfig:
"""Configurazione edge digital twin - ottimizzato per risorse limitate."""
n_buses: int = 6 # max bus simulabili in tempo reale
max_iterations: int = 10 # Newton-Raphson: meno iterazioni
tolerance_pu: float = 1e-4 # convergenza più lassa
update_interval_ms: float = 100.0 # frequenza aggiornamento 10 Hz
class LightweightPowerFlowSolver:
"""
Solver power flow semplificato per edge computing.
Usa metodo DC (lineare) per velocità massima.
L'approssimazione DC e accurata per reti con alto X/R ratio.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self._B_matrix: Optional[np.ndarray] = None # matrice ammettenza
self._initialized = False
def build_susceptance_matrix(
self,
from_buses: list[int],
to_buses: list[int],
x_ohm: list[float],
base_mva: float = 100.0
) -> None:
"""
Costruisce matrice B (suscettanza) per DC power flow.
Complessità O(n_lines), molto più veloce del metodo NR.
"""
n = self.config.n_buses
B = np.zeros((n, n))
for fb, tb, x in zip(from_buses, to_buses, x_ohm):
if x > 1e-8:
b = base_mva / x # suscettanza in pu
B[fb][fb] += b
B[tb][tb] += b
B[fb][tb] -= b
B[tb][fb] -= b
# Rimuovi riga/colonna bus slack (bus 0)
self._B_matrix = B[1:, 1:]
self._initialized = True
def solve_dc(self, p_injections_mw: list[float], base_mva: float = 100.0) -> dict:
"""
Risolve DC power flow: O(n^2) vs O(n^3) di NR.
Adatto per hardware edge con <512MB RAM.
Args:
p_injections_mw: lista iniezioni di potenza per bus (pos=generazione, neg=carico)
base_mva: potenza base per normalizzazione
Returns:
Angoli di fase e flussi di potenza stimati
"""
if not self._initialized:
raise RuntimeError("Matrice B non inizializzata")
t_start = time.monotonic_ns()
# Converti in per-unit (escludi bus slack)
p_pu = np.array(p_injections_mw[1:]) / base_mva
# Risolvi sistema lineare: B * theta = P
try:
theta = np.linalg.solve(self._B_matrix, p_pu)
except np.linalg.LinAlgError:
return {"converged": False, "error": "Matrice singolare - rete isola"}
theta_full = np.concatenate([[0.0], theta]) # aggiungi slack bus
elapsed_us = (time.monotonic_ns() - t_start) // 1000
return {
"converged": True,
"theta_rad": theta_full.tolist(),
"theta_deg": np.degrees(theta_full).tolist(),
"solve_time_us": int(elapsed_us),
"within_latency_budget": elapsed_us < (self.config.update_interval_ms * 1000 * 0.1)
}
class EdgeDigitalTwinRuntime:
"""
Runtime edge per digital twin leggero su gateway industriale.
Gira in un loop a 10 Hz con aggiornamento modello da sensori locali.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self.solver = LightweightPowerFlowSolver(config)
self._thermal_model = TransformerThermalModel()
self._last_sync_ts: float = 0.0
self._cloud_buffer: list[dict] = []
def process_sensor_sample(self, sensors: dict) -> dict:
"""
Elabora un campione sensoriale e aggiorna il modello edge.
Latenza target: < 10ms totale.
Args:
sensors: misure correnti (tensioni, correnti, temperature)
Returns:
Stato calcolato + alert immediati
"""
t_start = time.monotonic_ns()
# Aggiorna modello termico (molto veloce, solo algebra)
load_fraction = sensors.get("trafo_loading_pu", 0.5)
ambient_temp = sensors.get("ambient_temp_c", 20.0)
thermal_state = self._thermal_model.step(
load_fraction=load_fraction,
ambient_temp=ambient_temp,
dt_seconds=self.config.update_interval_ms / 1000.0
)
# Stima veloce stato rete (DC power flow)
p_injections = sensors.get("p_injections_mw", [0.0] * self.config.n_buses)
pf_result = self.solver.solve_dc(p_injections)
# Alert immediati (priorità altissima - <1ms)
immediate_alerts = self._evaluate_fast_alerts(thermal_state, sensors)
elapsed_ms = (time.monotonic_ns() - t_start) / 1e6
result = {
"thermal": thermal_state,
"power_flow": pf_result,
"alerts": immediate_alerts,
"edge_processing_ms": round(elapsed_ms, 3),
"within_budget": elapsed_ms < 10.0
}
# Accumula per sync cloud asincrono
self._cloud_buffer.append(result)
return result
def _evaluate_fast_alerts(
self,
thermal_state: dict,
sensors: dict
) -> list[dict]:
"""Alert critica valutata in <1ms per risposta immediata."""
alerts = []
hst = thermal_state.get("theta_winding_hst_c", 0.0)
if hst > 140.0:
alerts.append({
"code": "TRAFO_EMERGENCY_TEMP",
"severity": "CRITICAL",
"value": hst,
"action": "REDUCE_LOAD_IMMEDIATELY"
})
elif hst > 120.0:
alerts.append({
"code": "TRAFO_HIGH_TEMP",
"severity": "WARNING",
"value": hst,
"action": "MONITOR_CLOSELY"
})
# Verifica sovratensione (from raw sensor)
v_pu = sensors.get("voltage_pu", 1.0)
if v_pu > 1.10 or v_pu < 0.90:
alerts.append({
"code": "VOLTAGE_OUT_OF_RANGE",
"severity": "WARNING",
"value": v_pu,
"action": "ADJUST_TAP_CHANGER"
})
return alerts
def flush_to_cloud(self) -> list[dict]:
"""Svuota buffer locale per invio al cloud (chiamata asincrona)."""
buffer = self._cloud_buffer.copy()
self._cloud_buffer.clear()
return buffer
Případová studie: Digitální dvojče pobřežní větrná farma
Pobřežní větrná farma se 40 8 MW turbínami (celkem 320 MW) generuje a obrovské množství dat: každá turbína má přes 200 senzorů (akcelerometry, tenzometr, anemometry, snímače sklonu/vychýlení, teplotní ložiska, proudy generátor). Digitální dvojče integruje fyzickou, ML a real-time optimalizaci maximalizovat vyrobenou energii a zároveň minimalizovat náklady na údržbu.
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class WindTurbineState:
"""Stato corrente di una turbina eolica."""
turbine_id: str
timestamp: str
wind_speed_ms: float
wind_direction_deg: float
power_kw: float
rotor_rpm: float
pitch_angle_deg: float
yaw_error_deg: float
nacelle_temp_c: float
gearbox_temp_c: float
generator_temp_c: float
main_bearing_vibration_g: float
tower_acceleration_x_g: float
tower_acceleration_y_g: float
operational_status: str # PRODUCING, CURTAILED, FAULT, MAINTENANCE
class OffshoreWindFarmTwin:
"""
Digital twin parco eolico offshore.
Gestisce 40 turbine con simulazione wake effect e predictive maintenance.
"""
# Curva di potenza Vestas V236-15.0 MW (approssimazione)
POWER_CURVE_MS = [0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 25]
POWER_CURVE_KW = [0, 0, 90, 310, 680, 1250, 2100, 3200, 4600, 6200, 7800, 8000, 8000]
def __init__(self, farm_name: str, n_turbines: int = 40):
self.farm_name = farm_name
self.n_turbines = n_turbines
self.turbine_states: dict[str, WindTurbineState] = {}
self._power_interpolator = self._build_power_curve_interp()
def _build_power_curve_interp(self):
"""Costruisce interpolatore curva di potenza."""
from scipy.interpolate import interp1d
return interp1d(
self.POWER_CURVE_MS,
self.POWER_CURVE_KW,
kind="cubic",
bounds_error=False,
fill_value=(0.0, 8000.0)
)
def calculate_wake_effect(
self,
turbine_positions: list[tuple[float, float]],
free_stream_wind_ms: float,
wind_direction_deg: float
) -> list[float]:
"""
Calcola velocità del vento per ogni turbina considerando il wake effect.
Usa modello Jensen (Park model) semplificato.
Args:
turbine_positions: lista (x, y) in metri
free_stream_wind_ms: velocità vento libero
wind_direction_deg: direzione vento (0=Nord)
Returns:
Lista velocità vento effettiva per ogni turbina
"""
n = len(turbine_positions)
effective_wind = [free_stream_wind_ms] * n
k_wake = 0.04 # costante espansione scia offshore
rotor_d = 236.0 # diametro rotore Vestas V236 in metri
ct = 0.79 # coefficiente di thrust a rated wind speed
# Converti direzione vento in vettore
wind_rad = np.radians(wind_direction_deg)
wind_vec = np.array([np.sin(wind_rad), np.cos(wind_rad)])
for i, (xi, yi) in enumerate(turbine_positions):
for j, (xj, yj) in enumerate(turbine_positions):
if i == j:
continue
# Vettore da turbina j a turbina i
delta = np.array([xi - xj, yi - yj])
dist = np.linalg.norm(delta)
if dist < 1.0:
continue
# Componente nella direzione del vento
downstream = np.dot(delta, wind_vec)
if downstream <= 0:
continue # i non e downstream di j
# Distanza laterale dalla scia
lateral = abs(np.cross(delta / dist, wind_vec)) * dist
# Raggio della scia a distanza downstream
wake_radius = rotor_d / 2.0 + k_wake * downstream
# Overlap fraction (semplificato: cilindrico)
if lateral < wake_radius:
overlap = max(0.0, 1.0 - lateral / wake_radius)
# Deficit di velocità Jensen model
deficit = (1.0 - np.sqrt(1.0 - ct)) * (rotor_d / (rotor_d + 2 * k_wake * downstream)) ** 2
effective_wind[i] = min(effective_wind[i], free_stream_wind_ms * (1.0 - overlap * deficit))
return effective_wind
def estimate_farm_production(
self,
wind_speed_ms: float,
wind_direction_deg: float,
turbine_positions: list[tuple[float, float]],
availability: Optional[list[float]] = None
) -> dict:
"""
Stima produzione totale del parco con wake model.
Args:
wind_speed_ms: velocità vento a hub height
wind_direction_deg: direzione vento
turbine_positions: coordinate turbine
availability: disponibilità per turbina (0-1)
Returns:
Produzione totale, per turbina, e perdite wake
"""
if availability is None:
availability = [1.0] * self.n_turbines
# Calcola velocità effettiva per ogni turbina
effective_winds = self.calculate_wake_effect(
turbine_positions, wind_speed_ms, wind_direction_deg
)
# Calcola potenza per turbina dalla curva
powers_kw = []
for i, (v_eff, avail) in enumerate(zip(effective_winds, availability)):
p = float(self._power_interpolator(v_eff)) * avail
powers_kw.append(p)
total_power_mw = sum(powers_kw) / 1000.0
ideal_power_mw = float(self._power_interpolator(wind_speed_ms)) * self.n_turbines / 1000.0
wake_loss_pct = max(0.0, (ideal_power_mw - total_power_mw) / ideal_power_mw * 100) if ideal_power_mw > 0 else 0.0
return {
"total_power_mw": round(total_power_mw, 2),
"ideal_power_mw": round(ideal_power_mw, 2),
"wake_loss_pct": round(wake_loss_pct, 2),
"capacity_factor": round(total_power_mw / (self.n_turbines * 8.0) * 100, 1),
"per_turbine_kw": [round(p, 1) for p in powers_kw],
"effective_wind_ms": [round(v, 2) for v in effective_winds]
}
def detect_yaw_misalignment(
self,
turbine_id: str,
lidar_wind_dir: float,
nacelle_dir: float,
threshold_deg: float = 5.0
) -> dict:
"""
Rileva disallineamento yaw da dati LIDAR + encoder nacelle.
Un yaw error di 10° riduce la produzione del ~3% (cos^2 law).
"""
yaw_error = abs(lidar_wind_dir - nacelle_dir)
# Normalizza nell'intervallo -180/+180
if yaw_error > 180:
yaw_error = 360 - yaw_error
# Stima perdita di produzione (legge del coseno)
production_loss_pct = (1 - np.cos(np.radians(yaw_error)) ** 3) * 100
return {
"turbine_id": turbine_id,
"yaw_error_deg": round(yaw_error, 2),
"production_loss_pct": round(production_loss_pct, 2),
"action_required": yaw_error > threshold_deg,
"priority": "HIGH" if yaw_error > 15.0 else
"MEDIUM" if yaw_error > threshold_deg else "NONE"
}
Výkon, škálovatelnost a zabezpečení
Požadavky na výkon
| Komponent | Cílová latence | Frekvence aktualizace | Skladování/den |
|---|---|---|---|
| Edge twin (stejnosměrný tok energie) | < 10 ms | 10 Hz | 50 MB/turbína |
| NR tok energie (cloud) | < 500 ms | 0,03 Hz (30 s) | 5 MB / rozvodna |
| ML závěr | < 100 ms | 1 Hz | 2 MB/položka |
| 3D vizualizace synchronizace | < 1 s | 0,1 Hz (10 s) | zanedbatelný |
| Export modelu CGMES | < 5 s | 1/hod | 100 MB/síť |
Zabezpečení: Ochrana dat SCADA
Varování: Digitální dvojčata jako útočný vektor
Energetické digitální dvojče a vysoce hodnotný cíl pro kybernetické útoky: obsahuje kompletní topologie sítě, zranitelnosti aktiv a vzory ochranu. Směrnice NIS2 (v Itálii platná od roku 2024) ukládá požadavky přísné pro provozovatele základních služeb v energetickém sektoru:
- Segmentace sítě: datová vrstva DT musí být v DMZ odděleném od sítě OT
- Přístup s nulovou důvěrou: každá komponenta se ověřuje, žádná implicitní důvěra
- Sanitace dat: data SCADA do cloudu DT musí procházet datovou diodou (fyzická jednosměrná)
- Auditní stopy: každý dotaz na modelu musí být přihlášen, aby bylo vyhověno
- Šifrování v klidu: Modely CIM/CGMES obsahují citlivá data topologie
# Architettura di sicurezza raccomandata per Digital Twin energetico
#
# OT Network (IEC 62443) DMZ Cloud
# ┌─────────────────────┐ ┌──────────────┐ ┌───────────────┐
# │ IED / RTU / SCADA │───►│ Data Diode │───►│ Data Broker │
# │ (Modbus, IEC 61850) │ │ (unidirez.) │ │ (Kafka/MQTT) │
# └─────────────────────┘ └──────────────┘ └───────┬───────┘
# │ TLS 1.3
# ┌─────────────────────┐ ┌──────────────┐ ┌───────▼───────┐
# │ Historian (locale) │ │ Firewall │ │ DT Engine │
# │ OSIsoft PI / AVEVA │ │ IPS/IDS │ │ (Azure DT / │
# └─────────────────────┘ └──────────────┘ │ AWS TwinMkr) │
# └───────────────┘
#
# Regole chiave:
# - Nessuna connessione INBOUND verso OT network
# - Autenticazione mTLS per tutti i servizi DT
# - RBAC: operatore vede solo i propri asset
# - Dati storici: retention 7 anni (requisito ARERA in Italia)
# - Backup modello CIM: immutabile su S3/ADLS con versioning
# Esempio: validazione input da SCADA prima di aggiornare il twin
import re
from typing import Any
SCADA_FIELD_LIMITS = {
"load_mw": (0.0, 1000.0),
"voltage_kv": (0.0, 500.0),
"tap_position": (-16, 16),
"ambient_temp_c": (-40.0, 60.0),
"current_ka": (0.0, 10.0)
}
def validate_scada_measurement(field: str, value: Any) -> float:
"""
Valida misura SCADA prima dell'ingestion nel digital twin.
Previene injection di valori anomali (tampering / errori sensore).
"""
if not isinstance(value, (int, float)):
raise ValueError(f"Campo {field}: tipo non valido {type(value)}")
value = float(value)
if not (-1e9 < value < 1e9): # sanity check overflow
raise ValueError(f"Campo {field}: overflow numerico {value}")
if field in SCADA_FIELD_LIMITS:
lo, hi = SCADA_FIELD_LIMITS[field]
if not (lo <= value <= hi):
raise ValueError(
f"Campo {field}: valore {value} fuori range fisico [{lo}, {hi}]"
)
return value
Budoucnost: Federovaná digitální dvojčata a AI-Augmented Grid
Nové trendy pro období 2026–2028 v oblasti energetických digitálních dvojčat:
Trend 2026–2028: Digitální dvojče nové generace
- Federovaná digitální dvojčata: digitální dvojčata TSO a DSO několik, které spolupracují při zachování soukromí dat pomocí technik federovaného učení k aktualizaci sdílených modelů bez sdílení citlivá data o topologii sítě.
- Fyzikální modely rozšířené o umělou inteligenci: fyzicky neuronové sítě (PINN - Physics-Informed Neural Networks), které kombinují Maxwell/Kirchhoffovy rovnice s pozorovanými daty, snížení o 60 % chyba simulace ve srovnání se samotnými analytickými modely.
- Autonomní Grid Twin: digitální dvojče s RL agenty (Reinforcement Learning), které provádějí akce autonomního řízení (přepínač odboček, odkládání nákladu, expedice skladu) optimalizace v čase skutečné s lidským dohledem.
- Digitální dvojče jako služba (DTaaS) pro malé a střední podniky: platformy SaaS, které nabízejí předkonfigurovaná digitální dvojčata pro průmyslové závody střední (1-50 MW), snížení vstupní bariéry z 500 tisíc na 50 tisíc EUR.
- 5G ultraspolehlivá nízká latence: Připojení 5G URLLC (latence <1ms, 99,999% spolehlivost) umožní edge twin v terénu bez mezilehlých bran, s přímou synchronizací senzor-cloud.
Zdroje a reference
| Zdroj | Typ | Používání |
|---|---|---|
| pandapower.readthedocs.io | Dokumentace | Tok výkonu, optimální tok výkonu, zkrat |
| pypsa.readthedocs.io | Dokumentace | Rozšíření kapacity, nasazení jednotky, více období |
| IEC 60076-7:2018 | Norma | Tepelný model výkonových transformátorů |
| IEC 60599:2015 | Norma | DGA interpretace pro diagnostiku transformátorů |
| ENTSO-E CIM / CGMES 3.0 | Norma | Datový model sítě, výměna mezi TSO/PDS |
| Dokumenty Azure Digital Twins | Cloudová platforma | DTDL, CIM ontologie, Azure IoT Hub |
| Dokumenty AWS IoT TwinMaker | Cloudová platforma | Scene Composer, plugin Grafana, Timestream |
| OpenModelica | Open source nástroje | Komplexní fyzikální simulace (jazyk Modelica) |
| Směrnice NIS2 (EU 2022/2555) | Předpisy | Kybernetická bezpečnost pro operátory základních služeb |
Závěry
Digitální dvojče pro energetickou infrastrukturu dnes představuje jeden z nejvíce transformační technologie v tomto odvětví: trh 34 miliard USD v roce 2025 a roste o 34,7 % ročně, taženo energetickým přechodem rostoucí složitost sítí s DER a regulační tlak na spolehlivost a bezpečnost.
Viděli jsme, jak implementovat digitální dvojče na podnikové úrovni pomocí: síla panda pro tok energie Newton-Raphson v reálném čase, tepelný model IEC 60076-7 pro transformátory, PyPSA pro optimalizaci plánování sítě, modely ML s Random Forest a XGBoost pro prediktivní údržbu, a model CIM/CGMES pro interoperabilitu mezi systémy.
Klíčem k úspěchu je dobře oddělená architektura vrstev: fyzika, data, model a vizualizace musí mít jasná rozhraní, která umožňují vyvíjet každou složku nezávisle. Digitální stín (L1) je první přístupný rozvor; normativní dvojče (L4) a konečný cíl, kde infrastruktura se sama optimalizuje.
Další článek v seriálu
Další a poslední článek ze série EnergyTech zkoumá Blockchain pro P2P obchodování s energií: Chytré smlouvy a omezení: Solidní architektura inteligentních smluv pro peer-to-peer obchodování s energií, automatické vypořádání v řetězci a navigace podle evropských regulačních omezení.
Související série
- MLOps: přinést modely prediktivní údržby ML ve výrobě s MLflow, DVC a Kubernetes
- AI inženýrství: implementovat RAG na dokumentaci inženýrství závodu a LLM pro analýzu chybových hlášení
- Obchod s daty a umělou inteligencí: pro správu energetických dat a budování ETL/ELT potrubí pro analýzu







