Cyfrowy bliźniak dla infrastruktury energetycznej: symulacja w czasie rzeczywistym
Globalny rynek cyfrowych bliźniaków dla sektora energetycznego osiągnął 34,1 miliarda dolarów w 2025 roku i będzie rósł w tempie CAGR wynoszącym 34,7% do 2034 r., według Fortune Business Insights. To nie jest obietnica na przyszłość: narzędzia W krajach europejskich działają już cyfrowe bliźniaki podstacji WN/SN i farm wiatrowych sieci offshore i dystrybucyjne, które synchronizują tysiące punktów pomiarowych na sekundę. We Włoszech Terna i Enel uruchomiły pilotażowe programy cyfrowych bliźniaków dla sieci przesył krajowy w ramach inicjatyw PNRR na rzecz transformacji energetycznej.
Un cyfrowy bliźniak energii nie jest to po prostu model symulacyjny statyczny. Jest to system dwukierunkowy, który odzwierciedla stan fizyczny rośliny w czasie real, pozwala na przeprowadzanie symulacji predykcyjnych, ocenę scenariuszy „co by było, gdyby” i wydawanie poleceń aktywa fizyczne poprzez pętlę sprzężenia zwrotnego. Różnica w stosunku do prostego SCADA jest głęboka: gdzie SCADA monitoruje i steruje cyfrowym bliźniakiem rozumie, symulować e przewiduje.
W tym artykule badamy praktyczne wdrożenie cyfrowych bliźniaków w infrastrukturze energia: z fizycznego modelowania sieci elektroenergetycznych pandamoc e PyPSA, integracja z SCADA/IoT, konserwacja predykcyjna na transformatorach z ML, aż do rzeczywistych przypadków podstacji WN/SN i morskich farm wiatrowych.
Czego się nauczysz
- Taksonomia cyfrowych bliźniaków: opisowa, informacyjna, predykcyjna, normatywna
- Architektura warstwowa: Jednostka fizyczna, Warstwa danych, Warstwa modelu, Wizualizacja
- Modele fizyczne: rozpływ mocy Newtona-Raphsona, modele termiczne transformatorów, krzywe degradacji
- Pozyskiwanie danych z SCADA, czujników IoT, API pogodowego i inteligentnych liczników
- Symulacja sieci za pomocą pandapower i PyPSA w Pythonie
- Synchronizacja w czasie rzeczywistym: cyfrowy cień vs dwukierunkowy cyfrowy bliźniak
- Konserwacja predykcyjna: Random Forest i XGBoost dla RUL na transformatorach
- Scenariusze typu „co by było, gdyby”: sytuacje awaryjne N-1 i planowanie rozbudowy sieci
- CIM IEC 61970/61968 i CGMES dla interoperacyjności modeli
- Azure Digital Twins vs AWS IoT TwinMaker: wybór platformy chmurowej
- Studium przypadku: podstacja WN/SN i morska farma wiatrowa
- Cyfrowy bliźniak Edge zapewniający minimalne opóźnienia w bramach przemysłowych
Seria EnergyTech: 10 artykułów na temat energii cyfrowej
To dziewiąty artykuł z tej serii EnergiaTech, dedykowane protokołom, do architektur oprogramowania i algorytmów, które zmieniają zarządzanie energią energii elektrycznej w dobie transformacji energetycznej.
| # | Przedmiot | Technologie | Poziom |
|---|---|---|---|
| 1 | Protokół OCPP 2.x: Budowa systemów ładowania pojazdów elektrycznych | OCPP, WebSocket, Python, ISO 15118 | Zaawansowany |
| 2 | Architektura DERMS: agregacja milionów rozproszonych zasobów | DERMS, REST, MQTT, PostgreSQL | Zaawansowany |
| 3 | Prognoza energii odnawialnej z ML: LSTM dla energii słonecznej i wiatrowej | Python, LSTM, Prorok, FastAPI | Zaawansowany |
| 4 | System zarządzania baterią do przechowywania w skali sieciowej | BMS, SoC/SoH, Python, magistrala CAN | Zaawansowany |
| 5 | IEC 61850 dla inżynierów oprogramowania: komunikacja w inteligentnych sieciach | IEC 61850, GOOSE, MMS, SCADA | Zaawansowany |
| 6 | Równoważenie obciążenia ładowania pojazdów elektrycznych: algorytmy czasu rzeczywistego | Algorytmy, Python, OCPP, WebSocket | Zaawansowany |
| 7 | Od MQTT do InfluxDB: platforma IoT energii w czasie rzeczywistym | MQTT, InfluxDB, Telegraf, Grafana | Mediator |
| 8 | Architektura oprogramowania do rozliczania emisji dwutlenku węgla: platformy ESG | Protokół GHG, Python, API, raportowanie | Mediator |
| 9 | Cyfrowy bliźniak dla infrastruktury energetycznej (tutaj jesteś) | pandapower, PyPSA, ML, Azure DT, Python | Zaawansowany |
| 10 | Blockchain dla handlu energią P2P: inteligentne kontrakty i ograniczenia | Solidność, Ethereum, inteligentne kontrakty | Zaawansowany |
Taksonomia cyfrowych bliźniaków energii
Nie wszystkie „cyfrowe bliźniaki” są sobie równe. Przyjęto model Grieten-Kritzinger przez IEC i wielu dostawców przemysłowych definiuje cztery poziomy dojrzałości w zależności od kierunku przepływu danych i mocy obliczeniowej:
| Poziom | Typ | Przepływ danych | pojemność | Przykład energii |
|---|---|---|---|---|
| L1 | Cyfrowy cień (opisowy) | Fizyczny → Cyfrowy | Monitorowanie aktualnego stanu | Pulpit SCADA w czasie rzeczywistym |
| L2 | Informacyjny cyfrowy bliźniak | Fizyczne ↔ Cyfrowe (historyczne) | Analiza historyczna, KPI, trendy | Transformator do monitorowania stanu zasobów |
| L3 | Przewidywalny cyfrowy bliźniak | Fizyczny ↔ Cyfrowy + ML | Predykcja awarii, RUL, symulacja | Prognoza żywotności kabla WN |
| L4 | Cyfrowy bliźniak nakazowy | Fizyczny ↔ Cyfrowy (aktywny dwukierunkowo) | Zalecenie/wykonanie optymalnych działań | Automatyczne ponowne planowanie konserwacji |
Cyfrowy cień kontra cyfrowy bliźniak
Un cyfrowy cień zbiera dane z organizmu, ale nie ma na nie wpływu (przepływ jednokierunkowy). A cyfrowy bliźniak prawdziwy ma kanał sprzężenia zwrotnego: może wysyłać wartości zadane, polecenia lub parametry do zasobu fizycznego, zamknięcie pętli sterującej. W praktyce większość systemów przemysłowych obecnie działa na poziomie L1-L2, a celem jest osiągnięcie poziomu L3-L4 do 2027 r.
Architektura warstwowa cyfrowego bliźniaka energetycznego
Cyfrowy bliźniak klasy korporacyjnej dla infrastruktury energetycznej składa się z czterech odrębne warstwy, każda z dobrze określonymi obowiązkami:
Warstwa 1: Byty fizyczne
Oprzyrządowane aktywa fizyczne: transformatory WN/SN z czujnikami DGA (gazu rozpuszczonego). analizy), szynoprzewody z przekładnikami prądowymi, linie napowietrzne z czujnikami temperatury i wiatrowej, generatory z monitorowaniem drgań, akumulatory z czujnikami SoC/SoH. Warstwa fizyk generuje surowe dane ze zmienną częstotliwością: od 1 próbki/godzinę dla temperatury środowisku przy 1 próbce/ms dla środków ochronnych IEC 61850.
Warstwa 2: Warstwa danych
Warstwa pozyskiwania, normalizacji i przechowywania danych. Obejmuje:
- SCADA/EMS: dane operacyjne w czasie rzeczywistym poprzez DNP3, Modbus, IEC 61850
- Platforma IoT: czujniki bezprzewodowe poprzez MQTT/AMQP do brokera w chmurze
- DB szeregów czasowych: InfluxDB lub TimescaleDB dla telemetrii o dużej szybkości
- Jezioro danych: przechowywanie danych historycznych dla szkoleń ML (S3, ADLS)
- Interfejs API pogody: dane meteorologiczne do modeli termicznych i prognoz energii słonecznej/wiatrowej
- Inteligentne liczniki: profile obciążenia agregowane za pośrednictwem DLMS/COSEM
Warstwa 3: Warstwa modelu
Serce obliczeniowe: modele fizyczne i ML, które replikują zachowanie aktywów. Obejmuje moduł rozwiązywania przepływu mocy, modele termiczne, modele degradacji i ML na potrzeby konserwacji predykcyjnej.
Warstwa 4: Wizualizacja i integracja
Dashboardy operacyjne (Grafana, Power BI), renderowanie 3D podstacji (Three.js, Unity, Siemens NX), REST/WebSocket API dla systemów innych firm (ERP, CMMS, EMS) oraz interfejsy do kontroli nakazowej.
Modele fizyczne: przepływ mocy z Newtonem-Raphsonem
Sercem każdego cyfrowego bliźniaka sieci energetycznej jest rozwiązanie przepływu mocy. Algorytm Newtona-Raphsona rozwiązuje równania nieliniowe opisujące równowagę mocy czynnej i biernej na każdym węźle (szynie) sieci. Z pandamoc możemy zbudować kompletny model podstacji i symulować jej zachowanie w ciągu kilku milisekund.
Instalacja i konfiguracja
# Installazione dependencies
pip install pandapower pypsa numpy pandas scikit-learn xgboost
pip install influxdb-client paho-mqtt asyncio aiohttp
pip install plotly dash # per visualizzazione
# Versioni consigliate (2025)
# pandapower==2.14.x
# pypsa==0.31.x
# scikit-learn==1.5.x
# xgboost==2.1.x
Modelowanie sieci za pomocą pandapower
Budujemy kompletny model stacji WN/SN wraz z liniami dystrybucyjnymi, transformatory, obciążenia i generatory rozproszone:
import pandapower as pp
import pandapower.networks as pn
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
import json
@dataclass
class SubstationConfig:
"""Configurazione sottostazione AT/MT"""
name: str
voltage_hv_kv: float = 132.0 # Alta Tensione kV
voltage_mv_kv: float = 20.0 # Media Tensione kV
transformer_mva: float = 40.0 # Potenza trasformatore MVA
n_feeders: int = 6 # Numero feeder MT
peak_load_mw: float = 28.0 # Carico picco MW
pv_capacity_mw: float = 4.5 # Fotovoltaico connesso MW
class EnergyDigitalTwin:
"""
Digital Twin per sottostazione AT/MT con simulazione power flow real-time.
Implementa aggiornamento continuo dallo stato SCADA.
"""
def __init__(self, config: SubstationConfig):
self.config = config
self.net = pp.create_empty_network(name=config.name)
self._build_network_model()
self._state_cache: dict = {}
def _build_network_model(self) -> None:
"""Costruisce il modello di rete dalla configurazione."""
cfg = self.config
# Bus AT (slack bus = riferimento di tensione)
self.bus_ht = pp.create_bus(
self.net,
vn_kv=cfg.voltage_hv_kv,
name="Bus_AT_Main",
type="b"
)
# Bus MT primario (lato secondario trasformatore)
self.bus_mt_primary = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name="Bus_MT_Sbarra",
type="b"
)
# Trasformatore AT/MT principale
pp.create_transformer_from_parameters(
self.net,
hv_bus=self.bus_ht,
lv_bus=self.bus_mt_primary,
sn_mva=cfg.transformer_mva,
vn_hv_kv=cfg.voltage_hv_kv,
vn_lv_kv=cfg.voltage_mv_kv,
vkr_percent=0.3, # perdite rame %
vk_percent=8.5, # tensione di cortocircuito %
pfe_kw=35.0, # perdite ferro kW
i0_percent=0.08, # corrente a vuoto %
name="TR_AT_MT_Main",
tap_pos=0, # posizione tap changer
tap_neutral=0,
tap_min=-8,
tap_max=8,
tap_step_percent=1.25
)
# Rete esterna AT (equivalente di Thevenin della rete)
pp.create_ext_grid(
self.net,
bus=self.bus_ht,
vm_pu=1.0,
va_degree=0.0,
name="Rete_Trasmissione",
s_sc_max_mva=3500.0, # potenza di cortocircuito rete
rx_max=0.1
)
# Crea feeder MT con carichi e generazione
self.feeder_buses = []
self.load_ids = []
self.pv_ids = []
for i in range(cfg.n_feeders):
# Bus feeder
bus_feeder = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name=f"Bus_Feeder_{i+1:02d}"
)
self.feeder_buses.append(bus_feeder)
# Linea MT dal bus principale al feeder (cavo XLPE 150mm2)
pp.create_line_from_parameters(
self.net,
from_bus=self.bus_mt_primary,
to_bus=bus_feeder,
length_km=2.5 + i * 0.8,
r_ohm_per_km=0.206, # resistenza XLPE 150mm2
x_ohm_per_km=0.083,
c_nf_per_km=320.0,
max_i_ka=0.261,
name=f"Linea_Feeder_{i+1:02d}"
)
# Carico per feeder (distribuzione uniforme + varianza)
load_mw = cfg.peak_load_mw / cfg.n_feeders * (0.8 + 0.4 * np.random.rand())
load_id = pp.create_load(
self.net,
bus=bus_feeder,
p_mw=load_mw,
q_mvar=load_mw * 0.3, # power factor ~ 0.958
name=f"Carico_Feeder_{i+1:02d}",
controllable=False
)
self.load_ids.append(load_id)
# Generatore fotovoltaico (bus MT primario)
pv_id = pp.create_sgen(
self.net,
bus=self.bus_mt_primary,
p_mw=cfg.pv_capacity_mw,
q_mvar=0.0,
name="FV_Rooftop",
type="PV",
controllable=True
)
self.pv_ids.append(pv_id)
def update_from_scada(self, scada_data: dict) -> None:
"""
Aggiorna il modello con dati real-time da SCADA/IoT.
Args:
scada_data: dizionario con misure correnti
- load_mw_per_feeder: lista potenze attive
- pv_mw: generazione fotovoltaica attuale
- tap_position: posizione tap changer
- ambient_temp_c: temperatura ambiente
"""
# Aggiorna carichi per feeder
loads = scada_data.get("load_mw_per_feeder", [])
for i, (load_id, p_mw) in enumerate(zip(self.load_ids, loads)):
q_mvar = p_mw * 0.30 # power factor fisso (da modello)
self.net.load.at[load_id, "p_mw"] = p_mw
self.net.load.at[load_id, "q_mvar"] = q_mvar
# Aggiorna generazione PV
pv_mw = scada_data.get("pv_mw", 0.0)
if self.pv_ids:
self.net.sgen.at[self.pv_ids[0], "p_mw"] = pv_mw
# Aggiorna posizione tap changer
tap = scada_data.get("tap_position", 0)
self.net.trafo.at[0, "tap_pos"] = np.clip(tap, -8, 8)
# Cache stato per confronto
self._state_cache = scada_data.copy()
def run_power_flow(self) -> dict:
"""
Esegue simulazione Newton-Raphson e restituisce risultati.
Returns:
Dizionario con tensioni, correnti, perdite e stato componenti
"""
try:
pp.runpp(
self.net,
algorithm="nr", # Newton-Raphson
calculate_voltage_angles=True,
max_iteration=50,
tolerance_mva=1e-8,
enforce_q_lims=True
)
results = {
"converged": True,
"buses": self._extract_bus_results(),
"lines": self._extract_line_results(),
"transformer": self._extract_trafo_results(),
"losses_mw": float(self.net.res_ext_grid["p_mw"].sum() -
self.net.res_load["p_mw"].sum() +
self.net.res_sgen["p_mw"].sum()),
"timestamp": pd.Timestamp.now().isoformat()
}
return results
except pp.LoadflowNotConverged:
return {
"converged": False,
"error": "Power flow non convergente - possibile violazione limiti",
"timestamp": pd.Timestamp.now().isoformat()
}
def _extract_bus_results(self) -> list:
results = []
for idx, row in self.net.res_bus.iterrows():
bus_name = self.net.bus.at[idx, "name"]
vn_kv = self.net.bus.at[idx, "vn_kv"]
results.append({
"bus_id": int(idx),
"name": bus_name,
"vm_pu": float(row["vm_pu"]),
"vm_kv": float(row["vm_pu"] * vn_kv),
"va_degree": float(row["va_degree"]),
"within_limits": 0.95 <= float(row["vm_pu"]) <= 1.05
})
return results
def _extract_line_results(self) -> list:
results = []
for idx, row in self.net.res_line.iterrows():
results.append({
"line_id": int(idx),
"name": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"]),
"i_ka": float(row["i_ka"]),
"pl_mw": float(row["pl_mw"]),
"overloaded": float(row["loading_percent"]) > 100.0
})
return results
def _extract_trafo_results(self) -> dict:
row = self.net.res_trafo.iloc[0]
return {
"loading_percent": float(row["loading_percent"]),
"pl_mw": float(row["pl_mw"]),
"i_hv_ka": float(row["i_hv_ka"]),
"i_lv_ka": float(row["i_lv_ka"]),
"overloaded": float(row["loading_percent"]) > 100.0
}
def check_n1_contingency(self, line_to_remove: int) -> dict:
"""
Simulazione contingenza N-1: rimuove una linea e verifica la tenuta della rete.
Args:
line_to_remove: ID della linea da togliere fuori servizio
Returns:
Risultati power flow post-contingenza con violazioni
"""
# Salva stato originale
original_in_service = self.net.line.at[line_to_remove, "in_service"]
# Applica contingenza
self.net.line.at[line_to_remove, "in_service"] = False
try:
pp.runpp(self.net, algorithm="nr", max_iteration=50)
violations = []
for idx, row in self.net.res_bus.iterrows():
if not (0.90 <= float(row["vm_pu"]) <= 1.10):
violations.append({
"type": "voltage",
"bus": self.net.bus.at[idx, "name"],
"vm_pu": float(row["vm_pu"])
})
for idx, row in self.net.res_line.iterrows():
if float(row["loading_percent"]) > 120.0:
violations.append({
"type": "overload",
"line": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"])
})
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": True,
"n_violations": len(violations),
"violations": violations,
"secure": len(violations) == 0
}
except pp.LoadflowNotConverged:
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": False,
"secure": False,
"error": "Rete non sicura: power flow non convergente post-contingenza"
}
finally:
# Ripristina stato originale
self.net.line.at[line_to_remove, "in_service"] = original_in_service
return result
Model termiczny transformatora
Transformator jest najważniejszym elementem podstacji. Jego stan zdrowia zależy bezpośrednio od temperatury oleju i uzwojeń. Standard IEC 60076-7 definiuje podstawowy analityczny model termiczny większości cyfrowych bliźniaków transformatorów przemysłowych.
import numpy as np
from dataclasses import dataclass
@dataclass
class TransformerThermalModel:
"""
Modello termico trasformatore secondo IEC 60076-7.
Implementa il modello esponenziale per calcolo temperatura olio e avvolgimenti.
"""
# Parametri costruttivi (da nameplate / misure FAT)
rated_power_mva: float = 40.0
theta_amb_ref: float = 20.0 # temperatura ambiente riferimento
delta_theta_or: float = 55.0 # sopratemperatura olio a pieno carico
delta_theta_hr: float = 23.0 # gradiente avvolgimento-olio a pieno carico
tau_o: float = 3.0 * 3600 # costante di tempo termica olio [s]
tau_w: float = 10 * 60 # costante di tempo avvolgimenti [s]
n_exp: float = 0.9 # esponente di carico olio (ONAN)
m_exp: float = 0.8 # esponente di carico avvolgimenti
r_load: float = 6.0 # rapporto perdite carico/vuoto
# Stato iniziale
theta_oil: float = 20.0 # temperatura olio corrente
theta_winding: float = 20.0 # temperatura avvolgimento corrente
def step(
self,
load_fraction: float,
ambient_temp: float,
dt_seconds: float = 60.0
) -> dict:
"""
Aggiornamento modello con passo temporale dt.
Args:
load_fraction: carico normalizzato (0=vuoto, 1=pieno carico)
ambient_temp: temperatura ambiente in gradi C
dt_seconds: passo temporale in secondi
Returns:
Stato termico aggiornato con temperatura olio, avvolgimenti e HST
"""
# Sopratemperatura olio a regime per questo carico
k = load_fraction # fattore di carico
delta_theta_o_steady = self.delta_theta_or * (
(1 + self.r_load * k**2) / (1 + self.r_load)
) ** self.n_exp
# Dinamica olio (prima equazione differenziale)
d_theta_oil = (
(ambient_temp + delta_theta_o_steady - self.theta_oil) / self.tau_o
) * dt_seconds
self.theta_oil = self.theta_oil + d_theta_oil
# Gradiente avvolgimento a regime
delta_theta_h_steady = self.delta_theta_hr * (k ** (2 * self.m_exp))
# Hot Spot Temperature (HST) - temperatura punto caldo avvolgimento
# Dinamica più rapida degli avvolgimenti
d_theta_winding = (
(self.theta_oil + delta_theta_h_steady - self.theta_winding) / self.tau_w
) * dt_seconds
self.theta_winding = self.theta_winding + d_theta_winding
# Aging Acceleration Factor (AAF) per calcolo consumo vita isolante
# Riferimento: theta_ref = 98°C per isolante classe A
theta_ref = 98.0
aaf = np.exp(
15000.0 / (theta_ref + 273.0) - 15000.0 / (self.theta_winding + 273.0)
)
return {
"theta_oil_c": round(self.theta_oil, 2),
"theta_winding_hst_c": round(self.theta_winding, 2),
"theta_ambient_c": ambient_temp,
"load_fraction": load_fraction,
"aaf": round(float(aaf), 4),
"thermal_limit_exceeded": self.theta_winding > 120.0, # limite continuo
"emergency_limit_exceeded": self.theta_winding > 140.0 # limite emergenza
}
def calculate_rul_hours(self, avg_hst: float) -> float:
"""
Stima Remaining Useful Life (RUL) dell'isolante cellulosico.
Basato sul modello di Arrhenius (IEC 60076-7 Section 8).
Args:
avg_hst: temperatura media Hot Spot in gradi C
Returns:
Ore di vita residua stimate
"""
# Vita nominale isolante a 98°C = 65.000 ore (IEC 60076-7)
nominal_life_hours = 65_000.0
theta_ref = 98.0
# Aging rate normalizzato
per_unit_life_remaining = 1.0 # assume vita residua 100%
aaf_avg = np.exp(
15000.0 / (theta_ref + 273.15) - 15000.0 / (avg_hst + 273.15)
)
# Ore residue
rul_hours = nominal_life_hours * per_unit_life_remaining / aaf_avg
return round(rul_hours, 0)
Pozyskiwanie danych: od SCADA i IoT do cyfrowego bliźniaka
Jakość cyfrowego bliźniaka zależy bezpośrednio od jakości i opóźnienia danych przy wejściu. W prawdziwym systemie energetycznym musimy zarządzać źródłami heterogenicznymi: RTU/IED poprzez IEC 61850/DNP3, czujniki IoT poprzez MQTT, prognoza pogody poprzez REST API, inteligentny licznik poprzez DLMS/COSEM.
import asyncio
import aiohttp
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
from datetime import datetime, timezone
from typing import Callable
class EnergyDataIngestion:
"""
Gestione ingestione dati multi-sorgente per digital twin energetico.
Supporta MQTT (IoT sensors), REST API (weather, market), SCADA (simulato).
"""
def __init__(
self,
influx_url: str,
influx_token: str,
influx_org: str,
influx_bucket: str,
mqtt_broker: str,
mqtt_port: int = 1883
):
# InfluxDB client per time-series storage
self._influx = InfluxDBClient(
url=influx_url,
token=influx_token,
org=influx_org
)
self._write_api = self._influx.write_api(write_options=SYNCHRONOUS)
self._bucket = influx_bucket
self._org = influx_org
# MQTT client per sensori IoT
self._mqtt_client = mqtt.Client(client_id="dt-ingestion-01")
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_message = self._on_mqtt_message
self._mqtt_broker = mqtt_broker
self._mqtt_port = mqtt_port
# Callback per aggiornare il modello in tempo reale
self._model_update_callbacks: list[Callable] = []
def register_model_callback(self, cb: Callable) -> None:
"""Registra callback chiamata a ogni aggiornamento dati."""
self._model_update_callbacks.append(cb)
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
# Sottoscrizione ai topic energetici
topics = [
"substation/+/transformer/+/measurements",
"substation/+/feeder/+/measurements",
"substation/+/weather/measurements",
"substation/+/pv/measurements"
]
for topic in topics:
client.subscribe(topic, qos=1)
print(f"MQTT connesso al broker - subscribed a {len(topics)} topic")
def _on_mqtt_message(self, client, userdata, msg):
"""Elaborazione messaggio MQTT con parsing e storage."""
try:
payload = json.loads(msg.payload.decode())
topic_parts = msg.topic.split("/")
# Estrai metadati dal topic
substation_id = topic_parts[1]
asset_type = topic_parts[2]
asset_id = topic_parts[3]
# Crea punto InfluxDB
point = (
Point(asset_type)
.tag("substation", substation_id)
.tag("asset_id", asset_id)
.time(
datetime.fromisoformat(
payload.get("timestamp", datetime.now(timezone.utc).isoformat())
)
)
)
# Aggiungi misure come fields
for key, value in payload.get("measurements", {}).items():
if isinstance(value, (int, float)):
point = point.field(key, float(value))
# Scrivi su InfluxDB
self._write_api.write(
bucket=self._bucket,
org=self._org,
record=point
)
# Notifica callbacks (aggiornamento modello)
for cb in self._model_update_callbacks:
cb(asset_type, asset_id, payload)
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"Errore parsing messaggio MQTT: {e} - topic: {msg.topic}")
async def fetch_weather_forecast(
self,
lat: float,
lon: float,
session: aiohttp.ClientSession
) -> dict:
"""
Recupera previsioni meteo da Open-Meteo API (gratuita, no API key).
Usata per correzione temperatura ambiente nel modello termico trasformatore.
"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,windspeed_10m,direct_radiation",
"forecast_days": 3,
"timezone": "Europe/Rome"
}
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.json()
return {
"hourly_temp_c": data["hourly"]["temperature_2m"],
"hourly_wind_ms": data["hourly"]["windspeed_10m"],
"hourly_radiation_wm2": data["hourly"]["direct_radiation"],
"times": data["hourly"]["time"]
}
else:
raise ValueError(f"Weather API error: HTTP {resp.status}")
def start_mqtt(self) -> None:
"""Avvia connessione MQTT in thread separato."""
self._mqtt_client.connect(self._mqtt_broker, self._mqtt_port, keepalive=60)
self._mqtt_client.loop_start()
def stop(self) -> None:
"""Cleanup risorse."""
self._mqtt_client.loop_stop()
self._mqtt_client.disconnect()
self._influx.close()
Synchronizacja w czasie rzeczywistym i cyfrowa pętla bliźniacza
„Cyfrowa pętla bliźniacza” to ciągła pętla, która zapewnia synchronizację modelu cyfrowego z ciałem. Częstotliwość synchronizacji zależy od dynamiki procesu: dla pomiarów stanu SCADA zazwyczaj wystarcza 30-60 sekund, dla zabezpieczeń IEC 61850 spada do milisekund.
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger(__name__)
class DigitalTwinOrchestrator:
"""
Orchestratore principale del digital twin energetico.
Gestisce il loop di sincronizzazione e coordina i diversi modelli.
"""
def __init__(
self,
twin: EnergyDigitalTwin,
ingestion: EnergyDataIngestion,
thermal_model: TransformerThermalModel,
sync_interval_s: float = 30.0
):
self.twin = twin
self.ingestion = ingestion
self.thermal_model = thermal_model
self.sync_interval_s = sync_interval_s
self._running = False
self._latest_scada: dict = {}
self._latest_weather: dict = {}
# Registra callback per aggiornamenti IoT
self.ingestion.register_model_callback(self._handle_sensor_update)
def _handle_sensor_update(
self,
asset_type: str,
asset_id: str,
payload: dict
) -> None:
"""Callback IoT: aggiorna cache dati SCADA."""
measurements = payload.get("measurements", {})
if asset_type == "transformer":
self._latest_scada["transformer"] = measurements
elif asset_type == "feeder":
feeder_data = self._latest_scada.get("feeders", {})
feeder_data[asset_id] = measurements
self._latest_scada["feeders"] = feeder_data
async def run(self) -> None:
"""Loop principale del digital twin - esegue in continuo."""
self._running = True
self.ingestion.start_mqtt()
logger.info(f"Digital Twin '{self.twin.config.name}' avviato")
async with aiohttp.ClientSession() as session:
while self._running:
cycle_start = asyncio.get_event_loop().time()
try:
await self._sync_cycle(session)
except Exception as e:
logger.error(f"Errore nel ciclo di sync: {e}", exc_info=True)
# Mantieni frequenza di sync
elapsed = asyncio.get_event_loop().time() - cycle_start
sleep_time = max(0, self.sync_interval_s - elapsed)
await asyncio.sleep(sleep_time)
async def _sync_cycle(self, session: aiohttp.ClientSession) -> None:
"""Singolo ciclo di sincronizzazione."""
# 1. Aggiorna weather ogni 30 minuti
now = datetime.now(timezone.utc)
if now.minute % 30 == 0:
try:
self._latest_weather = await self.ingestion.fetch_weather_forecast(
lat=40.85, # Bari, esempio
lon=16.55,
session=session
)
except Exception as e:
logger.warning(f"Weather fetch fallito: {e}")
# 2. Assembla dati SCADA per aggiornamento modello
scada_snapshot = self._build_scada_snapshot()
# 3. Aggiorna rete elettrica nel digital twin
self.twin.update_from_scada(scada_snapshot)
# 4. Esegui power flow
pf_result = self.twin.run_power_flow()
# 5. Aggiorna modello termico trasformatore
ambient_temp = self._get_ambient_temperature()
trafo_loading = pf_result.get("transformer", {}).get("loading_percent", 0) / 100.0
thermal_state = self.thermal_model.step(
load_fraction=trafo_loading,
ambient_temp=ambient_temp,
dt_seconds=self.sync_interval_s
)
# 6. Verifica allarmi e genera alert
self._check_alerts(pf_result, thermal_state)
# 7. Pubblica stato su InfluxDB (per Grafana dashboard)
self._publish_twin_state(pf_result, thermal_state)
logger.debug(
f"Sync OK | Conv={pf_result['converged']} | "
f"HST={thermal_state['theta_winding_hst_c']}°C | "
f"Trafo={pf_result.get('transformer', {}).get('loading_percent', 0):.1f}%"
)
def _build_scada_snapshot(self) -> dict:
"""Costruisce snapshot SCADA normalizzato per il modello."""
feeders = self._latest_scada.get("feeders", {})
n_feeders = self.twin.config.n_feeders
# Estrai potenze per feeder (con fallback a ultimo valore noto)
load_per_feeder = []
for i in range(n_feeders):
feeder_key = f"feeder_{i+1:02d}"
feeder_data = feeders.get(feeder_key, {})
p_mw = feeder_data.get("active_power_mw",
self.twin.config.peak_load_mw / n_feeders * 0.7)
load_per_feeder.append(float(p_mw))
trafo_data = self._latest_scada.get("transformer", {})
return {
"load_mw_per_feeder": load_per_feeder,
"pv_mw": trafo_data.get("pv_injection_mw", 0.0),
"tap_position": int(trafo_data.get("tap_position", 0)),
"ambient_temp_c": self._get_ambient_temperature()
}
def _get_ambient_temperature(self) -> float:
"""Restituisce temperatura ambiente corrente (da weather API o default)."""
weather = self._latest_weather
if weather and "hourly_temp_c" in weather:
# Prendi la temperatura dell'ora corrente
hour_idx = datetime.now().hour
temps = weather["hourly_temp_c"]
if 0 <= hour_idx < len(temps):
return float(temps[hour_idx])
return 20.0 # default
def _check_alerts(self, pf_result: dict, thermal_state: dict) -> None:
"""Genera alert per condizioni anomale."""
alerts = []
if not pf_result.get("converged", True):
alerts.append({"severity": "CRITICAL", "msg": "Power flow non convergente"})
if thermal_state.get("thermal_limit_exceeded"):
hst = thermal_state["theta_winding_hst_c"]
alerts.append({
"severity": "WARNING",
"msg": f"HST trasformatore {hst}°C supera limite continuo 120°C"
})
trafo = pf_result.get("transformer", {})
if trafo.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Trasformatore sovraccarico: {trafo['loading_percent']:.1f}%"
})
for line in pf_result.get("lines", []):
if line.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Linea {line['name']} sovraccarica: {line['loading_percent']:.1f}%"
})
for alert in alerts:
logger.warning(f"[ALERT {alert['severity']}] {alert['msg']}")
def _publish_twin_state(self, pf_result: dict, thermal_state: dict) -> None:
"""Pubblica stato corrente su InfluxDB per dashboard Grafana."""
from influxdb_client import Point
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
points = []
# Punto power flow
trafo = pf_result.get("transformer", {})
pf_point = (
Point("digital_twin_powerflow")
.tag("substation", self.twin.config.name)
.field("converged", int(pf_result.get("converged", 0)))
.field("trafo_loading_pct", trafo.get("loading_percent", 0.0))
.field("trafo_losses_mw", trafo.get("pl_mw", 0.0))
.field("total_losses_mw", pf_result.get("losses_mw", 0.0))
.time(now)
)
points.append(pf_point)
# Punto thermal
th_point = (
Point("digital_twin_thermal")
.tag("substation", self.twin.config.name)
.field("theta_oil_c", thermal_state["theta_oil_c"])
.field("theta_hst_c", thermal_state["theta_winding_hst_c"])
.field("aaf", thermal_state["aaf"])
.field("ambient_c", thermal_state["theta_ambient_c"])
.time(now)
)
points.append(th_point)
self.ingestion._write_api.write(
bucket=self.ingestion._bucket,
org=self.ingestion._org,
record=points
)
def stop(self) -> None:
self._running = False
self.ingestion.stop()
Konserwacja predykcyjna z ML: RUL na transformatorach
Konserwacja predykcyjna to jeden z najbardziej dojrzałych przypadków użycia cyfrowych bliźniaków energia. Transformator WN/SN kosztuje od 500 000 do 5 000 000 euro: nieoczekiwana awaria spowodowana awarią zasilania, kosztami awaryjnymi i karami regulacyjnymi. Dzięki dobrze wyszkolonemu modelowi ML możesz przewidzieć awarie o 3-6 tygodni z góry, zmniejszając o 35% nieoczekiwanych awarii według danych za 2025 rok.
Inżynieria funkcji dla transformatorów
Najbardziej przewidywalne cechy stanu transformatora łączą środki analiza elektryczna, termiczna i chemiczna oleju (DGA – Analiza rozpuszczonego gazu):
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, mean_absolute_error
import xgboost as xgb
import joblib
from pathlib import Path
class TransformerHealthPredictor:
"""
Modello ML per predizione stato di salute e RUL trasformatore.
Usa Random Forest per classificazione anomalia + XGBoost per regressione RUL.
"""
# Feature set secondo IEC 60599 (interpretazione DGA) + misure elettriche/termiche
FEATURE_COLUMNS = [
# DGA - Dissolved Gas Analysis (ppm)
"h2_ppm", # idrogeno - indice scariche parziali
"ch4_ppm", # metano - surriscaldamento olio
"c2h2_ppm", # acetilene - archi elettrici (critico)
"c2h4_ppm", # etilene - surriscaldamento grave
"c2h6_ppm", # etano - surriscaldamento lieve
"co_ppm", # monossido di carbonio - degradazione cellulosa
"co2_ppm", # biossido di carbonio - degradazione cellulosa
# Misure elettriche
"load_factor", # fattore di carico medio (0-1)
"load_factor_peak", # picco fattore di carico
"tap_operations_30d", # operazioni tap changer negli ultimi 30 giorni
"voltage_thd_pct", # distorsione armonica tensione %
# Misure termiche
"avg_hst_30d", # temperatura media hot spot 30 giorni
"max_hst_30d", # picco temperatura hot spot 30 giorni
"ambient_temp_avg", # temperatura ambiente media
# Misure olio
"moisture_ppm", # umidita olio (degrado isolante)
"acidity_mg_koh_g", # acidita olio
"breakdown_voltage_kv", # rigidita dielettrica olio
# Feature ingegnerizzate
"total_dissolved_gas_ppm", # somma gas combustibili
"methane_ratio", # CH4/(CH4+C2H4) ratio Duval
"age_years", # eta trasformatore
"cumulative_aaf", # AAF cumulato (consumo vita isolante)
]
def __init__(self, model_dir: str = "models/transformer_health"):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(parents=True, exist_ok=True)
self.scaler = StandardScaler()
# Classificatore anomalia: normal / warning / critical
self.anomaly_classifier = RandomForestClassifier(
n_estimators=200,
max_depth=12,
min_samples_leaf=5,
class_weight="balanced", # gestisce sbilanciamento classi
n_jobs=-1,
random_state=42
)
# Regressore RUL: ore residue di vita isolante
self.rul_regressor = xgb.XGBRegressor(
n_estimators=300,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective="reg:squarederror",
eval_metric="mae",
random_state=42
)
self._is_fitted = False
def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calcola feature derivate da misure grezze."""
result = df.copy()
# Gas combustibili totali (TDCG secondo IEEE C57.104)
gas_cols = ["h2_ppm", "ch4_ppm", "c2h2_ppm", "c2h4_ppm", "c2h6_ppm"]
available_gas = [c for c in gas_cols if c in result.columns]
result["total_dissolved_gas_ppm"] = result[available_gas].sum(axis=1)
# Ratio di Duval per diagnosi tipo di guasto
if "ch4_ppm" in result.columns and "c2h4_ppm" in result.columns:
denom = result["ch4_ppm"] + result["c2h4_ppm"]
result["methane_ratio"] = np.where(
denom > 0,
result["ch4_ppm"] / denom,
0.5
)
# Carico cumulato normalizzato (proxy degradazione)
if "avg_hst_30d" in result.columns and "age_years" in result.columns:
theta_ref = 98.0
result["cumulative_aaf"] = np.exp(
15000.0 / (theta_ref + 273.15) -
15000.0 / (result["avg_hst_30d"] + 273.15)
) * result["age_years"]
# Colonne mancanti riempite con mediana
for col in self.FEATURE_COLUMNS:
if col not in result.columns:
result[col] = result.get(col, 0.0)
return result[self.FEATURE_COLUMNS].fillna(0.0)
def fit(
self,
df_train: pd.DataFrame,
y_anomaly: pd.Series,
y_rul_hours: pd.Series
) -> dict:
"""
Addestramento entrambi i modelli.
Args:
df_train: DataFrame con misure storiche
y_anomaly: etichette {0=normal, 1=warning, 2=critical}
y_rul_hours: ore di vita residua reali (da storico guasti)
Returns:
Metriche di training
"""
X = self.engineer_features(df_train)
X_scaled = self.scaler.fit_transform(X)
# Split train/validation
X_tr, X_val, y_a_tr, y_a_val, y_r_tr, y_r_val = train_test_split(
X_scaled, y_anomaly, y_rul_hours,
test_size=0.2, random_state=42, stratify=y_anomaly
)
# Training classificatore
self.anomaly_classifier.fit(X_tr, y_a_tr)
y_pred_class = self.anomaly_classifier.predict(X_val)
cv_scores = cross_val_score(
self.anomaly_classifier, X_scaled, y_anomaly,
cv=5, scoring="f1_weighted"
)
# Training regressore RUL
self.rul_regressor.fit(
X_tr, y_r_tr,
eval_set=[(X_val, y_r_val)],
verbose=False
)
y_pred_rul = self.rul_regressor.predict(X_val)
mae_rul = mean_absolute_error(y_r_val, y_pred_rul)
self._is_fitted = True
# Salva modelli
joblib.dump(self.scaler, self.model_dir / "scaler.pkl")
joblib.dump(self.anomaly_classifier, self.model_dir / "anomaly_rf.pkl")
self.rul_regressor.save_model(str(self.model_dir / "rul_xgb.json"))
return {
"anomaly_f1_cv": float(cv_scores.mean()),
"anomaly_f1_std": float(cv_scores.std()),
"rul_mae_hours": float(mae_rul),
"n_train_samples": len(X_tr),
"feature_importances": dict(zip(
self.FEATURE_COLUMNS,
self.anomaly_classifier.feature_importances_.tolist()
))
}
def predict(self, measurements: dict) -> dict:
"""
Predizione stato salute per un trasformatore.
Args:
measurements: dizionario con misure correnti
Returns:
Stato salute, probabilità e RUL stimato
"""
if not self._is_fitted:
raise RuntimeError("Modello non addestrato. Chiama fit() prima.")
df = pd.DataFrame([measurements])
X = self.engineer_features(df)
X_scaled = self.scaler.transform(X)
anomaly_label = int(self.anomaly_classifier.predict(X_scaled)[0])
anomaly_proba = self.anomaly_classifier.predict_proba(X_scaled)[0].tolist()
rul_hours = max(0.0, float(self.rul_regressor.predict(X_scaled)[0]))
status_map = {0: "NORMAL", 1: "WARNING", 2: "CRITICAL"}
return {
"health_status": status_map[anomaly_label],
"probability_normal": round(anomaly_proba[0], 3),
"probability_warning": round(anomaly_proba[1], 3),
"probability_critical": round(anomaly_proba[2], 3),
"rul_hours": round(rul_hours, 0),
"rul_days": round(rul_hours / 24, 1),
"maintenance_urgency": "IMMEDIATE" if anomaly_label == 2 else
"PLANNED" if anomaly_label == 1 else "ROUTINE",
"top_risk_features": self._get_top_risk_features(X)
}
def _get_top_risk_features(self, X: pd.DataFrame) -> list:
"""Restituisce top-3 feature che contribuiscono al rischio."""
importances = self.anomaly_classifier.feature_importances_
feature_values = X.iloc[0].values
risk_scores = importances * np.abs(feature_values)
top_indices = np.argsort(risk_scores)[::-1][:3]
return [
{
"feature": self.FEATURE_COLUMNS[i],
"importance": round(float(importances[i]), 4),
"value": round(float(feature_values[i]), 3)
}
for i in top_indices
]
Optymalizacja sieci za pomocą PyPSA: scenariusze „co by było, gdyby”.
PyPSA (Python do analizy systemu elektroenergetycznego) i narzędzie referencyjne za optymalizację sieci energetycznych w skali kraju. Umożliwia symulację złożone scenariusze „co by było, gdyby”: dodanie generacji odnawialnej, rozbudowa sieci, integracja magazynów, optymalizacja ekonomiczna wysyłki.
import pypsa
import pandas as pd
import numpy as np
class GridExpansionTwin:
"""
Digital twin per pianificazione espansione rete con PyPSA.
Usa ottimizzazione lineare per trovare mix ottimale di generazione + storage.
"""
def __init__(self, network_name: str = "Rete_Puglia_Sud"):
self.network = pypsa.Network()
self.network.name = network_name
# Risoluzione temporale: 8760 ore (un anno)
self.network.set_snapshots(
pd.date_range("2025-01-01", periods=8760, freq="h")
)
def build_regional_network(
self,
buses_df: pd.DataFrame,
lines_df: pd.DataFrame,
generators_df: pd.DataFrame,
loads_df: pd.DataFrame
) -> None:
"""
Costruisce modello di rete da dati CIM/CSV.
Args:
buses_df: nodi con voltaggio nominale
lines_df: linee con impedenza e capacità
generators_df: generatori con curve di costo
loads_df: profili di carico orari annui
"""
# Aggiungi bus
for _, bus in buses_df.iterrows():
self.network.add(
"Bus",
name=bus["name"],
v_nom=bus["v_nom_kv"],
x=bus.get("longitude", 0.0),
y=bus.get("latitude", 0.0)
)
# Aggiungi linee
for _, line in lines_df.iterrows():
self.network.add(
"Line",
name=line["name"],
bus0=line["bus0"],
bus1=line["bus1"],
r=line["r_ohm"],
x=line["x_ohm"],
s_nom=line["s_nom_mva"],
s_nom_extendable=bool(line.get("extendable", False)),
s_nom_max=line.get("s_nom_max_mva", float("inf"))
)
# Aggiungi generatori con costi marginali
for _, gen in generators_df.iterrows():
self.network.add(
"Generator",
name=gen["name"],
bus=gen["bus"],
p_nom=gen["p_nom_mw"],
p_nom_extendable=bool(gen.get("extendable", False)),
p_nom_max=gen.get("p_nom_max_mw", float("inf")),
marginal_cost=gen.get("marginal_cost_eur_mwh", 0.0),
capital_cost=gen.get("capital_cost_eur_mw", 0.0),
carrier=gen.get("carrier", "gas"),
efficiency=gen.get("efficiency", 0.45),
committable=False
)
# Aggiungi carichi con profili orari
for _, load in loads_df.iterrows():
p_set = load.get("hourly_profile") # serie temporale 8760 valori
self.network.add(
"Load",
name=load["name"],
bus=load["bus"],
p_set=p_set if p_set is not None else load["p_mw"]
)
def add_storage(
self,
bus: str,
p_nom_mw: float,
energy_capacity_mwh: float,
capital_cost_eur_mw: float = 800_000
) -> None:
"""Aggiunge sistema di accumulo (BESS) alla rete."""
self.network.add(
"StorageUnit",
name=f"BESS_{bus}",
bus=bus,
p_nom=p_nom_mw,
p_nom_extendable=True,
p_nom_max=p_nom_mw * 3,
max_hours=energy_capacity_mwh / p_nom_mw,
capital_cost=capital_cost_eur_mw,
marginal_cost=0.0,
efficiency_store=0.92,
efficiency_dispatch=0.92,
cyclic_state_of_charge=True
)
def run_capacity_expansion(self) -> dict:
"""
Ottimizzazione espansione rete: trova investimenti ottimali
minimizzando costo totale (CAPEX + OPEX) su orizzonte annuo.
"""
self.network.optimize(
solver_name="highs", # HiGHS solver open source
solver_options={
"time_limit": 300, # max 5 minuti
"mip_gap": 0.01 # gap ottimalita 1%
}
)
# Estrai risultati
total_cost = float(self.network.objective)
# Generazione ottimale per vettore energetico
gen_by_carrier = (
self.network.generators_t.p
.multiply(self.network.snapshot_weightings.generators, axis=0)
.groupby(self.network.generators.carrier, axis=1)
.sum()
.sum()
/ 1e6 # MWh -> TWh
)
# Utilizzo linee (congestion analysis)
line_loading = (
self.network.lines_t.p0.abs() /
self.network.lines.s_nom
).max()
# capacità ottimale per nuovi impianti
new_capacity = self.network.generators[
self.network.generators.p_nom_extendable
]["p_nom_opt"]
return {
"total_system_cost_meur": round(total_cost / 1e6, 2),
"generation_twh": gen_by_carrier.to_dict(),
"congested_lines": line_loading[line_loading > 0.9].to_dict(),
"optimal_new_capacity_mw": new_capacity.to_dict(),
"co2_emissions_kt": self._calculate_emissions()
}
def simulate_n1_contingencies(self) -> pd.DataFrame:
"""
Simula tutte le contingenze N-1 e restituisce report violazioni.
Utile per security assessment della rete pianificata.
"""
results = []
lines = self.network.lines.index.tolist()
for line_name in lines:
# Clona rete e rimuovi linea
n_contingency = self.network.copy()
n_contingency.remove("Line", line_name)
try:
n_contingency.lpf() # Linearized Power Flow (più veloce)
max_loading = (
n_contingency.lines_t.p0.abs() /
n_contingency.lines.s_nom
).max().max()
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float(max_loading),
"secure": bool(max_loading <= 1.0),
"severity": "OK" if max_loading <= 1.0 else
"WARNING" if max_loading <= 1.2 else "CRITICAL"
})
except Exception as e:
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float("inf"),
"secure": False,
"severity": "CRITICAL",
"error": str(e)
})
return pd.DataFrame(results).sort_values("max_line_loading", ascending=False)
def _calculate_emissions(self) -> float:
"""Calcola emissioni CO2 totali in kt."""
emission_factors = {
"gas": 0.202, # tCO2/MWh gas CCGT
"coal": 0.820, # tCO2/MWh carbone
"nuclear": 0.012, # tCO2/MWh nucleare (lifecycle)
"wind": 0.007, # tCO2/MWh eolico
"solar": 0.040, # tCO2/MWh fotovoltaico
"hydro": 0.004 # tCO2/MWh idroelettrico
}
total_kt = 0.0
for carrier, factor in emission_factors.items():
gen_col = [
c for c in self.network.generators_t.p.columns
if self.network.generators.at[c, "carrier"] == carrier
]
if gen_col:
gen_mwh = self.network.generators_t.p[gen_col].sum().sum()
total_kt += gen_mwh * factor / 1000.0
return round(total_kt, 2)
Model informacji wspólnych (CIM): IEC 61970/61968
Il CIM (model informacji wspólnych) oraz międzynarodowy standard dla semantyczna reprezentacja aktywów sieci elektroenergetycznej. Opracowany przez IEC e utrzymywane przez ENTSO-E poprzez CGMES (wspólna wymiana modeli sieci Specyfikacja), CIM definiuje wspólną ontologię, która pozwala systemów różnych dostawców w celu wymiany modeli sieciowych bez utraty informacji.
| Standard | Zakres | Pakiet kluczy | Używam Digital Twina |
|---|---|---|---|
| IEC 61970-301 | Model sieci rdzeniowej | Pakiet Topologii, Pakiet Przewodów | Topologia sieci, impedancje, transformatory |
| IEC 61970-456 | Zmienne stanu | Pakiet zmiennych stanu | Status w czasie rzeczywistym: napięcia, przepływy, wartości zadane |
| IEC 61968-4 | Zarządzanie aktywami | Pakiet zasobów, Pakiet roboczy | Dane o zasobach, zlecenia pracy, konserwacja |
| CGMES 3.0 | Format wymiany | EQ, TP, SV, SSH, DY | Wymiana modeli pomiędzy ogólnoeuropejskimi OSP/OSD |
from dataclasses import dataclass, field
import uuid
from enum import Enum
from typing import Optional
class WindingConnection(Enum):
"""IEC CIM - Tipo connessione avvolgimento trasformatore."""
D = "D" # Delta
Y = "Y" # Wye (stella)
Z = "Z" # Zigzag
Yn = "Yn" # Wye con neutro
@dataclass
class CIMIdentifiedObject:
"""Classe base CIM per oggetti identificati."""
mrid: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
alias_name: str = ""
@dataclass
class CIMSubstation(CIMIdentifiedObject):
"""CIM Substation - rappresenta una sottostazione elettrica."""
region_name: str = ""
voltage_levels: list = field(default_factory=list)
equipment: list = field(default_factory=list)
@dataclass
class CIMPowerTransformer(CIMIdentifiedObject):
"""CIM PowerTransformer - trasformatore di potenza."""
vector_group: str = "Dyn11"
is_phase_shifting: bool = False
winding_ends: list = field(default_factory=list)
@dataclass
class CIMPowerTransformerEnd:
"""CIM PowerTransformerEnd - avvolgimento trasformatore."""
end_number: int = 1
rated_s_mva: float = 0.0
rated_u_kv: float = 0.0
r_ohm: float = 0.0
x_ohm: float = 0.0
connection_kind: WindingConnection = WindingConnection.Y
grounded: bool = False
class CIMModelBuilder:
"""
Costruisce modello CIM da dati strutturati per export CGMES.
Permette interoperabilità con EMS/SCADA di vendor diversi.
"""
def __init__(self):
self._substations: dict[str, CIMSubstation] = {}
self._transformers: dict[str, CIMPowerTransformer] = {}
def add_substation(self, name: str, region: str = "") -> CIMSubstation:
"""Crea e registra una sottostazione nel modello CIM."""
sub = CIMSubstation(name=name, region_name=region)
self._substations[sub.mrid] = sub
return sub
def add_transformer(
self,
name: str,
substation: CIMSubstation,
hv_kv: float,
lv_kv: float,
rated_mva: float,
uk_percent: float = 8.5,
vector_group: str = "Dyn11"
) -> CIMPowerTransformer:
"""Crea trasformatore con entrambi gli avvolgimenti."""
trafo = CIMPowerTransformer(name=name, vector_group=vector_group)
# Calcolo impedenze in ohm (riferite al lato HV)
z_base = (hv_kv ** 2) / rated_mva # ohm base lato HV
z_total = (uk_percent / 100.0) * z_base
r_hv = z_total * 0.1 # approssimazione: R/Z ~ 0.1 per trafo AT/MT
x_hv = z_total * 0.995
# Avvolgimento primario (HV)
end_hv = CIMPowerTransformerEnd(
end_number=1,
rated_s_mva=rated_mva,
rated_u_kv=hv_kv,
r_ohm=r_hv,
x_ohm=x_hv,
connection_kind=WindingConnection.D,
grounded=False
)
# Avvolgimento secondario (LV)
end_lv = CIMPowerTransformerEnd(
end_number=2,
rated_s_mva=rated_mva,
rated_u_kv=lv_kv,
r_ohm=0.0,
x_ohm=0.0,
connection_kind=WindingConnection.Yn,
grounded=True
)
trafo.winding_ends = [end_hv, end_lv]
self._transformers[trafo.mrid] = trafo
substation.equipment.append(trafo)
return trafo
def to_cgmes_dict(self) -> dict:
"""
Esporta modello in formato CGMES-compatible (JSON-LD semplificato).
In produzione si usa uno serializer RDF/XML conforme CGMES 3.0.
"""
return {
"@context": "https://iec.ch/TC57/CIM100",
"Substation": [
{
"@id": f"_{sub.mrid}",
"@type": "cim:Substation",
"cim:IdentifiedObject.name": sub.name,
"cim:Substation.Region": sub.region_name
}
for sub in self._substations.values()
],
"PowerTransformer": [
{
"@id": f"_{trafo.mrid}",
"@type": "cim:PowerTransformer",
"cim:IdentifiedObject.name": trafo.name,
"cim:PowerTransformer.vectorGroup": trafo.vector_group,
"cim:PowerTransformer.PowerTransformerEnd": [
{
"cim:PowerTransformerEnd.endNumber": end.end_number,
"cim:PowerTransformerEnd.ratedS":
{"value": end.rated_s_mva, "unit": "MVA"},
"cim:PowerTransformerEnd.ratedU":
{"value": end.rated_u_kv, "unit": "kV"}
}
for end in trafo.winding_ends
]
}
for trafo in self._transformers.values()
]
}
Platformy chmurowe: Azure Digital Twins vs AWS IoT TwinMaker
Wybór odpowiedniej platformy chmurowej to krytyczny krok w skalowaniu rozwiązań cyfrowych bliźniak od pilota do produkcji. Dwie wiodące platformy mają różne podejścia:
| Kryterium | Cyfrowe bliźniaki platformy Azure | TwinMaker AWS IoT |
|---|---|---|
| Model danych | DTDL (Digital Twin Definition Language), konfigurowalne ontologie | Model komponentów sceny, oparty na obszarze roboczym |
| Integracja SCADA/OT | Doskonały za pośrednictwem Azure IoT Hub + Industrial IoT OPC-UA | Powodzenia AWS IoT Core + Edge Greengrass |
| Wizualizacja 3D | HoloLens 2, Power BI, ekosystem partnerski | TwinMaker Scene Composer (Babylon.js), natywna wtyczka Grafana |
| Analityka | Eksplorator danych Azure, Synapse, Power BI | Amazon Timestream, Grafana, Athena |
| Integracja ML | Azure ML, usługi poznawcze | Amazon SageMaker |
| Cennik (podstawowy) | 0,10 USD/1000 zapytań + 0,50 USD/GB przestrzeni dyskowej/miesiąc | Odczyt wartości 0,05 USD/1000 + 0,50 USD/GB/miesiąc |
| Standardy CIM/CGMES | Azure Data Manager dla energii (OSDU) | Częściowe wsparcie poprzez niestandardowe złącza |
| Sektor energetyczny | Najlepsze: Azure Energy Data Manager, OSDU | Silny dla ogólnego OT, mniej specyficzny dla energii |
| Idealny przypadek użycia | Media, OSP/DSO, duże systemy SCADA | Farmy wiatrowe, fotowoltaika, inteligentne budynki |
Rekomendacja dla sektora energetycznego
Dla operatorów mediów i sieci (OSP/OSD) z systemami SCADA istniejące wymagania i wymagania zgodności NERC/ENTSO-E, Cyfrowe bliźniaki platformy Azure z Azure Data Manager for Energy (opartym na OSDU) zapewnia najlepszą integrację ze standardami branżowymi (CIM/CGMES). Dla operatorów wytwarzania energii odnawialnej (wiatr, słońce) na potrzeby wizualizacji 3D i integracji z Grafaną, TwinMaker AWS IoT zapewnia płynniejszą pracę programisty.
Edge Digital Twin: lekkie modele w Gateway
Nie wszystkie obliczenia mogą poczekać na chmurę w obie strony. W przypadku scenariuszy, w których opóźnienia i krytyczne (zabezpieczenia, reakcja na zapotrzebowanie, ładowanie pojazdów elektrycznych w czasie rzeczywistym), tak wdraża a krawędziowy cyfrowy bliźniak na bramie przemysłowej (Raspberry Pi 4, Siemens IPC, Beckhoff CX) z uproszczoną wersją modelu z opóźnieniem poniżej 100 ms.
import numpy as np
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class EdgeTwinConfig:
"""Configurazione edge digital twin - ottimizzato per risorse limitate."""
n_buses: int = 6 # max bus simulabili in tempo reale
max_iterations: int = 10 # Newton-Raphson: meno iterazioni
tolerance_pu: float = 1e-4 # convergenza più lassa
update_interval_ms: float = 100.0 # frequenza aggiornamento 10 Hz
class LightweightPowerFlowSolver:
"""
Solver power flow semplificato per edge computing.
Usa metodo DC (lineare) per velocità massima.
L'approssimazione DC e accurata per reti con alto X/R ratio.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self._B_matrix: Optional[np.ndarray] = None # matrice ammettenza
self._initialized = False
def build_susceptance_matrix(
self,
from_buses: list[int],
to_buses: list[int],
x_ohm: list[float],
base_mva: float = 100.0
) -> None:
"""
Costruisce matrice B (suscettanza) per DC power flow.
Complessità O(n_lines), molto più veloce del metodo NR.
"""
n = self.config.n_buses
B = np.zeros((n, n))
for fb, tb, x in zip(from_buses, to_buses, x_ohm):
if x > 1e-8:
b = base_mva / x # suscettanza in pu
B[fb][fb] += b
B[tb][tb] += b
B[fb][tb] -= b
B[tb][fb] -= b
# Rimuovi riga/colonna bus slack (bus 0)
self._B_matrix = B[1:, 1:]
self._initialized = True
def solve_dc(self, p_injections_mw: list[float], base_mva: float = 100.0) -> dict:
"""
Risolve DC power flow: O(n^2) vs O(n^3) di NR.
Adatto per hardware edge con <512MB RAM.
Args:
p_injections_mw: lista iniezioni di potenza per bus (pos=generazione, neg=carico)
base_mva: potenza base per normalizzazione
Returns:
Angoli di fase e flussi di potenza stimati
"""
if not self._initialized:
raise RuntimeError("Matrice B non inizializzata")
t_start = time.monotonic_ns()
# Converti in per-unit (escludi bus slack)
p_pu = np.array(p_injections_mw[1:]) / base_mva
# Risolvi sistema lineare: B * theta = P
try:
theta = np.linalg.solve(self._B_matrix, p_pu)
except np.linalg.LinAlgError:
return {"converged": False, "error": "Matrice singolare - rete isola"}
theta_full = np.concatenate([[0.0], theta]) # aggiungi slack bus
elapsed_us = (time.monotonic_ns() - t_start) // 1000
return {
"converged": True,
"theta_rad": theta_full.tolist(),
"theta_deg": np.degrees(theta_full).tolist(),
"solve_time_us": int(elapsed_us),
"within_latency_budget": elapsed_us < (self.config.update_interval_ms * 1000 * 0.1)
}
class EdgeDigitalTwinRuntime:
"""
Runtime edge per digital twin leggero su gateway industriale.
Gira in un loop a 10 Hz con aggiornamento modello da sensori locali.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self.solver = LightweightPowerFlowSolver(config)
self._thermal_model = TransformerThermalModel()
self._last_sync_ts: float = 0.0
self._cloud_buffer: list[dict] = []
def process_sensor_sample(self, sensors: dict) -> dict:
"""
Elabora un campione sensoriale e aggiorna il modello edge.
Latenza target: < 10ms totale.
Args:
sensors: misure correnti (tensioni, correnti, temperature)
Returns:
Stato calcolato + alert immediati
"""
t_start = time.monotonic_ns()
# Aggiorna modello termico (molto veloce, solo algebra)
load_fraction = sensors.get("trafo_loading_pu", 0.5)
ambient_temp = sensors.get("ambient_temp_c", 20.0)
thermal_state = self._thermal_model.step(
load_fraction=load_fraction,
ambient_temp=ambient_temp,
dt_seconds=self.config.update_interval_ms / 1000.0
)
# Stima veloce stato rete (DC power flow)
p_injections = sensors.get("p_injections_mw", [0.0] * self.config.n_buses)
pf_result = self.solver.solve_dc(p_injections)
# Alert immediati (priorità altissima - <1ms)
immediate_alerts = self._evaluate_fast_alerts(thermal_state, sensors)
elapsed_ms = (time.monotonic_ns() - t_start) / 1e6
result = {
"thermal": thermal_state,
"power_flow": pf_result,
"alerts": immediate_alerts,
"edge_processing_ms": round(elapsed_ms, 3),
"within_budget": elapsed_ms < 10.0
}
# Accumula per sync cloud asincrono
self._cloud_buffer.append(result)
return result
def _evaluate_fast_alerts(
self,
thermal_state: dict,
sensors: dict
) -> list[dict]:
"""Alert critica valutata in <1ms per risposta immediata."""
alerts = []
hst = thermal_state.get("theta_winding_hst_c", 0.0)
if hst > 140.0:
alerts.append({
"code": "TRAFO_EMERGENCY_TEMP",
"severity": "CRITICAL",
"value": hst,
"action": "REDUCE_LOAD_IMMEDIATELY"
})
elif hst > 120.0:
alerts.append({
"code": "TRAFO_HIGH_TEMP",
"severity": "WARNING",
"value": hst,
"action": "MONITOR_CLOSELY"
})
# Verifica sovratensione (from raw sensor)
v_pu = sensors.get("voltage_pu", 1.0)
if v_pu > 1.10 or v_pu < 0.90:
alerts.append({
"code": "VOLTAGE_OUT_OF_RANGE",
"severity": "WARNING",
"value": v_pu,
"action": "ADJUST_TAP_CHANGER"
})
return alerts
def flush_to_cloud(self) -> list[dict]:
"""Svuota buffer locale per invio al cloud (chiamata asincrona)."""
buffer = self._cloud_buffer.copy()
self._cloud_buffer.clear()
return buffer
Studium przypadku: Cyfrowa bliźniacza morska farma wiatrowa
Morska farma wiatrowa z 40 turbinami o mocy 8 MW (łącznie 320 MW) generuje: ogromna ilość danych: każda turbina posiada ponad 200 czujników (akcelerometry, tensometr, anemometry, enkodery pochylenia/odchylenia, łożyska temperaturowe, prądy generator). Cyfrowy bliźniak integruje optymalizację fizyczną, ML i optymalizację w czasie rzeczywistym aby zmaksymalizować wytwarzaną energię przy jednoczesnej minimalizacji kosztów konserwacji.
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class WindTurbineState:
"""Stato corrente di una turbina eolica."""
turbine_id: str
timestamp: str
wind_speed_ms: float
wind_direction_deg: float
power_kw: float
rotor_rpm: float
pitch_angle_deg: float
yaw_error_deg: float
nacelle_temp_c: float
gearbox_temp_c: float
generator_temp_c: float
main_bearing_vibration_g: float
tower_acceleration_x_g: float
tower_acceleration_y_g: float
operational_status: str # PRODUCING, CURTAILED, FAULT, MAINTENANCE
class OffshoreWindFarmTwin:
"""
Digital twin parco eolico offshore.
Gestisce 40 turbine con simulazione wake effect e predictive maintenance.
"""
# Curva di potenza Vestas V236-15.0 MW (approssimazione)
POWER_CURVE_MS = [0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 25]
POWER_CURVE_KW = [0, 0, 90, 310, 680, 1250, 2100, 3200, 4600, 6200, 7800, 8000, 8000]
def __init__(self, farm_name: str, n_turbines: int = 40):
self.farm_name = farm_name
self.n_turbines = n_turbines
self.turbine_states: dict[str, WindTurbineState] = {}
self._power_interpolator = self._build_power_curve_interp()
def _build_power_curve_interp(self):
"""Costruisce interpolatore curva di potenza."""
from scipy.interpolate import interp1d
return interp1d(
self.POWER_CURVE_MS,
self.POWER_CURVE_KW,
kind="cubic",
bounds_error=False,
fill_value=(0.0, 8000.0)
)
def calculate_wake_effect(
self,
turbine_positions: list[tuple[float, float]],
free_stream_wind_ms: float,
wind_direction_deg: float
) -> list[float]:
"""
Calcola velocità del vento per ogni turbina considerando il wake effect.
Usa modello Jensen (Park model) semplificato.
Args:
turbine_positions: lista (x, y) in metri
free_stream_wind_ms: velocità vento libero
wind_direction_deg: direzione vento (0=Nord)
Returns:
Lista velocità vento effettiva per ogni turbina
"""
n = len(turbine_positions)
effective_wind = [free_stream_wind_ms] * n
k_wake = 0.04 # costante espansione scia offshore
rotor_d = 236.0 # diametro rotore Vestas V236 in metri
ct = 0.79 # coefficiente di thrust a rated wind speed
# Converti direzione vento in vettore
wind_rad = np.radians(wind_direction_deg)
wind_vec = np.array([np.sin(wind_rad), np.cos(wind_rad)])
for i, (xi, yi) in enumerate(turbine_positions):
for j, (xj, yj) in enumerate(turbine_positions):
if i == j:
continue
# Vettore da turbina j a turbina i
delta = np.array([xi - xj, yi - yj])
dist = np.linalg.norm(delta)
if dist < 1.0:
continue
# Componente nella direzione del vento
downstream = np.dot(delta, wind_vec)
if downstream <= 0:
continue # i non e downstream di j
# Distanza laterale dalla scia
lateral = abs(np.cross(delta / dist, wind_vec)) * dist
# Raggio della scia a distanza downstream
wake_radius = rotor_d / 2.0 + k_wake * downstream
# Overlap fraction (semplificato: cilindrico)
if lateral < wake_radius:
overlap = max(0.0, 1.0 - lateral / wake_radius)
# Deficit di velocità Jensen model
deficit = (1.0 - np.sqrt(1.0 - ct)) * (rotor_d / (rotor_d + 2 * k_wake * downstream)) ** 2
effective_wind[i] = min(effective_wind[i], free_stream_wind_ms * (1.0 - overlap * deficit))
return effective_wind
def estimate_farm_production(
self,
wind_speed_ms: float,
wind_direction_deg: float,
turbine_positions: list[tuple[float, float]],
availability: Optional[list[float]] = None
) -> dict:
"""
Stima produzione totale del parco con wake model.
Args:
wind_speed_ms: velocità vento a hub height
wind_direction_deg: direzione vento
turbine_positions: coordinate turbine
availability: disponibilità per turbina (0-1)
Returns:
Produzione totale, per turbina, e perdite wake
"""
if availability is None:
availability = [1.0] * self.n_turbines
# Calcola velocità effettiva per ogni turbina
effective_winds = self.calculate_wake_effect(
turbine_positions, wind_speed_ms, wind_direction_deg
)
# Calcola potenza per turbina dalla curva
powers_kw = []
for i, (v_eff, avail) in enumerate(zip(effective_winds, availability)):
p = float(self._power_interpolator(v_eff)) * avail
powers_kw.append(p)
total_power_mw = sum(powers_kw) / 1000.0
ideal_power_mw = float(self._power_interpolator(wind_speed_ms)) * self.n_turbines / 1000.0
wake_loss_pct = max(0.0, (ideal_power_mw - total_power_mw) / ideal_power_mw * 100) if ideal_power_mw > 0 else 0.0
return {
"total_power_mw": round(total_power_mw, 2),
"ideal_power_mw": round(ideal_power_mw, 2),
"wake_loss_pct": round(wake_loss_pct, 2),
"capacity_factor": round(total_power_mw / (self.n_turbines * 8.0) * 100, 1),
"per_turbine_kw": [round(p, 1) for p in powers_kw],
"effective_wind_ms": [round(v, 2) for v in effective_winds]
}
def detect_yaw_misalignment(
self,
turbine_id: str,
lidar_wind_dir: float,
nacelle_dir: float,
threshold_deg: float = 5.0
) -> dict:
"""
Rileva disallineamento yaw da dati LIDAR + encoder nacelle.
Un yaw error di 10° riduce la produzione del ~3% (cos^2 law).
"""
yaw_error = abs(lidar_wind_dir - nacelle_dir)
# Normalizza nell'intervallo -180/+180
if yaw_error > 180:
yaw_error = 360 - yaw_error
# Stima perdita di produzione (legge del coseno)
production_loss_pct = (1 - np.cos(np.radians(yaw_error)) ** 3) * 100
return {
"turbine_id": turbine_id,
"yaw_error_deg": round(yaw_error, 2),
"production_loss_pct": round(production_loss_pct, 2),
"action_required": yaw_error > threshold_deg,
"priority": "HIGH" if yaw_error > 15.0 else
"MEDIUM" if yaw_error > threshold_deg else "NONE"
}
Wydajność, skalowalność i bezpieczeństwo
Wymagania dotyczące wydajności
| Część | Docelowe opóźnienie | Częstotliwość aktualizacji | Przechowywanie/dzień |
|---|---|---|---|
| Edge twin (przepływ prądu stałego) | < 10 ms | 10 Hz | 50 MB/turbina |
| Przepływ mocy NR (chmura) | < 500 ms | 0,03 Hz (30 s) | 5 MB/podstację |
| Wnioskowanie ML | < 100 ms | 1 Hz | 2 MB/zasób |
| Synchronizacja wizualizacji 3D | < 1s | 0,1 Hz (10 s) | nieistotny |
| Eksport modelu CGMES | < 5 s | 1/godzina | 100MB/sieć |
Bezpieczeństwo: Ochrona danych SCADA
Ostrzeżenie: cyfrowe bliźniaki jako wektor ataku
Cyfrowy bliźniak energii i wartościowy cel cyberataków: zawiera kompletną topologię sieci, słabe punkty i wzorce zasobów ochrona. Dyrektywa NIS2 (obowiązująca we Włoszech od 2024 r.) nakłada wymagania rygorystyczne dla operatorów usług kluczowych w energetyce:
- Segmentacja sieci: warstwa danych DT musi znajdować się w strefie DMZ oddzielonej od sieci OT
- Dostęp bez zaufania: każdy komponent uwierzytelnia, bez ukrytego zaufania
- Oczyszczanie danych: dane SCADA do chmury DT muszą przejść przez diodę danych (fizyczna jednokierunkowa)
- Ścieżki audytu: aby zachować zgodność, każde zapytanie dotyczące modelu musi być zalogowane
- Szyfrowanie w stanie spoczynku: Modele CIM/CGMES zawierają wrażliwe dane topologiczne
# Architettura di sicurezza raccomandata per Digital Twin energetico
#
# OT Network (IEC 62443) DMZ Cloud
# ┌─────────────────────┐ ┌──────────────┐ ┌───────────────┐
# │ IED / RTU / SCADA │───►│ Data Diode │───►│ Data Broker │
# │ (Modbus, IEC 61850) │ │ (unidirez.) │ │ (Kafka/MQTT) │
# └─────────────────────┘ └──────────────┘ └───────┬───────┘
# │ TLS 1.3
# ┌─────────────────────┐ ┌──────────────┐ ┌───────▼───────┐
# │ Historian (locale) │ │ Firewall │ │ DT Engine │
# │ OSIsoft PI / AVEVA │ │ IPS/IDS │ │ (Azure DT / │
# └─────────────────────┘ └──────────────┘ │ AWS TwinMkr) │
# └───────────────┘
#
# Regole chiave:
# - Nessuna connessione INBOUND verso OT network
# - Autenticazione mTLS per tutti i servizi DT
# - RBAC: operatore vede solo i propri asset
# - Dati storici: retention 7 anni (requisito ARERA in Italia)
# - Backup modello CIM: immutabile su S3/ADLS con versioning
# Esempio: validazione input da SCADA prima di aggiornare il twin
import re
from typing import Any
SCADA_FIELD_LIMITS = {
"load_mw": (0.0, 1000.0),
"voltage_kv": (0.0, 500.0),
"tap_position": (-16, 16),
"ambient_temp_c": (-40.0, 60.0),
"current_ka": (0.0, 10.0)
}
def validate_scada_measurement(field: str, value: Any) -> float:
"""
Valida misura SCADA prima dell'ingestion nel digital twin.
Previene injection di valori anomali (tampering / errori sensore).
"""
if not isinstance(value, (int, float)):
raise ValueError(f"Campo {field}: tipo non valido {type(value)}")
value = float(value)
if not (-1e9 < value < 1e9): # sanity check overflow
raise ValueError(f"Campo {field}: overflow numerico {value}")
if field in SCADA_FIELD_LIMITS:
lo, hi = SCADA_FIELD_LIMITS[field]
if not (lo <= value <= hi):
raise ValueError(
f"Campo {field}: valore {value} fuori range fisico [{lo}, {hi}]"
)
return value
Przyszłość: stowarzyszone cyfrowe bliźniaki i sieć wzmocniona sztuczną inteligencją
Pojawiające się trendy na lata 2026–2028 w zakresie cyfrowych bliźniaków energii:
Trend 2026-2028: Cyfrowy bliźniak nowej generacji
- Sfederowane cyfrowe bliźniaki: cyfrowe bliźniaki OSP i DSO kilka z nich współpracuje, zachowując prywatność danych, stosując odpowiednie techniki stowarzyszonego uczenia się w celu aktualizacji udostępnionych modeli bez udostępniania wrażliwe dane dotyczące topologii sieci.
- Modele fizyczne wspomagane sztuczną inteligencją: sieci neuronowe fizycznie (PINN – sieci neuronowe zorientowane na fizykę), które łączą Równania Maxwella/Kirchhoffa z zaobserwowanymi danymi, redukujące o 60% błąd symulacji w porównaniu z samymi modelami analitycznymi.
- Autonomiczny bliźniak sieci: cyfrowy bliźniak z agentami RL (Uczenie się przez wzmacnianie), które realizują autonomiczne działania kontrolne (przełącznik zaczepów, odciążanie, wysyłka magazynu) optymalizacja w czasie prawdziwe pod nadzorem człowieka.
- Cyfrowy bliźniak jako usługa (DTaaS) dla MŚP: platformy SaaS oferujące wstępnie skonfigurowane cyfrowe bliźniaki dla zakładów przemysłowych średnie (1-50 MW), obniżające barierę wejścia z 500 tys. do 50 tys. euro.
- 5G Ultra-Reliable Low-Latency: connessione 5G URLLC (latenza <1ms, affidabilità 99.999%) permettera edge twin su campo senza gateway intermedi, con sync diretto sensore-cloud.
Risorse e Riferimenti
| Risorsa | Tipo | Utilizzo |
|---|---|---|
| pandapower.readthedocs.io | Documentazione | Power flow, optimal power flow, short circuit |
| pypsa.readthedocs.io | Documentazione | Capacity expansion, unit commitment, multi-period |
| IEC 60076-7:2018 | Standard | Modello termico trasformatori di potenza |
| IEC 60599:2015 | Standard | Interpretazione DGA per diagnosi trasformatori |
| ENTSO-E CIM / CGMES 3.0 | Standard | Modello dati rete, scambio tra TSO/DSO |
| Azure Digital Twins docs | Cloud Platform | DTDL, ontologie CIM, Azure IoT Hub |
| AWS IoT TwinMaker docs | Cloud Platform | Scene Composer, Grafana plugin, Timestream |
| OpenModelica | Tool open source | Simulazione fisica complessa (Modelica language) |
| Direttiva NIS2 (EU 2022/2555) | Normativa | Cybersecurity per operatori servizi essenziali |
Conclusioni
Il digital twin per infrastruttura energetica rappresenta oggi una delle tecnologie più trasformative del settore: il mercato da $34 miliardi nel 2025 e in crescita al 34,7% annuo, trainato dalla transizione energetica, dalla complessità crescente delle reti con DER e dalla pressione regolatoria su affidabilità e sicurezza.
Abbiamo visto come implementare un digital twin enterprise-grade con: pandapower per il power flow Newton-Raphson in tempo reale, il modello termico IEC 60076-7 per i trasformatori, PyPSA per l'ottimizzazione della pianificazione rete, modelli ML con Random Forest e XGBoost per la manutenzione predittiva, e il modello CIM/CGMES per l'interoperabilità tra sistemi.
La chiave del successo e un'architettura a layer ben separata: fisica, dati, modello e visualizzazione devono avere interfacce chiare, permettendo di evolvere ogni componente indipendentemente. Il digital shadow (L1) e il primo passo accessibile; il twin prescrittivo (L4) e l'obiettivo finale dove l'infrastruttura si auto-ottimizza.
Prossimo Articolo della Serie
Il prossimo e ultimo articolo della serie EnergyTech esplora Blockchain per Trading Energetico P2P: Smart Contract e Vincoli: architettura smart contract Solidity per trading peer-to-peer di energia, settlement automatico on-chain e navigazione dei vincoli normativi europei.
Serie Correlate
- MLOps: aby wprowadzić modele ML konserwacji predykcyjnej w produkcji z MLflow, DVC i Kubernetes
- Inżynieria sztucznej inteligencji: wdrożyć RAG w dokumentacji inżynieria instalacji i LLM do analizy raportów o błędach
- Biznes związany z danymi i sztuczną inteligencją: w zakresie zarządzania danymi dotyczącymi energii oraz budowanie potoków ETL/ELT na potrzeby analiz







