Enerji Altyapısı için Dijital İkiz: Gerçek Zamanlı Simülasyon
Enerji sektörü için küresel dijital ikiz pazarı, 2025'te 34,1 milyar dolar ve 2034 yılına kadar %34,7'lik bir Bileşik Büyüme Oranı ile büyüyecek, Fortune Business Insights'a göre. Bu geleceğe dair bir söz değil: kamu hizmetleri Avrupa ülkeleri hâlihazırda HV/MV trafo merkezleri ve rüzgar santrallerinin dijital ikizlerini işletiyor Saniyede binlerce ölçüm noktasını senkronize eden açık deniz ve dağıtım ağları. İtalya'da Terna ve Enel, şebeke için pilot dijital ikiz programları başlattı Enerji dönüşümüne yönelik PNRR girişimlerinin bir parçası olarak ulusal iletim.
Un enerji dijital ikizi bu sadece bir simülasyon modeli değil statik. Bitkinin zaman içindeki fiziksel durumunu yansıtan çift yönlü bir sistemdir. gerçek, tahmine dayalı simülasyonlar çalıştırmanıza, olası senaryoları değerlendirmenize ve komut vermenize olanak tanır Geri bildirim döngüsü yoluyla fiziksel varlıklar. Basit bir SCADA ile arasındaki fark çok derin: SCADA'nın izlediği ve kontrol ettiği yerde dijital ikiz kavrar, benzetmek e tahmin eder.
Bu makalede dijital ikizlerin altyapı için pratik uygulamasını araştırıyoruz enerji: elektrik ağlarının fiziksel modellemesinden panda gücü e PyPSA, SCADA/IoT ile entegrasyon, kestirimci bakım ML'li transformatörlerde, gerçek HV/MV trafo merkezleri ve açık deniz rüzgar santralleri vakalarına kadar.
Ne Öğreneceksiniz
- Dijital ikiz taksonomisi: tanımlayıcı, bilgilendirici, tahmine dayalı, kuralcı
- Katman mimarisi: Fiziksel Varlık, Veri Katmanı, Model Katmanı, Görselleştirme
- Fiziksel modeller: Newton-Raphson güç akışı, termal model transformatörler, bozulma eğrileri
- SCADA, IoT sensörleri, hava durumu API'si ve akıllı sayaçlardan veri alımı
- Python'da pandapower ve PyPSA ile ağ simülasyonu
- Gerçek zamanlı senkronizasyon: dijital gölge ve çift yönlü dijital ikiz
- Tahmine dayalı bakım: Transformatörlerde RUL için Rastgele Orman ve XGBoost
- Olursa senaryoları: N-1 beklenmedik durumlar ve ağ genişletme planlaması
- Model birlikte çalışabilirliği için CIM IEC 61970/61968 ve CGMES
- Azure Digital Twins ve AWS IoT TwinMaker karşılaştırması: bulut platformu seçimi
- Örnek olay: YG/MV trafo merkezi ve açık deniz rüzgar santrali
- Endüstriyel ağ geçitlerinde minimum gecikme için uç dijital ikiz
EnergyTech Serisi: Dijital Enerji Üzerine 10 Makale
Bu serinin dokuzuncu makalesi EnerjiTekniği, protokollere adanmış, enerji yönetimini dönüştüren yazılım mimarilerine ve algoritmalara Enerji geçişi çağında elektrik.
| # | Öğe | Teknolojiler | Seviye |
|---|---|---|---|
| 1 | OCPP 2.x Protokolü: EV Şarj Sistemleri Oluşturmak | OCPP, WebSocket, Python, ISO 15118 | Gelişmiş |
| 2 | DERMS Mimarisi: Milyonlarca Dağıtılmış Kaynağı Bir araya Getirme | DERMS, REST, MQTT, PostgreSQL | Gelişmiş |
| 3 | ML ile Yenilenebilir Enerji Tahmini: Güneş ve Rüzgar için LSTM | Python, LSTM, Peygamber, FastAPI | Gelişmiş |
| 4 | Şebeke Ölçeğinde Depolama için Pil Yönetim Sistemi | BMS, SoC/SoH, Python, CAN veriyolu | Gelişmiş |
| 5 | Yazılım Mühendisleri için IEC 61850: Akıllı Şebeke İletişimi | IEC 61850, KAZ, MMS, SCADA | Gelişmiş |
| 6 | EV Şarj Yükü Dengeleme: Gerçek Zamanlı Algoritmalar | Algoritmalar, Python, OCPP, WebSocket | Gelişmiş |
| 7 | MQTT'den InfluxDB'ye: Gerçek Zamanlı Enerji IoT Platformu | MQTT, InfluxDB, Telegraf, Grafana | Orta seviye |
| 8 | Karbon Muhasebe Yazılım Mimarisi: ESG Platformları | Sera Gazı Protokolü, Python, API, raporlama | Orta seviye |
| 9 | Enerji Altyapısı için Dijital İkiz (buradasınız) | pandapower, PyPSA, ML, Azure DT, Python | Gelişmiş |
| 10 | P2P Enerji Ticareti için Blockchain: Akıllı Sözleşmeler ve Kısıtlamalar | Sağlamlık, Ethereum, Akıllı Sözleşmeler | Gelişmiş |
Enerji Dijital İkiz Taksonomisi
Tüm "dijital ikizler" eşit yaratılmamıştır. Grieten-Kritzinger çerçevesi benimsendi IEC ve birçok endüstriyel tedarikçi tarafından dört olgunluk düzeyi tanımlanmaktadır. veri akışının yönüne ve işleme kapasitesine göre:
| Seviye | Tip | Veri Akışı | kapasite | Enerji Örneği |
|---|---|---|---|---|
| L1 | Dijital Gölge (Açıklayıcı) | Fiziksel → Dijital | Mevcut durum izleme | Gerçek zamanlı SCADA kontrol paneli |
| L2 | Bilgilendirici Dijital İkiz | Fiziksel ↔ Dijital (tarihsel) | Tarihsel analiz, KPI'lar, trendler | Varlık sağlığı izleme transformatörü |
| L3 | Tahmine Dayalı Dijital İkiz | Fiziksel ↔ Dijital + ML | Arıza tahmini, RUL, simülasyon | HV kablonun faydalı ömür tahmini |
| L4 | Dijital İkiz Kuralcı | Fiziksel ↔ Dijital (çift yönlü aktif) | Optimum eylemlerin tavsiye edilmesi/uygulanması | Otomatik bakım yeniden planlama |
Dijital Gölge ve Dijital İkiz
Un dijital gölge vücuttan veri toplar ancak onu etkilemez (tek yönlü akış). A dijital ikiz gerçek bir var Geri bildirim kanalı: fiziksel varlığa ayar noktaları, komutlar veya parametreler gönderebilir, kontrol döngüsünün kapatılması. Pratikte çoğu endüstriyel sistem bugün L1-L2'de faaliyet göstermektedir ve 2027 yılına kadar L3-L4'e ulaşma hedefi bulunmaktadır.
Enerji Dijital İkizinin Katman Mimarisi
Enerji altyapısına yönelik kurumsal düzeyde bir dijital ikiz dört bileşenden oluşur: her biri iyi tanımlanmış sorumluluklara sahip farklı katmanlar:
Katman 1: Fiziksel Varlıklar
Cihazlı fiziksel varlıklar: DGA (Çözünmüş Gaz) sensörlü HV/MV transformatörleri Analiz), akım trafolu baralar, sıcaklık sensörlü havai hatlar ve rüzgar, titreşim izlemeli jeneratörler, SoC/SoH sensörlü piller. Katman fizikçi değişken frekanslarda ham veri üretir: sıcaklık için 1 örnek/saatten IEC 61850 koruma önlemleri için 1 örnek/ms'de ortam.
Katman 2: Veri Katmanı
Veri alımı, normalleştirme ve depolama katmanı. Şunları içerir:
- SCADA/EMS: DNP3, Modbus, IEC 61850 aracılığıyla gerçek zamanlı operasyonel veriler
- Nesnelerin İnterneti Platformu: MQTT/AMQP aracılığıyla kablosuz sensörlerden bulut aracısına
- Zaman Serisi Veritabanı: Yüksek hızlı telemetri için InfluxDB veya TimescaleDB
- Veri Gölü: ML eğitimi için geçmiş depolama (S3, ADLS)
- Hava Durumu API'si: Termal modeller ve güneş/rüzgar tahmini için meteorolojik veriler
- Akıllı Sayaçlar: DLMS/COSEM aracılığıyla toplanan yük profilleri
Katman 3: Model Katmanı
Hesaplamanın kalbi: davranışı kopyalayan fiziksel ve makine öğrenimi modelleri varlıkların. Güç akışı çözücüyü, termal modelleri içerir, tahmine dayalı bakım için bozulma ve ML modelleri.
Katman 4: Görselleştirme ve Entegrasyon
Operasyonel kontrol panelleri (Grafana, Power BI), trafo merkezlerinin 3 boyutlu oluşturulması (Three.js, Unity, Siemens NX), üçüncü taraf sistemler (ERP, CMMS, EMS) için REST/WebSocket API ve kuralcı kontrol için arayüzler.
Fiziksel Modeller: Newton-Raphson ile Güç Akışı
Herhangi bir elektrik şebekesi dijital ikizinin kalbi, güç akışı çözücü. Newton-Raphson algoritması dengeyi tanımlayan doğrusal olmayan denklemleri çözer Ağın her bir düğümünde (veriyolunda) aktif ve reaktif güç. İle panda gücü eksiksiz bir trafo merkezi modeli oluşturabilir ve davranışını simüle edebiliriz birkaç milisaniye içinde.
Kurulum ve Kurulum
# Installazione dependencies
pip install pandapower pypsa numpy pandas scikit-learn xgboost
pip install influxdb-client paho-mqtt asyncio aiohttp
pip install plotly dash # per visualizzazione
# Versioni consigliate (2025)
# pandapower==2.14.x
# pypsa==0.31.x
# scikit-learn==1.5.x
# xgboost==2.1.x
Pandapower ile ağ modelleme
Dağıtım hatlarıyla birlikte bir HV/MV trafo merkezinin eksiksiz bir modelini oluşturuyoruz, transformatörler, yükler ve dağıtılmış jeneratörler:
import pandapower as pp
import pandapower.networks as pn
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
import json
@dataclass
class SubstationConfig:
"""Configurazione sottostazione AT/MT"""
name: str
voltage_hv_kv: float = 132.0 # Alta Tensione kV
voltage_mv_kv: float = 20.0 # Media Tensione kV
transformer_mva: float = 40.0 # Potenza trasformatore MVA
n_feeders: int = 6 # Numero feeder MT
peak_load_mw: float = 28.0 # Carico picco MW
pv_capacity_mw: float = 4.5 # Fotovoltaico connesso MW
class EnergyDigitalTwin:
"""
Digital Twin per sottostazione AT/MT con simulazione power flow real-time.
Implementa aggiornamento continuo dallo stato SCADA.
"""
def __init__(self, config: SubstationConfig):
self.config = config
self.net = pp.create_empty_network(name=config.name)
self._build_network_model()
self._state_cache: dict = {}
def _build_network_model(self) -> None:
"""Costruisce il modello di rete dalla configurazione."""
cfg = self.config
# Bus AT (slack bus = riferimento di tensione)
self.bus_ht = pp.create_bus(
self.net,
vn_kv=cfg.voltage_hv_kv,
name="Bus_AT_Main",
type="b"
)
# Bus MT primario (lato secondario trasformatore)
self.bus_mt_primary = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name="Bus_MT_Sbarra",
type="b"
)
# Trasformatore AT/MT principale
pp.create_transformer_from_parameters(
self.net,
hv_bus=self.bus_ht,
lv_bus=self.bus_mt_primary,
sn_mva=cfg.transformer_mva,
vn_hv_kv=cfg.voltage_hv_kv,
vn_lv_kv=cfg.voltage_mv_kv,
vkr_percent=0.3, # perdite rame %
vk_percent=8.5, # tensione di cortocircuito %
pfe_kw=35.0, # perdite ferro kW
i0_percent=0.08, # corrente a vuoto %
name="TR_AT_MT_Main",
tap_pos=0, # posizione tap changer
tap_neutral=0,
tap_min=-8,
tap_max=8,
tap_step_percent=1.25
)
# Rete esterna AT (equivalente di Thevenin della rete)
pp.create_ext_grid(
self.net,
bus=self.bus_ht,
vm_pu=1.0,
va_degree=0.0,
name="Rete_Trasmissione",
s_sc_max_mva=3500.0, # potenza di cortocircuito rete
rx_max=0.1
)
# Crea feeder MT con carichi e generazione
self.feeder_buses = []
self.load_ids = []
self.pv_ids = []
for i in range(cfg.n_feeders):
# Bus feeder
bus_feeder = pp.create_bus(
self.net,
vn_kv=cfg.voltage_mv_kv,
name=f"Bus_Feeder_{i+1:02d}"
)
self.feeder_buses.append(bus_feeder)
# Linea MT dal bus principale al feeder (cavo XLPE 150mm2)
pp.create_line_from_parameters(
self.net,
from_bus=self.bus_mt_primary,
to_bus=bus_feeder,
length_km=2.5 + i * 0.8,
r_ohm_per_km=0.206, # resistenza XLPE 150mm2
x_ohm_per_km=0.083,
c_nf_per_km=320.0,
max_i_ka=0.261,
name=f"Linea_Feeder_{i+1:02d}"
)
# Carico per feeder (distribuzione uniforme + varianza)
load_mw = cfg.peak_load_mw / cfg.n_feeders * (0.8 + 0.4 * np.random.rand())
load_id = pp.create_load(
self.net,
bus=bus_feeder,
p_mw=load_mw,
q_mvar=load_mw * 0.3, # power factor ~ 0.958
name=f"Carico_Feeder_{i+1:02d}",
controllable=False
)
self.load_ids.append(load_id)
# Generatore fotovoltaico (bus MT primario)
pv_id = pp.create_sgen(
self.net,
bus=self.bus_mt_primary,
p_mw=cfg.pv_capacity_mw,
q_mvar=0.0,
name="FV_Rooftop",
type="PV",
controllable=True
)
self.pv_ids.append(pv_id)
def update_from_scada(self, scada_data: dict) -> None:
"""
Aggiorna il modello con dati real-time da SCADA/IoT.
Args:
scada_data: dizionario con misure correnti
- load_mw_per_feeder: lista potenze attive
- pv_mw: generazione fotovoltaica attuale
- tap_position: posizione tap changer
- ambient_temp_c: temperatura ambiente
"""
# Aggiorna carichi per feeder
loads = scada_data.get("load_mw_per_feeder", [])
for i, (load_id, p_mw) in enumerate(zip(self.load_ids, loads)):
q_mvar = p_mw * 0.30 # power factor fisso (da modello)
self.net.load.at[load_id, "p_mw"] = p_mw
self.net.load.at[load_id, "q_mvar"] = q_mvar
# Aggiorna generazione PV
pv_mw = scada_data.get("pv_mw", 0.0)
if self.pv_ids:
self.net.sgen.at[self.pv_ids[0], "p_mw"] = pv_mw
# Aggiorna posizione tap changer
tap = scada_data.get("tap_position", 0)
self.net.trafo.at[0, "tap_pos"] = np.clip(tap, -8, 8)
# Cache stato per confronto
self._state_cache = scada_data.copy()
def run_power_flow(self) -> dict:
"""
Esegue simulazione Newton-Raphson e restituisce risultati.
Returns:
Dizionario con tensioni, correnti, perdite e stato componenti
"""
try:
pp.runpp(
self.net,
algorithm="nr", # Newton-Raphson
calculate_voltage_angles=True,
max_iteration=50,
tolerance_mva=1e-8,
enforce_q_lims=True
)
results = {
"converged": True,
"buses": self._extract_bus_results(),
"lines": self._extract_line_results(),
"transformer": self._extract_trafo_results(),
"losses_mw": float(self.net.res_ext_grid["p_mw"].sum() -
self.net.res_load["p_mw"].sum() +
self.net.res_sgen["p_mw"].sum()),
"timestamp": pd.Timestamp.now().isoformat()
}
return results
except pp.LoadflowNotConverged:
return {
"converged": False,
"error": "Power flow non convergente - possibile violazione limiti",
"timestamp": pd.Timestamp.now().isoformat()
}
def _extract_bus_results(self) -> list:
results = []
for idx, row in self.net.res_bus.iterrows():
bus_name = self.net.bus.at[idx, "name"]
vn_kv = self.net.bus.at[idx, "vn_kv"]
results.append({
"bus_id": int(idx),
"name": bus_name,
"vm_pu": float(row["vm_pu"]),
"vm_kv": float(row["vm_pu"] * vn_kv),
"va_degree": float(row["va_degree"]),
"within_limits": 0.95 <= float(row["vm_pu"]) <= 1.05
})
return results
def _extract_line_results(self) -> list:
results = []
for idx, row in self.net.res_line.iterrows():
results.append({
"line_id": int(idx),
"name": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"]),
"i_ka": float(row["i_ka"]),
"pl_mw": float(row["pl_mw"]),
"overloaded": float(row["loading_percent"]) > 100.0
})
return results
def _extract_trafo_results(self) -> dict:
row = self.net.res_trafo.iloc[0]
return {
"loading_percent": float(row["loading_percent"]),
"pl_mw": float(row["pl_mw"]),
"i_hv_ka": float(row["i_hv_ka"]),
"i_lv_ka": float(row["i_lv_ka"]),
"overloaded": float(row["loading_percent"]) > 100.0
}
def check_n1_contingency(self, line_to_remove: int) -> dict:
"""
Simulazione contingenza N-1: rimuove una linea e verifica la tenuta della rete.
Args:
line_to_remove: ID della linea da togliere fuori servizio
Returns:
Risultati power flow post-contingenza con violazioni
"""
# Salva stato originale
original_in_service = self.net.line.at[line_to_remove, "in_service"]
# Applica contingenza
self.net.line.at[line_to_remove, "in_service"] = False
try:
pp.runpp(self.net, algorithm="nr", max_iteration=50)
violations = []
for idx, row in self.net.res_bus.iterrows():
if not (0.90 <= float(row["vm_pu"]) <= 1.10):
violations.append({
"type": "voltage",
"bus": self.net.bus.at[idx, "name"],
"vm_pu": float(row["vm_pu"])
})
for idx, row in self.net.res_line.iterrows():
if float(row["loading_percent"]) > 120.0:
violations.append({
"type": "overload",
"line": self.net.line.at[idx, "name"],
"loading_percent": float(row["loading_percent"])
})
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": True,
"n_violations": len(violations),
"violations": violations,
"secure": len(violations) == 0
}
except pp.LoadflowNotConverged:
result = {
"contingency": f"N-1 Line {line_to_remove}",
"converged": False,
"secure": False,
"error": "Rete non sicura: power flow non convergente post-contingenza"
}
finally:
# Ripristina stato originale
self.net.line.at[line_to_remove, "in_service"] = original_in_service
return result
Transformatörün Termal Modeli
Transformatör bir trafo merkezindeki en kritik varlıktır. Onun sağlık durumu doğrudan yağın ve sargıların sıcaklığına bağlıdır. Standart IEC 60076-7 Temel analitik termal modeli tanımlar çoğu endüstriyel transformatör dijital ikizinden biridir.
import numpy as np
from dataclasses import dataclass
@dataclass
class TransformerThermalModel:
"""
Modello termico trasformatore secondo IEC 60076-7.
Implementa il modello esponenziale per calcolo temperatura olio e avvolgimenti.
"""
# Parametri costruttivi (da nameplate / misure FAT)
rated_power_mva: float = 40.0
theta_amb_ref: float = 20.0 # temperatura ambiente riferimento
delta_theta_or: float = 55.0 # sopratemperatura olio a pieno carico
delta_theta_hr: float = 23.0 # gradiente avvolgimento-olio a pieno carico
tau_o: float = 3.0 * 3600 # costante di tempo termica olio [s]
tau_w: float = 10 * 60 # costante di tempo avvolgimenti [s]
n_exp: float = 0.9 # esponente di carico olio (ONAN)
m_exp: float = 0.8 # esponente di carico avvolgimenti
r_load: float = 6.0 # rapporto perdite carico/vuoto
# Stato iniziale
theta_oil: float = 20.0 # temperatura olio corrente
theta_winding: float = 20.0 # temperatura avvolgimento corrente
def step(
self,
load_fraction: float,
ambient_temp: float,
dt_seconds: float = 60.0
) -> dict:
"""
Aggiornamento modello con passo temporale dt.
Args:
load_fraction: carico normalizzato (0=vuoto, 1=pieno carico)
ambient_temp: temperatura ambiente in gradi C
dt_seconds: passo temporale in secondi
Returns:
Stato termico aggiornato con temperatura olio, avvolgimenti e HST
"""
# Sopratemperatura olio a regime per questo carico
k = load_fraction # fattore di carico
delta_theta_o_steady = self.delta_theta_or * (
(1 + self.r_load * k**2) / (1 + self.r_load)
) ** self.n_exp
# Dinamica olio (prima equazione differenziale)
d_theta_oil = (
(ambient_temp + delta_theta_o_steady - self.theta_oil) / self.tau_o
) * dt_seconds
self.theta_oil = self.theta_oil + d_theta_oil
# Gradiente avvolgimento a regime
delta_theta_h_steady = self.delta_theta_hr * (k ** (2 * self.m_exp))
# Hot Spot Temperature (HST) - temperatura punto caldo avvolgimento
# Dinamica più rapida degli avvolgimenti
d_theta_winding = (
(self.theta_oil + delta_theta_h_steady - self.theta_winding) / self.tau_w
) * dt_seconds
self.theta_winding = self.theta_winding + d_theta_winding
# Aging Acceleration Factor (AAF) per calcolo consumo vita isolante
# Riferimento: theta_ref = 98°C per isolante classe A
theta_ref = 98.0
aaf = np.exp(
15000.0 / (theta_ref + 273.0) - 15000.0 / (self.theta_winding + 273.0)
)
return {
"theta_oil_c": round(self.theta_oil, 2),
"theta_winding_hst_c": round(self.theta_winding, 2),
"theta_ambient_c": ambient_temp,
"load_fraction": load_fraction,
"aaf": round(float(aaf), 4),
"thermal_limit_exceeded": self.theta_winding > 120.0, # limite continuo
"emergency_limit_exceeded": self.theta_winding > 140.0 # limite emergenza
}
def calculate_rul_hours(self, avg_hst: float) -> float:
"""
Stima Remaining Useful Life (RUL) dell'isolante cellulosico.
Basato sul modello di Arrhenius (IEC 60076-7 Section 8).
Args:
avg_hst: temperatura media Hot Spot in gradi C
Returns:
Ore di vita residua stimate
"""
# Vita nominale isolante a 98°C = 65.000 ore (IEC 60076-7)
nominal_life_hours = 65_000.0
theta_ref = 98.0
# Aging rate normalizzato
per_unit_life_remaining = 1.0 # assume vita residua 100%
aaf_avg = np.exp(
15000.0 / (theta_ref + 273.15) - 15000.0 / (avg_hst + 273.15)
)
# Ore residue
rul_hours = nominal_life_hours * per_unit_life_remaining / aaf_avg
return round(rul_hours, 0)
Veri Kullanımı: SCADA ve Nesnelerin İnterneti'nden Dijital İkiz'e
Dijital ikizin kalitesi doğrudan verinin kalitesine ve gecikmesine bağlıdır girişte. Gerçek bir enerji sisteminde heterojen kaynakları yönetmek zorundayız: IEC 61850/DNP3 aracılığıyla RTU/IED, MQTT aracılığıyla IoT sensörleri, REST API aracılığıyla hava durumu tahmini, DLMS/COSEM aracılığıyla akıllı sayaç.
import asyncio
import aiohttp
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
from datetime import datetime, timezone
from typing import Callable
class EnergyDataIngestion:
"""
Gestione ingestione dati multi-sorgente per digital twin energetico.
Supporta MQTT (IoT sensors), REST API (weather, market), SCADA (simulato).
"""
def __init__(
self,
influx_url: str,
influx_token: str,
influx_org: str,
influx_bucket: str,
mqtt_broker: str,
mqtt_port: int = 1883
):
# InfluxDB client per time-series storage
self._influx = InfluxDBClient(
url=influx_url,
token=influx_token,
org=influx_org
)
self._write_api = self._influx.write_api(write_options=SYNCHRONOUS)
self._bucket = influx_bucket
self._org = influx_org
# MQTT client per sensori IoT
self._mqtt_client = mqtt.Client(client_id="dt-ingestion-01")
self._mqtt_client.on_connect = self._on_mqtt_connect
self._mqtt_client.on_message = self._on_mqtt_message
self._mqtt_broker = mqtt_broker
self._mqtt_port = mqtt_port
# Callback per aggiornare il modello in tempo reale
self._model_update_callbacks: list[Callable] = []
def register_model_callback(self, cb: Callable) -> None:
"""Registra callback chiamata a ogni aggiornamento dati."""
self._model_update_callbacks.append(cb)
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
# Sottoscrizione ai topic energetici
topics = [
"substation/+/transformer/+/measurements",
"substation/+/feeder/+/measurements",
"substation/+/weather/measurements",
"substation/+/pv/measurements"
]
for topic in topics:
client.subscribe(topic, qos=1)
print(f"MQTT connesso al broker - subscribed a {len(topics)} topic")
def _on_mqtt_message(self, client, userdata, msg):
"""Elaborazione messaggio MQTT con parsing e storage."""
try:
payload = json.loads(msg.payload.decode())
topic_parts = msg.topic.split("/")
# Estrai metadati dal topic
substation_id = topic_parts[1]
asset_type = topic_parts[2]
asset_id = topic_parts[3]
# Crea punto InfluxDB
point = (
Point(asset_type)
.tag("substation", substation_id)
.tag("asset_id", asset_id)
.time(
datetime.fromisoformat(
payload.get("timestamp", datetime.now(timezone.utc).isoformat())
)
)
)
# Aggiungi misure come fields
for key, value in payload.get("measurements", {}).items():
if isinstance(value, (int, float)):
point = point.field(key, float(value))
# Scrivi su InfluxDB
self._write_api.write(
bucket=self._bucket,
org=self._org,
record=point
)
# Notifica callbacks (aggiornamento modello)
for cb in self._model_update_callbacks:
cb(asset_type, asset_id, payload)
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"Errore parsing messaggio MQTT: {e} - topic: {msg.topic}")
async def fetch_weather_forecast(
self,
lat: float,
lon: float,
session: aiohttp.ClientSession
) -> dict:
"""
Recupera previsioni meteo da Open-Meteo API (gratuita, no API key).
Usata per correzione temperatura ambiente nel modello termico trasformatore.
"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,windspeed_10m,direct_radiation",
"forecast_days": 3,
"timezone": "Europe/Rome"
}
async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.json()
return {
"hourly_temp_c": data["hourly"]["temperature_2m"],
"hourly_wind_ms": data["hourly"]["windspeed_10m"],
"hourly_radiation_wm2": data["hourly"]["direct_radiation"],
"times": data["hourly"]["time"]
}
else:
raise ValueError(f"Weather API error: HTTP {resp.status}")
def start_mqtt(self) -> None:
"""Avvia connessione MQTT in thread separato."""
self._mqtt_client.connect(self._mqtt_broker, self._mqtt_port, keepalive=60)
self._mqtt_client.loop_start()
def stop(self) -> None:
"""Cleanup risorse."""
self._mqtt_client.loop_stop()
self._mqtt_client.disconnect()
self._influx.close()
Gerçek Zamanlı Senkronizasyon ve Dijital İkiz Döngü
"Dijital ikiz döngü", dijital modeli senkronize tutan sürekli döngüdür vücutla. Senkronizasyon sıklığı sürecin dinamiğine bağlıdır: SCADA durum ölçümleri için koruma amacıyla genellikle 30-60 saniye yeterlidir IEC 61850 milisaniyeye düşer.
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional
logger = logging.getLogger(__name__)
class DigitalTwinOrchestrator:
"""
Orchestratore principale del digital twin energetico.
Gestisce il loop di sincronizzazione e coordina i diversi modelli.
"""
def __init__(
self,
twin: EnergyDigitalTwin,
ingestion: EnergyDataIngestion,
thermal_model: TransformerThermalModel,
sync_interval_s: float = 30.0
):
self.twin = twin
self.ingestion = ingestion
self.thermal_model = thermal_model
self.sync_interval_s = sync_interval_s
self._running = False
self._latest_scada: dict = {}
self._latest_weather: dict = {}
# Registra callback per aggiornamenti IoT
self.ingestion.register_model_callback(self._handle_sensor_update)
def _handle_sensor_update(
self,
asset_type: str,
asset_id: str,
payload: dict
) -> None:
"""Callback IoT: aggiorna cache dati SCADA."""
measurements = payload.get("measurements", {})
if asset_type == "transformer":
self._latest_scada["transformer"] = measurements
elif asset_type == "feeder":
feeder_data = self._latest_scada.get("feeders", {})
feeder_data[asset_id] = measurements
self._latest_scada["feeders"] = feeder_data
async def run(self) -> None:
"""Loop principale del digital twin - esegue in continuo."""
self._running = True
self.ingestion.start_mqtt()
logger.info(f"Digital Twin '{self.twin.config.name}' avviato")
async with aiohttp.ClientSession() as session:
while self._running:
cycle_start = asyncio.get_event_loop().time()
try:
await self._sync_cycle(session)
except Exception as e:
logger.error(f"Errore nel ciclo di sync: {e}", exc_info=True)
# Mantieni frequenza di sync
elapsed = asyncio.get_event_loop().time() - cycle_start
sleep_time = max(0, self.sync_interval_s - elapsed)
await asyncio.sleep(sleep_time)
async def _sync_cycle(self, session: aiohttp.ClientSession) -> None:
"""Singolo ciclo di sincronizzazione."""
# 1. Aggiorna weather ogni 30 minuti
now = datetime.now(timezone.utc)
if now.minute % 30 == 0:
try:
self._latest_weather = await self.ingestion.fetch_weather_forecast(
lat=40.85, # Bari, esempio
lon=16.55,
session=session
)
except Exception as e:
logger.warning(f"Weather fetch fallito: {e}")
# 2. Assembla dati SCADA per aggiornamento modello
scada_snapshot = self._build_scada_snapshot()
# 3. Aggiorna rete elettrica nel digital twin
self.twin.update_from_scada(scada_snapshot)
# 4. Esegui power flow
pf_result = self.twin.run_power_flow()
# 5. Aggiorna modello termico trasformatore
ambient_temp = self._get_ambient_temperature()
trafo_loading = pf_result.get("transformer", {}).get("loading_percent", 0) / 100.0
thermal_state = self.thermal_model.step(
load_fraction=trafo_loading,
ambient_temp=ambient_temp,
dt_seconds=self.sync_interval_s
)
# 6. Verifica allarmi e genera alert
self._check_alerts(pf_result, thermal_state)
# 7. Pubblica stato su InfluxDB (per Grafana dashboard)
self._publish_twin_state(pf_result, thermal_state)
logger.debug(
f"Sync OK | Conv={pf_result['converged']} | "
f"HST={thermal_state['theta_winding_hst_c']}°C | "
f"Trafo={pf_result.get('transformer', {}).get('loading_percent', 0):.1f}%"
)
def _build_scada_snapshot(self) -> dict:
"""Costruisce snapshot SCADA normalizzato per il modello."""
feeders = self._latest_scada.get("feeders", {})
n_feeders = self.twin.config.n_feeders
# Estrai potenze per feeder (con fallback a ultimo valore noto)
load_per_feeder = []
for i in range(n_feeders):
feeder_key = f"feeder_{i+1:02d}"
feeder_data = feeders.get(feeder_key, {})
p_mw = feeder_data.get("active_power_mw",
self.twin.config.peak_load_mw / n_feeders * 0.7)
load_per_feeder.append(float(p_mw))
trafo_data = self._latest_scada.get("transformer", {})
return {
"load_mw_per_feeder": load_per_feeder,
"pv_mw": trafo_data.get("pv_injection_mw", 0.0),
"tap_position": int(trafo_data.get("tap_position", 0)),
"ambient_temp_c": self._get_ambient_temperature()
}
def _get_ambient_temperature(self) -> float:
"""Restituisce temperatura ambiente corrente (da weather API o default)."""
weather = self._latest_weather
if weather and "hourly_temp_c" in weather:
# Prendi la temperatura dell'ora corrente
hour_idx = datetime.now().hour
temps = weather["hourly_temp_c"]
if 0 <= hour_idx < len(temps):
return float(temps[hour_idx])
return 20.0 # default
def _check_alerts(self, pf_result: dict, thermal_state: dict) -> None:
"""Genera alert per condizioni anomale."""
alerts = []
if not pf_result.get("converged", True):
alerts.append({"severity": "CRITICAL", "msg": "Power flow non convergente"})
if thermal_state.get("thermal_limit_exceeded"):
hst = thermal_state["theta_winding_hst_c"]
alerts.append({
"severity": "WARNING",
"msg": f"HST trasformatore {hst}°C supera limite continuo 120°C"
})
trafo = pf_result.get("transformer", {})
if trafo.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Trasformatore sovraccarico: {trafo['loading_percent']:.1f}%"
})
for line in pf_result.get("lines", []):
if line.get("overloaded"):
alerts.append({
"severity": "WARNING",
"msg": f"Linea {line['name']} sovraccarica: {line['loading_percent']:.1f}%"
})
for alert in alerts:
logger.warning(f"[ALERT {alert['severity']}] {alert['msg']}")
def _publish_twin_state(self, pf_result: dict, thermal_state: dict) -> None:
"""Pubblica stato corrente su InfluxDB per dashboard Grafana."""
from influxdb_client import Point
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
points = []
# Punto power flow
trafo = pf_result.get("transformer", {})
pf_point = (
Point("digital_twin_powerflow")
.tag("substation", self.twin.config.name)
.field("converged", int(pf_result.get("converged", 0)))
.field("trafo_loading_pct", trafo.get("loading_percent", 0.0))
.field("trafo_losses_mw", trafo.get("pl_mw", 0.0))
.field("total_losses_mw", pf_result.get("losses_mw", 0.0))
.time(now)
)
points.append(pf_point)
# Punto thermal
th_point = (
Point("digital_twin_thermal")
.tag("substation", self.twin.config.name)
.field("theta_oil_c", thermal_state["theta_oil_c"])
.field("theta_hst_c", thermal_state["theta_winding_hst_c"])
.field("aaf", thermal_state["aaf"])
.field("ambient_c", thermal_state["theta_ambient_c"])
.time(now)
)
points.append(th_point)
self.ingestion._write_api.write(
bucket=self.ingestion._bucket,
org=self.ingestion._org,
record=points
)
def stop(self) -> None:
self._running = False
self.ingestion.stop()
ML ile Kestirimci Bakım: Transformatörlerde RUL
Kestirimci bakım, dijital ikizlerin en olgun kullanım örneklerinden biridir enerji. Bir HV/MV transformatörünün maliyeti 500.000 ila 5.000.000 Euro arasındadır: elektrik kesintisi, acil durum maliyetleri ve düzenleyici cezalar nedeniyle beklenmedik arızalar. İyi eğitilmiş bir ML modeliyle arızaları 3-6 haftaya kadar tahmin edebilirsiniz önceden azaltarak %35 beklenmeyen arızalar 2025 verilerine göre.
Transformatörler için Özellik Mühendisliği
Bir transformatörün sağlık birleşimi önlemleri için en öngörücü özellikler yağın elektriksel, termal ve kimyasal analizi (DGA - Çözünmüş Gaz Analizi):
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, mean_absolute_error
import xgboost as xgb
import joblib
from pathlib import Path
class TransformerHealthPredictor:
"""
Modello ML per predizione stato di salute e RUL trasformatore.
Usa Random Forest per classificazione anomalia + XGBoost per regressione RUL.
"""
# Feature set secondo IEC 60599 (interpretazione DGA) + misure elettriche/termiche
FEATURE_COLUMNS = [
# DGA - Dissolved Gas Analysis (ppm)
"h2_ppm", # idrogeno - indice scariche parziali
"ch4_ppm", # metano - surriscaldamento olio
"c2h2_ppm", # acetilene - archi elettrici (critico)
"c2h4_ppm", # etilene - surriscaldamento grave
"c2h6_ppm", # etano - surriscaldamento lieve
"co_ppm", # monossido di carbonio - degradazione cellulosa
"co2_ppm", # biossido di carbonio - degradazione cellulosa
# Misure elettriche
"load_factor", # fattore di carico medio (0-1)
"load_factor_peak", # picco fattore di carico
"tap_operations_30d", # operazioni tap changer negli ultimi 30 giorni
"voltage_thd_pct", # distorsione armonica tensione %
# Misure termiche
"avg_hst_30d", # temperatura media hot spot 30 giorni
"max_hst_30d", # picco temperatura hot spot 30 giorni
"ambient_temp_avg", # temperatura ambiente media
# Misure olio
"moisture_ppm", # umidita olio (degrado isolante)
"acidity_mg_koh_g", # acidita olio
"breakdown_voltage_kv", # rigidita dielettrica olio
# Feature ingegnerizzate
"total_dissolved_gas_ppm", # somma gas combustibili
"methane_ratio", # CH4/(CH4+C2H4) ratio Duval
"age_years", # eta trasformatore
"cumulative_aaf", # AAF cumulato (consumo vita isolante)
]
def __init__(self, model_dir: str = "models/transformer_health"):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(parents=True, exist_ok=True)
self.scaler = StandardScaler()
# Classificatore anomalia: normal / warning / critical
self.anomaly_classifier = RandomForestClassifier(
n_estimators=200,
max_depth=12,
min_samples_leaf=5,
class_weight="balanced", # gestisce sbilanciamento classi
n_jobs=-1,
random_state=42
)
# Regressore RUL: ore residue di vita isolante
self.rul_regressor = xgb.XGBRegressor(
n_estimators=300,
max_depth=6,
learning_rate=0.05,
subsample=0.8,
colsample_bytree=0.8,
objective="reg:squarederror",
eval_metric="mae",
random_state=42
)
self._is_fitted = False
def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calcola feature derivate da misure grezze."""
result = df.copy()
# Gas combustibili totali (TDCG secondo IEEE C57.104)
gas_cols = ["h2_ppm", "ch4_ppm", "c2h2_ppm", "c2h4_ppm", "c2h6_ppm"]
available_gas = [c for c in gas_cols if c in result.columns]
result["total_dissolved_gas_ppm"] = result[available_gas].sum(axis=1)
# Ratio di Duval per diagnosi tipo di guasto
if "ch4_ppm" in result.columns and "c2h4_ppm" in result.columns:
denom = result["ch4_ppm"] + result["c2h4_ppm"]
result["methane_ratio"] = np.where(
denom > 0,
result["ch4_ppm"] / denom,
0.5
)
# Carico cumulato normalizzato (proxy degradazione)
if "avg_hst_30d" in result.columns and "age_years" in result.columns:
theta_ref = 98.0
result["cumulative_aaf"] = np.exp(
15000.0 / (theta_ref + 273.15) -
15000.0 / (result["avg_hst_30d"] + 273.15)
) * result["age_years"]
# Colonne mancanti riempite con mediana
for col in self.FEATURE_COLUMNS:
if col not in result.columns:
result[col] = result.get(col, 0.0)
return result[self.FEATURE_COLUMNS].fillna(0.0)
def fit(
self,
df_train: pd.DataFrame,
y_anomaly: pd.Series,
y_rul_hours: pd.Series
) -> dict:
"""
Addestramento entrambi i modelli.
Args:
df_train: DataFrame con misure storiche
y_anomaly: etichette {0=normal, 1=warning, 2=critical}
y_rul_hours: ore di vita residua reali (da storico guasti)
Returns:
Metriche di training
"""
X = self.engineer_features(df_train)
X_scaled = self.scaler.fit_transform(X)
# Split train/validation
X_tr, X_val, y_a_tr, y_a_val, y_r_tr, y_r_val = train_test_split(
X_scaled, y_anomaly, y_rul_hours,
test_size=0.2, random_state=42, stratify=y_anomaly
)
# Training classificatore
self.anomaly_classifier.fit(X_tr, y_a_tr)
y_pred_class = self.anomaly_classifier.predict(X_val)
cv_scores = cross_val_score(
self.anomaly_classifier, X_scaled, y_anomaly,
cv=5, scoring="f1_weighted"
)
# Training regressore RUL
self.rul_regressor.fit(
X_tr, y_r_tr,
eval_set=[(X_val, y_r_val)],
verbose=False
)
y_pred_rul = self.rul_regressor.predict(X_val)
mae_rul = mean_absolute_error(y_r_val, y_pred_rul)
self._is_fitted = True
# Salva modelli
joblib.dump(self.scaler, self.model_dir / "scaler.pkl")
joblib.dump(self.anomaly_classifier, self.model_dir / "anomaly_rf.pkl")
self.rul_regressor.save_model(str(self.model_dir / "rul_xgb.json"))
return {
"anomaly_f1_cv": float(cv_scores.mean()),
"anomaly_f1_std": float(cv_scores.std()),
"rul_mae_hours": float(mae_rul),
"n_train_samples": len(X_tr),
"feature_importances": dict(zip(
self.FEATURE_COLUMNS,
self.anomaly_classifier.feature_importances_.tolist()
))
}
def predict(self, measurements: dict) -> dict:
"""
Predizione stato salute per un trasformatore.
Args:
measurements: dizionario con misure correnti
Returns:
Stato salute, probabilità e RUL stimato
"""
if not self._is_fitted:
raise RuntimeError("Modello non addestrato. Chiama fit() prima.")
df = pd.DataFrame([measurements])
X = self.engineer_features(df)
X_scaled = self.scaler.transform(X)
anomaly_label = int(self.anomaly_classifier.predict(X_scaled)[0])
anomaly_proba = self.anomaly_classifier.predict_proba(X_scaled)[0].tolist()
rul_hours = max(0.0, float(self.rul_regressor.predict(X_scaled)[0]))
status_map = {0: "NORMAL", 1: "WARNING", 2: "CRITICAL"}
return {
"health_status": status_map[anomaly_label],
"probability_normal": round(anomaly_proba[0], 3),
"probability_warning": round(anomaly_proba[1], 3),
"probability_critical": round(anomaly_proba[2], 3),
"rul_hours": round(rul_hours, 0),
"rul_days": round(rul_hours / 24, 1),
"maintenance_urgency": "IMMEDIATE" if anomaly_label == 2 else
"PLANNED" if anomaly_label == 1 else "ROUTINE",
"top_risk_features": self._get_top_risk_features(X)
}
def _get_top_risk_features(self, X: pd.DataFrame) -> list:
"""Restituisce top-3 feature che contribuiscono al rischio."""
importances = self.anomaly_classifier.feature_importances_
feature_values = X.iloc[0].values
risk_scores = importances * np.abs(feature_values)
top_indices = np.argsort(risk_scores)[::-1][:3]
return [
{
"feature": self.FEATURE_COLUMNS[i],
"importance": round(float(importances[i]), 4),
"value": round(float(feature_values[i]), 3)
}
for i in top_indices
]
PyPSA ile Ağ Optimizasyonu: Olursa Olur Senaryoları
PyPSA (Güç Sistemi Analizi için Python) ve referans aracı Ulusal ölçekte enerji ağlarının optimizasyonu için. Simüle etmenizi sağlar karmaşık senaryolar: yenilenebilir enerji üretiminin eklenmesi, ağ genişletme, depolama entegrasyonu, sevkiyatın ekonomik optimizasyonu.
import pypsa
import pandas as pd
import numpy as np
class GridExpansionTwin:
"""
Digital twin per pianificazione espansione rete con PyPSA.
Usa ottimizzazione lineare per trovare mix ottimale di generazione + storage.
"""
def __init__(self, network_name: str = "Rete_Puglia_Sud"):
self.network = pypsa.Network()
self.network.name = network_name
# Risoluzione temporale: 8760 ore (un anno)
self.network.set_snapshots(
pd.date_range("2025-01-01", periods=8760, freq="h")
)
def build_regional_network(
self,
buses_df: pd.DataFrame,
lines_df: pd.DataFrame,
generators_df: pd.DataFrame,
loads_df: pd.DataFrame
) -> None:
"""
Costruisce modello di rete da dati CIM/CSV.
Args:
buses_df: nodi con voltaggio nominale
lines_df: linee con impedenza e capacità
generators_df: generatori con curve di costo
loads_df: profili di carico orari annui
"""
# Aggiungi bus
for _, bus in buses_df.iterrows():
self.network.add(
"Bus",
name=bus["name"],
v_nom=bus["v_nom_kv"],
x=bus.get("longitude", 0.0),
y=bus.get("latitude", 0.0)
)
# Aggiungi linee
for _, line in lines_df.iterrows():
self.network.add(
"Line",
name=line["name"],
bus0=line["bus0"],
bus1=line["bus1"],
r=line["r_ohm"],
x=line["x_ohm"],
s_nom=line["s_nom_mva"],
s_nom_extendable=bool(line.get("extendable", False)),
s_nom_max=line.get("s_nom_max_mva", float("inf"))
)
# Aggiungi generatori con costi marginali
for _, gen in generators_df.iterrows():
self.network.add(
"Generator",
name=gen["name"],
bus=gen["bus"],
p_nom=gen["p_nom_mw"],
p_nom_extendable=bool(gen.get("extendable", False)),
p_nom_max=gen.get("p_nom_max_mw", float("inf")),
marginal_cost=gen.get("marginal_cost_eur_mwh", 0.0),
capital_cost=gen.get("capital_cost_eur_mw", 0.0),
carrier=gen.get("carrier", "gas"),
efficiency=gen.get("efficiency", 0.45),
committable=False
)
# Aggiungi carichi con profili orari
for _, load in loads_df.iterrows():
p_set = load.get("hourly_profile") # serie temporale 8760 valori
self.network.add(
"Load",
name=load["name"],
bus=load["bus"],
p_set=p_set if p_set is not None else load["p_mw"]
)
def add_storage(
self,
bus: str,
p_nom_mw: float,
energy_capacity_mwh: float,
capital_cost_eur_mw: float = 800_000
) -> None:
"""Aggiunge sistema di accumulo (BESS) alla rete."""
self.network.add(
"StorageUnit",
name=f"BESS_{bus}",
bus=bus,
p_nom=p_nom_mw,
p_nom_extendable=True,
p_nom_max=p_nom_mw * 3,
max_hours=energy_capacity_mwh / p_nom_mw,
capital_cost=capital_cost_eur_mw,
marginal_cost=0.0,
efficiency_store=0.92,
efficiency_dispatch=0.92,
cyclic_state_of_charge=True
)
def run_capacity_expansion(self) -> dict:
"""
Ottimizzazione espansione rete: trova investimenti ottimali
minimizzando costo totale (CAPEX + OPEX) su orizzonte annuo.
"""
self.network.optimize(
solver_name="highs", # HiGHS solver open source
solver_options={
"time_limit": 300, # max 5 minuti
"mip_gap": 0.01 # gap ottimalita 1%
}
)
# Estrai risultati
total_cost = float(self.network.objective)
# Generazione ottimale per vettore energetico
gen_by_carrier = (
self.network.generators_t.p
.multiply(self.network.snapshot_weightings.generators, axis=0)
.groupby(self.network.generators.carrier, axis=1)
.sum()
.sum()
/ 1e6 # MWh -> TWh
)
# Utilizzo linee (congestion analysis)
line_loading = (
self.network.lines_t.p0.abs() /
self.network.lines.s_nom
).max()
# capacità ottimale per nuovi impianti
new_capacity = self.network.generators[
self.network.generators.p_nom_extendable
]["p_nom_opt"]
return {
"total_system_cost_meur": round(total_cost / 1e6, 2),
"generation_twh": gen_by_carrier.to_dict(),
"congested_lines": line_loading[line_loading > 0.9].to_dict(),
"optimal_new_capacity_mw": new_capacity.to_dict(),
"co2_emissions_kt": self._calculate_emissions()
}
def simulate_n1_contingencies(self) -> pd.DataFrame:
"""
Simula tutte le contingenze N-1 e restituisce report violazioni.
Utile per security assessment della rete pianificata.
"""
results = []
lines = self.network.lines.index.tolist()
for line_name in lines:
# Clona rete e rimuovi linea
n_contingency = self.network.copy()
n_contingency.remove("Line", line_name)
try:
n_contingency.lpf() # Linearized Power Flow (più veloce)
max_loading = (
n_contingency.lines_t.p0.abs() /
n_contingency.lines.s_nom
).max().max()
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float(max_loading),
"secure": bool(max_loading <= 1.0),
"severity": "OK" if max_loading <= 1.0 else
"WARNING" if max_loading <= 1.2 else "CRITICAL"
})
except Exception as e:
results.append({
"contingency": f"N-1: {line_name}",
"max_line_loading": float("inf"),
"secure": False,
"severity": "CRITICAL",
"error": str(e)
})
return pd.DataFrame(results).sort_values("max_line_loading", ascending=False)
def _calculate_emissions(self) -> float:
"""Calcola emissioni CO2 totali in kt."""
emission_factors = {
"gas": 0.202, # tCO2/MWh gas CCGT
"coal": 0.820, # tCO2/MWh carbone
"nuclear": 0.012, # tCO2/MWh nucleare (lifecycle)
"wind": 0.007, # tCO2/MWh eolico
"solar": 0.040, # tCO2/MWh fotovoltaico
"hydro": 0.004 # tCO2/MWh idroelettrico
}
total_kt = 0.0
for carrier, factor in emission_factors.items():
gen_col = [
c for c in self.network.generators_t.p.columns
if self.network.generators.at[c, "carrier"] == carrier
]
if gen_col:
gen_mwh = self.network.generators_t.p[gen_col].sum().sum()
total_kt += gen_mwh * factor / 1000.0
return round(total_kt, 2)
Ortak Bilgi Modeli (CIM): IEC 61970/61968
Il CIM (Ortak Bilgi Modeli) ve uluslararası standart elektrik şebekesi varlıklarının anlamsal temsili. IEC e tarafından geliştirildi ENTSO-E tarafından sürdürülmektedir. CGMES (Ortak Izgara Model Değişimi) Şartname)CIM, izin veren ortak bir ontoloji tanımlar Bilgi kaybı olmadan ağ modellerini değiştirmek için farklı satıcıların sistemleri.
| Standart | Kapsam | Anahtar Paketi | Digital Twin kullanıyorum |
|---|---|---|---|
| IEC 61970-301 | Çekirdek Ağ Modeli | Topoloji Paketi, Teller Paketi | Ağ topolojisi, empedanslar, transformatörler |
| IEC 61970-456 | Durum Değişkenleri | Durum Değişkenleri Paketi | Gerçek zamanlı durum: gerilimler, akışlar, ayar noktaları |
| IEC 61968-4 | Varlık Yönetimi | Varlık Paketi, İş Paketi | Varlık verileri, iş emirleri, bakım |
| CGMES3.0 | Değişim Formatı | EQ, TP, SV, SSH, DY | Pan-Avrupa TSO'ları/DSO'ları arasında model alışverişi |
from dataclasses import dataclass, field
import uuid
from enum import Enum
from typing import Optional
class WindingConnection(Enum):
"""IEC CIM - Tipo connessione avvolgimento trasformatore."""
D = "D" # Delta
Y = "Y" # Wye (stella)
Z = "Z" # Zigzag
Yn = "Yn" # Wye con neutro
@dataclass
class CIMIdentifiedObject:
"""Classe base CIM per oggetti identificati."""
mrid: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
alias_name: str = ""
@dataclass
class CIMSubstation(CIMIdentifiedObject):
"""CIM Substation - rappresenta una sottostazione elettrica."""
region_name: str = ""
voltage_levels: list = field(default_factory=list)
equipment: list = field(default_factory=list)
@dataclass
class CIMPowerTransformer(CIMIdentifiedObject):
"""CIM PowerTransformer - trasformatore di potenza."""
vector_group: str = "Dyn11"
is_phase_shifting: bool = False
winding_ends: list = field(default_factory=list)
@dataclass
class CIMPowerTransformerEnd:
"""CIM PowerTransformerEnd - avvolgimento trasformatore."""
end_number: int = 1
rated_s_mva: float = 0.0
rated_u_kv: float = 0.0
r_ohm: float = 0.0
x_ohm: float = 0.0
connection_kind: WindingConnection = WindingConnection.Y
grounded: bool = False
class CIMModelBuilder:
"""
Costruisce modello CIM da dati strutturati per export CGMES.
Permette interoperabilità con EMS/SCADA di vendor diversi.
"""
def __init__(self):
self._substations: dict[str, CIMSubstation] = {}
self._transformers: dict[str, CIMPowerTransformer] = {}
def add_substation(self, name: str, region: str = "") -> CIMSubstation:
"""Crea e registra una sottostazione nel modello CIM."""
sub = CIMSubstation(name=name, region_name=region)
self._substations[sub.mrid] = sub
return sub
def add_transformer(
self,
name: str,
substation: CIMSubstation,
hv_kv: float,
lv_kv: float,
rated_mva: float,
uk_percent: float = 8.5,
vector_group: str = "Dyn11"
) -> CIMPowerTransformer:
"""Crea trasformatore con entrambi gli avvolgimenti."""
trafo = CIMPowerTransformer(name=name, vector_group=vector_group)
# Calcolo impedenze in ohm (riferite al lato HV)
z_base = (hv_kv ** 2) / rated_mva # ohm base lato HV
z_total = (uk_percent / 100.0) * z_base
r_hv = z_total * 0.1 # approssimazione: R/Z ~ 0.1 per trafo AT/MT
x_hv = z_total * 0.995
# Avvolgimento primario (HV)
end_hv = CIMPowerTransformerEnd(
end_number=1,
rated_s_mva=rated_mva,
rated_u_kv=hv_kv,
r_ohm=r_hv,
x_ohm=x_hv,
connection_kind=WindingConnection.D,
grounded=False
)
# Avvolgimento secondario (LV)
end_lv = CIMPowerTransformerEnd(
end_number=2,
rated_s_mva=rated_mva,
rated_u_kv=lv_kv,
r_ohm=0.0,
x_ohm=0.0,
connection_kind=WindingConnection.Yn,
grounded=True
)
trafo.winding_ends = [end_hv, end_lv]
self._transformers[trafo.mrid] = trafo
substation.equipment.append(trafo)
return trafo
def to_cgmes_dict(self) -> dict:
"""
Esporta modello in formato CGMES-compatible (JSON-LD semplificato).
In produzione si usa uno serializer RDF/XML conforme CGMES 3.0.
"""
return {
"@context": "https://iec.ch/TC57/CIM100",
"Substation": [
{
"@id": f"_{sub.mrid}",
"@type": "cim:Substation",
"cim:IdentifiedObject.name": sub.name,
"cim:Substation.Region": sub.region_name
}
for sub in self._substations.values()
],
"PowerTransformer": [
{
"@id": f"_{trafo.mrid}",
"@type": "cim:PowerTransformer",
"cim:IdentifiedObject.name": trafo.name,
"cim:PowerTransformer.vectorGroup": trafo.vector_group,
"cim:PowerTransformer.PowerTransformerEnd": [
{
"cim:PowerTransformerEnd.endNumber": end.end_number,
"cim:PowerTransformerEnd.ratedS":
{"value": end.rated_s_mva, "unit": "MVA"},
"cim:PowerTransformerEnd.ratedU":
{"value": end.rated_u_kv, "unit": "kV"}
}
for end in trafo.winding_ends
]
}
for trafo in self._transformers.values()
]
}
Bulut Platformları: Azure Digital Twins ve AWS IoT TwinMaker karşılaştırması
Doğru bulut platformunu seçmek, dijital ortamı ölçeklendirmede kritik bir adımdır Pilottan üretime ikiz. Önde gelen iki platformun farklı yaklaşımları var:
| Kriter | Azure Dijital İkizler | AWS IoT TwinMaker |
|---|---|---|
| Veri modeli | DTDL (Dijital İkiz Tanımlama Dili), özelleştirilebilir ontolojiler | Sahne bileşeni modeli, çalışma alanı tabanlı |
| SCADA/OT entegrasyonu | Azure IoT Hub + Endüstriyel IoT OPC-UA ile mükemmel | İyi şanslar AWS IoT Core + Greengrass Edge |
| 3 boyutlu görselleştirme | HoloLens 2, Power BI, iş ortağı ekosistemi | TwinMaker Sahne Bestecisi (Babylon.js), yerel Grafana eklentisi |
| Analitik | Azure Veri Gezgini, Synapse, Power BI | Amazon Zaman Akışı, Grafana, Athena |
| ML Entegrasyonu | Azure ML, Bilişsel Hizmetler | Amazon SageMaker |
| Fiyatlandırma (temel) | 0,10 USD/1000 sorgu + 0,50 USD/GB depolama/ay | 0,05 USD/1000 mülk okuması + 0,50 USD/GB/ay |
| CIM/CGMES standartları | Enerji için Azure Veri Yöneticisi (OSDU) | Özel bağlayıcılar aracılığıyla kısmi destek |
| Enerji sektörü | En İyi: Azure Enerji Veri Yöneticisi, OSDU | Genel OT için güçlü, enerjiye daha az özgü |
| İdeal kullanım durumu | Kamu hizmetleri, TSO/DSO, büyük SCADA sistemleri | Rüzgar santralleri, fotovoltaikler, akıllı binalar |
Enerji Sektörüne Yönelik Öneri
Kamu hizmetleri ve ağ operatörleri için (TSO/DSO) SCADA sistemleri ile Mevcut ve NERC/ENTSO-E uyumluluk gereklilikleri, Azure Dijital İkizler Enerji için Azure Veri Yöneticisi (OSDU'yu temel alır) ile en iyi entegrasyonu sunar endüstri standartlarıyla (CIM/CGMES). Yenilenebilir üretim operatörleri için (rüzgar, güneş) 3D görselleştirme ihtiyaçları ve Grafana entegrasyonu ile, AWS IoT TwinMaker daha sorunsuz bir geliştirici deneyimine sahiptir.
Edge Digital Twin: Ağ Geçidinde Hafif Modeller
Tüm hesaplamalar gidiş-dönüş bulutunu bekleyemez. Senaryolar için gecikme ve kritik (korumalar, talep yanıtı, gerçek zamanlı EV şarjı), evet bir uygular kenar dijital ikiz endüstriyel ağ geçidinde (Raspberry Pi 4, Siemens IPC, Beckhoff CX) sadeleştirilmiş bir sürümü çalıştırıyor Gecikme süresi 100 ms'nin altında olan modelin.
import numpy as np
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class EdgeTwinConfig:
"""Configurazione edge digital twin - ottimizzato per risorse limitate."""
n_buses: int = 6 # max bus simulabili in tempo reale
max_iterations: int = 10 # Newton-Raphson: meno iterazioni
tolerance_pu: float = 1e-4 # convergenza più lassa
update_interval_ms: float = 100.0 # frequenza aggiornamento 10 Hz
class LightweightPowerFlowSolver:
"""
Solver power flow semplificato per edge computing.
Usa metodo DC (lineare) per velocità massima.
L'approssimazione DC e accurata per reti con alto X/R ratio.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self._B_matrix: Optional[np.ndarray] = None # matrice ammettenza
self._initialized = False
def build_susceptance_matrix(
self,
from_buses: list[int],
to_buses: list[int],
x_ohm: list[float],
base_mva: float = 100.0
) -> None:
"""
Costruisce matrice B (suscettanza) per DC power flow.
Complessità O(n_lines), molto più veloce del metodo NR.
"""
n = self.config.n_buses
B = np.zeros((n, n))
for fb, tb, x in zip(from_buses, to_buses, x_ohm):
if x > 1e-8:
b = base_mva / x # suscettanza in pu
B[fb][fb] += b
B[tb][tb] += b
B[fb][tb] -= b
B[tb][fb] -= b
# Rimuovi riga/colonna bus slack (bus 0)
self._B_matrix = B[1:, 1:]
self._initialized = True
def solve_dc(self, p_injections_mw: list[float], base_mva: float = 100.0) -> dict:
"""
Risolve DC power flow: O(n^2) vs O(n^3) di NR.
Adatto per hardware edge con <512MB RAM.
Args:
p_injections_mw: lista iniezioni di potenza per bus (pos=generazione, neg=carico)
base_mva: potenza base per normalizzazione
Returns:
Angoli di fase e flussi di potenza stimati
"""
if not self._initialized:
raise RuntimeError("Matrice B non inizializzata")
t_start = time.monotonic_ns()
# Converti in per-unit (escludi bus slack)
p_pu = np.array(p_injections_mw[1:]) / base_mva
# Risolvi sistema lineare: B * theta = P
try:
theta = np.linalg.solve(self._B_matrix, p_pu)
except np.linalg.LinAlgError:
return {"converged": False, "error": "Matrice singolare - rete isola"}
theta_full = np.concatenate([[0.0], theta]) # aggiungi slack bus
elapsed_us = (time.monotonic_ns() - t_start) // 1000
return {
"converged": True,
"theta_rad": theta_full.tolist(),
"theta_deg": np.degrees(theta_full).tolist(),
"solve_time_us": int(elapsed_us),
"within_latency_budget": elapsed_us < (self.config.update_interval_ms * 1000 * 0.1)
}
class EdgeDigitalTwinRuntime:
"""
Runtime edge per digital twin leggero su gateway industriale.
Gira in un loop a 10 Hz con aggiornamento modello da sensori locali.
"""
def __init__(self, config: EdgeTwinConfig):
self.config = config
self.solver = LightweightPowerFlowSolver(config)
self._thermal_model = TransformerThermalModel()
self._last_sync_ts: float = 0.0
self._cloud_buffer: list[dict] = []
def process_sensor_sample(self, sensors: dict) -> dict:
"""
Elabora un campione sensoriale e aggiorna il modello edge.
Latenza target: < 10ms totale.
Args:
sensors: misure correnti (tensioni, correnti, temperature)
Returns:
Stato calcolato + alert immediati
"""
t_start = time.monotonic_ns()
# Aggiorna modello termico (molto veloce, solo algebra)
load_fraction = sensors.get("trafo_loading_pu", 0.5)
ambient_temp = sensors.get("ambient_temp_c", 20.0)
thermal_state = self._thermal_model.step(
load_fraction=load_fraction,
ambient_temp=ambient_temp,
dt_seconds=self.config.update_interval_ms / 1000.0
)
# Stima veloce stato rete (DC power flow)
p_injections = sensors.get("p_injections_mw", [0.0] * self.config.n_buses)
pf_result = self.solver.solve_dc(p_injections)
# Alert immediati (priorità altissima - <1ms)
immediate_alerts = self._evaluate_fast_alerts(thermal_state, sensors)
elapsed_ms = (time.monotonic_ns() - t_start) / 1e6
result = {
"thermal": thermal_state,
"power_flow": pf_result,
"alerts": immediate_alerts,
"edge_processing_ms": round(elapsed_ms, 3),
"within_budget": elapsed_ms < 10.0
}
# Accumula per sync cloud asincrono
self._cloud_buffer.append(result)
return result
def _evaluate_fast_alerts(
self,
thermal_state: dict,
sensors: dict
) -> list[dict]:
"""Alert critica valutata in <1ms per risposta immediata."""
alerts = []
hst = thermal_state.get("theta_winding_hst_c", 0.0)
if hst > 140.0:
alerts.append({
"code": "TRAFO_EMERGENCY_TEMP",
"severity": "CRITICAL",
"value": hst,
"action": "REDUCE_LOAD_IMMEDIATELY"
})
elif hst > 120.0:
alerts.append({
"code": "TRAFO_HIGH_TEMP",
"severity": "WARNING",
"value": hst,
"action": "MONITOR_CLOSELY"
})
# Verifica sovratensione (from raw sensor)
v_pu = sensors.get("voltage_pu", 1.0)
if v_pu > 1.10 or v_pu < 0.90:
alerts.append({
"code": "VOLTAGE_OUT_OF_RANGE",
"severity": "WARNING",
"value": v_pu,
"action": "ADJUST_TAP_CHANGER"
})
return alerts
def flush_to_cloud(self) -> list[dict]:
"""Svuota buffer locale per invio al cloud (chiamata asincrona)."""
buffer = self._cloud_buffer.copy()
self._cloud_buffer.clear()
return buffer
Örnek Olay: Dijital İkiz Açık Deniz Rüzgar Santrali
40 adet 8 MW'lık türbine (toplam 320 MW) sahip bir açık deniz rüzgar çiftliği, Muazzam miktarda veri: Her türbinde 200'den fazla sensör (ivmeölçerler, Gerinim ölçer, anemometreler, eğim/sapma kodlayıcıları, sıcaklık yatakları, akımlar jeneratör). Dijital ikiz; fiziksel, makine öğrenimi ve gerçek zamanlı optimizasyonu birleştirir Bakım maliyetlerini en aza indirirken üretilen enerjiyi en üst düzeye çıkarmak.
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class WindTurbineState:
"""Stato corrente di una turbina eolica."""
turbine_id: str
timestamp: str
wind_speed_ms: float
wind_direction_deg: float
power_kw: float
rotor_rpm: float
pitch_angle_deg: float
yaw_error_deg: float
nacelle_temp_c: float
gearbox_temp_c: float
generator_temp_c: float
main_bearing_vibration_g: float
tower_acceleration_x_g: float
tower_acceleration_y_g: float
operational_status: str # PRODUCING, CURTAILED, FAULT, MAINTENANCE
class OffshoreWindFarmTwin:
"""
Digital twin parco eolico offshore.
Gestisce 40 turbine con simulazione wake effect e predictive maintenance.
"""
# Curva di potenza Vestas V236-15.0 MW (approssimazione)
POWER_CURVE_MS = [0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 25]
POWER_CURVE_KW = [0, 0, 90, 310, 680, 1250, 2100, 3200, 4600, 6200, 7800, 8000, 8000]
def __init__(self, farm_name: str, n_turbines: int = 40):
self.farm_name = farm_name
self.n_turbines = n_turbines
self.turbine_states: dict[str, WindTurbineState] = {}
self._power_interpolator = self._build_power_curve_interp()
def _build_power_curve_interp(self):
"""Costruisce interpolatore curva di potenza."""
from scipy.interpolate import interp1d
return interp1d(
self.POWER_CURVE_MS,
self.POWER_CURVE_KW,
kind="cubic",
bounds_error=False,
fill_value=(0.0, 8000.0)
)
def calculate_wake_effect(
self,
turbine_positions: list[tuple[float, float]],
free_stream_wind_ms: float,
wind_direction_deg: float
) -> list[float]:
"""
Calcola velocità del vento per ogni turbina considerando il wake effect.
Usa modello Jensen (Park model) semplificato.
Args:
turbine_positions: lista (x, y) in metri
free_stream_wind_ms: velocità vento libero
wind_direction_deg: direzione vento (0=Nord)
Returns:
Lista velocità vento effettiva per ogni turbina
"""
n = len(turbine_positions)
effective_wind = [free_stream_wind_ms] * n
k_wake = 0.04 # costante espansione scia offshore
rotor_d = 236.0 # diametro rotore Vestas V236 in metri
ct = 0.79 # coefficiente di thrust a rated wind speed
# Converti direzione vento in vettore
wind_rad = np.radians(wind_direction_deg)
wind_vec = np.array([np.sin(wind_rad), np.cos(wind_rad)])
for i, (xi, yi) in enumerate(turbine_positions):
for j, (xj, yj) in enumerate(turbine_positions):
if i == j:
continue
# Vettore da turbina j a turbina i
delta = np.array([xi - xj, yi - yj])
dist = np.linalg.norm(delta)
if dist < 1.0:
continue
# Componente nella direzione del vento
downstream = np.dot(delta, wind_vec)
if downstream <= 0:
continue # i non e downstream di j
# Distanza laterale dalla scia
lateral = abs(np.cross(delta / dist, wind_vec)) * dist
# Raggio della scia a distanza downstream
wake_radius = rotor_d / 2.0 + k_wake * downstream
# Overlap fraction (semplificato: cilindrico)
if lateral < wake_radius:
overlap = max(0.0, 1.0 - lateral / wake_radius)
# Deficit di velocità Jensen model
deficit = (1.0 - np.sqrt(1.0 - ct)) * (rotor_d / (rotor_d + 2 * k_wake * downstream)) ** 2
effective_wind[i] = min(effective_wind[i], free_stream_wind_ms * (1.0 - overlap * deficit))
return effective_wind
def estimate_farm_production(
self,
wind_speed_ms: float,
wind_direction_deg: float,
turbine_positions: list[tuple[float, float]],
availability: Optional[list[float]] = None
) -> dict:
"""
Stima produzione totale del parco con wake model.
Args:
wind_speed_ms: velocità vento a hub height
wind_direction_deg: direzione vento
turbine_positions: coordinate turbine
availability: disponibilità per turbina (0-1)
Returns:
Produzione totale, per turbina, e perdite wake
"""
if availability is None:
availability = [1.0] * self.n_turbines
# Calcola velocità effettiva per ogni turbina
effective_winds = self.calculate_wake_effect(
turbine_positions, wind_speed_ms, wind_direction_deg
)
# Calcola potenza per turbina dalla curva
powers_kw = []
for i, (v_eff, avail) in enumerate(zip(effective_winds, availability)):
p = float(self._power_interpolator(v_eff)) * avail
powers_kw.append(p)
total_power_mw = sum(powers_kw) / 1000.0
ideal_power_mw = float(self._power_interpolator(wind_speed_ms)) * self.n_turbines / 1000.0
wake_loss_pct = max(0.0, (ideal_power_mw - total_power_mw) / ideal_power_mw * 100) if ideal_power_mw > 0 else 0.0
return {
"total_power_mw": round(total_power_mw, 2),
"ideal_power_mw": round(ideal_power_mw, 2),
"wake_loss_pct": round(wake_loss_pct, 2),
"capacity_factor": round(total_power_mw / (self.n_turbines * 8.0) * 100, 1),
"per_turbine_kw": [round(p, 1) for p in powers_kw],
"effective_wind_ms": [round(v, 2) for v in effective_winds]
}
def detect_yaw_misalignment(
self,
turbine_id: str,
lidar_wind_dir: float,
nacelle_dir: float,
threshold_deg: float = 5.0
) -> dict:
"""
Rileva disallineamento yaw da dati LIDAR + encoder nacelle.
Un yaw error di 10° riduce la produzione del ~3% (cos^2 law).
"""
yaw_error = abs(lidar_wind_dir - nacelle_dir)
# Normalizza nell'intervallo -180/+180
if yaw_error > 180:
yaw_error = 360 - yaw_error
# Stima perdita di produzione (legge del coseno)
production_loss_pct = (1 - np.cos(np.radians(yaw_error)) ** 3) * 100
return {
"turbine_id": turbine_id,
"yaw_error_deg": round(yaw_error, 2),
"production_loss_pct": round(production_loss_pct, 2),
"action_required": yaw_error > threshold_deg,
"priority": "HIGH" if yaw_error > 15.0 else
"MEDIUM" if yaw_error > threshold_deg else "NONE"
}
Performans, Ölçeklenebilirlik ve Güvenlik
Performans Gereksinimleri
| Bileşen | Hedef Gecikme Süresi | Güncelleme Sıklığı | Depolama/gün |
|---|---|---|---|
| Kenar ikizi (DC güç akışı) | < 10ms | 10 Hz | 50 MB/türbin |
| NR güç akışı (bulut) | < 500ms | 0,03Hz (30s) | 5 MB/trafo merkezi |
| Makine öğrenimi çıkarımı | < 100ms | 1Hz | 2 MB/varlık |
| 3D görselleştirme senkronizasyonu | < 1s | 0,1Hz (10s) | ihmal edilebilir |
| CGMES modelini dışa aktarma | < 5sn | 1/saat | 100 MB/ağ |
Güvenlik: SCADA Veri Koruması
Uyarı: Saldırı Vektörü Olarak Dijital İkizler
Enerji dijital ikizi ve siber saldırılar için yüksek değerli bir hedef: şunları içerir: tam ağ topolojisi, varlık güvenlik açıkları ve modelleri koruma. NIS2 Direktifi (İtalya'da 2024'ten itibaren yürürlüktedir) gereklilikleri zorunlu kılar Enerji sektöründeki temel hizmetlerin işletmecileri için sıkı:
- Ağ segmentasyonu: DT veri katmanı OT ağından ayrı bir DMZ'de olmalıdır
- Sıfır güven erişimi: her bileşen kimlik doğrulaması yapar, örtülü güven yoktur
- Veri temizleme: DT bulutuna giden SCADA verileri bir veri diyotundan (fiziksel tek yönlü) geçmelidir
- Denetim izleri: Uyumluluk için modeldeki her sorguda oturum açılmalıdır
- Beklemedeyken şifreleme: CIM/CGMES modelleri hassas topoloji verileri içerir
# Architettura di sicurezza raccomandata per Digital Twin energetico
#
# OT Network (IEC 62443) DMZ Cloud
# ┌─────────────────────┐ ┌──────────────┐ ┌───────────────┐
# │ IED / RTU / SCADA │───►│ Data Diode │───►│ Data Broker │
# │ (Modbus, IEC 61850) │ │ (unidirez.) │ │ (Kafka/MQTT) │
# └─────────────────────┘ └──────────────┘ └───────┬───────┘
# │ TLS 1.3
# ┌─────────────────────┐ ┌──────────────┐ ┌───────▼───────┐
# │ Historian (locale) │ │ Firewall │ │ DT Engine │
# │ OSIsoft PI / AVEVA │ │ IPS/IDS │ │ (Azure DT / │
# └─────────────────────┘ └──────────────┘ │ AWS TwinMkr) │
# └───────────────┘
#
# Regole chiave:
# - Nessuna connessione INBOUND verso OT network
# - Autenticazione mTLS per tutti i servizi DT
# - RBAC: operatore vede solo i propri asset
# - Dati storici: retention 7 anni (requisito ARERA in Italia)
# - Backup modello CIM: immutabile su S3/ADLS con versioning
# Esempio: validazione input da SCADA prima di aggiornare il twin
import re
from typing import Any
SCADA_FIELD_LIMITS = {
"load_mw": (0.0, 1000.0),
"voltage_kv": (0.0, 500.0),
"tap_position": (-16, 16),
"ambient_temp_c": (-40.0, 60.0),
"current_ka": (0.0, 10.0)
}
def validate_scada_measurement(field: str, value: Any) -> float:
"""
Valida misura SCADA prima dell'ingestion nel digital twin.
Previene injection di valori anomali (tampering / errori sensore).
"""
if not isinstance(value, (int, float)):
raise ValueError(f"Campo {field}: tipo non valido {type(value)}")
value = float(value)
if not (-1e9 < value < 1e9): # sanity check overflow
raise ValueError(f"Campo {field}: overflow numerico {value}")
if field in SCADA_FIELD_LIMITS:
lo, hi = SCADA_FIELD_LIMITS[field]
if not (lo <= value <= hi):
raise ValueError(
f"Campo {field}: valore {value} fuori range fisico [{lo}, {hi}]"
)
return value
Gelecek: Birleşik Dijital İkizler ve Yapay Zekayla Artırılmış Izgara
Enerji dijital ikizlerinde 2026-2028 için yükselen trendler:
Trend 2026-2028: Yeni Nesil Dijital İkiz
- Birleşik Dijital İkizler: TSO ve DSO'nun dijital ikizleri teknikleri kullanarak veri gizliliğini korurken işbirliği yapan birkaç kişi paylaşılan modelleri paylaşmadan güncellemeye yönelik birleşik öğrenme ağ topolojisi hakkında hassas veriler.
- Yapay Zekayla Artırılmış Fizik Modelleri: sinir ağları fiziksel olarak (PINN – Fizik Bilgili Sinir Ağları) Gözlemlenen verilerle Maxwell/Kirchhoff denklemleri, %60 oranında azaltılır Yalnızca analitik modellerle karşılaştırıldığında simülasyon hatası.
- Otonom Izgara İkizi: RL temsilcileriyle dijital ikiz Otonom kontrol eylemleri gerçekleştiren (Takviyeli Öğrenme) (kademe değiştirici, yük atma, depolama gönderimi) zamanında optimize etme insan denetimi ile gerçek.
- KOBİ'ler için Hizmet Olarak Dijital İkiz (DTaaS): platformlar Endüstriyel tesisler için önceden yapılandırılmış dijital ikizler sunan SaaS orta (1-50 MW), giriş bariyerini 500.000'den 50.000 EUR'ya düşürüyor.
- 5G Ultra Güvenilir Düşük Gecikme Süresi: 5G URLLC bağlantısı (gecikme <1 ms, %99,999 güvenilirlik) sahada uç ikizlemeye olanak tanıyacak ara ağ geçitleri olmadan, doğrudan sensör-bulut senkronizasyonuyla.
Kaynaklar ve Referanslar
| Kaynak | Tip | Kullanım |
|---|---|---|
| pandapower.readthedocs.io | Dokümantasyon | Güç akışı, optimum güç akışı, kısa devre |
| pypsa.readthedocs.io | Dokümantasyon | Kapasite genişletme, birim taahhüdü, çok dönemli |
| IEC 60076-7:2018 | Standart | Güç transformatörlerinin termal modeli |
| IEC 60599:2015 | Standart | Transformatör teşhisi için DGA yorumu |
| ENTSO-E CIM / CGMES 3.0 | Standart | Ağ veri modeli, TSO/DSO arasında alışveriş |
| Azure Digital Twins belgeleri | Bulut Platformu | DTDL, CIM ontolojileri, Azure IoT Hub |
| AWS IoT TwinMaker belgeleri | Bulut Platformu | Sahne Bestecisi, Grafana eklentisi, Timestream |
| OpenModelica | Açık kaynak araçları | Karmaşık fiziksel simülasyon (Modelica dili) |
| NIS2 Direktifi (AB 2022/2555) | Düzenlemeler | Temel hizmet operatörleri için siber güvenlik |
Sonuçlar
Günümüzde enerji altyapısının dijital ikizi, sektörün en dönüştürücü teknolojileri: 2025'te 34 milyar dolarlık pazar ve enerji geçişinin etkisiyle yılda %34,7 büyüyen DER'li ağların artan karmaşıklığı ve düzenleyici baskılar güvenilirlik ve güvenlik.
Aşağıdakilerle kurumsal düzeyde bir dijital ikizin nasıl uygulanacağını gördük: panda gücü gerçek zamanlı Newton-Raphson güç akışı için, transformatörler için IEC 60076-7 termal modeli, PyPSA ağ planlama optimizasyonu için ML modelleri Rastgele Orman ve XGBoost Kestirimci bakım için, ve model CIM/CGMES Sistemler arasında birlikte çalışabilirlik için.
Başarının anahtarı iyi ayrılmış bir katman mimarisidir: fizik, veri, model ve görselleştirmenin açık arayüzlere sahip olması gerekir; Her bileşeni bağımsız olarak geliştirin. Dijital gölge (L1) ilk erişilebilir dingil mesafesi; kuralcı ikiz (L4) ve nihai hedef altyapı kendi kendini optimize eder.
Serideki Sonraki Makale
EnergyTech serisinin bir sonraki ve son makalesi şunları araştırıyor: P2P Enerji Ticareti için Blockchain: Akıllı Sözleşmeler ve Kısıtlamalar: Eşler arası enerji ticareti için sağlam akıllı sözleşme mimarisi, Otomatik zincir içi yerleşim ve Avrupa düzenleyici kısıtlamalarının navigasyonu.
İlgili Seriler
- MLOps: tahmine dayalı bakım makine öğrenimi modelleri sunmak MLflow, DVC ve Kubernetes ile üretimde
- Yapay Zeka Mühendisliği: RAG'ı belgelere uygulamak Arıza raporu analizi için tesis mühendisliği ve Yüksek Lisans
- Veri ve Yapay Zeka İşi: enerji veri yönetimi için ve analitik için ETL/ELT işlem hatları oluşturma







