Architektura DERMS: Agregace milionů distribuovaných zdrojů
V roce 2025 bude poprvé v historii pocházet více než polovina elektřiny vyrobené v Evropě z obnovitelných zdrojů. Mimořádný úspěch, který s sebou však nese neméně mimořádnou výzvu: jak řídit elektrickou síť, ve které se výroba již nesoustředí v pár velkých elektrárnách, ale distribuovány v milionech fotovoltaických systémů na střechách, akumulátorů v garážích, elektrických vozidel připojené do zásuvky, inteligentní tepelná čerpadla a průmyslové mikrokogenerátory.
Odpověď na tuto výzvu má název: DERMS, Systém řízení zdrojů distribuované energie. Jedná se o softwarovou platformu, která agreguje, monitoruje, optimalizuje a koordinuje v reálném čase tisíce - nebo miliony - distribuovaných energetických zdrojů (DER), které je přeměňují na flexibilní a kontrolovatelné aktivum pro síť. Bez DERMS růst distribuovaných obnovitelných zdrojů riskuje destabilizaci sítě místo dekarbonizace.
Trh DERMS zažívá boom: odhady pro rok 2025 se pohybují mezi 1,1 a 1,42 miliardy dolarů v závislosti na zdroji, s projekcemi růstu až 2,2 miliardy dolarů do roku 2030 v CAGR o 14-16 %. Ale skutečná revoluce se odehrává na architektonické stránce: škálování z 1 000 na 1 000 000 zařízení distribuované, udržování latence pod 500 ms pro odeslání v reálném čase, je technický problém software, který málokdo vyřešil ve výrobě.
V tomto článku se ponoříme do celé architektury moderního DERMS: od komunikačních protokolů pole (OpenADR 2.0b, IEEE 2030.5, SunSpec Modbus) až po cloudové platformy řízené událostmi s Kafkou, od matematika optimalizace odbavení (lineární programování s PuLP) na trh doplňkových služeb italština (MSD/MGP of Terna). S fungujícím kódem Python a skutečnou případovou studií italských regionálních VPP.
Co se dozvíte v tomto článku
- Definice a umístění DERMS v ekosystému utility (rozdíly s EMS, SCADA, ADMS)
- Typy DER: rezidenční FV, BESS, EV/V2G, odezva na poptávku, kogenerace
- Vícevrstvá architektura: Field, Edge, Platform, Market
- Virtuální elektrárna: jak agregovat tisíce DER do aktiva jednotného trhu
- Implementace Pythonu: Služba FastAPI + optimalizace odeslání s PuLP
- Komunikační protokoly: OpenADR 2.0b, IEEE 2030.5, MQTT, SunSpec Modbus
- Škálovatelnost řízená událostmi s Apache Kafka a CQRS pro 1 milion DER
- Odezva na poptávku: programy DR, M&V, události OpenADR
- Italský kontext: CER, GSE, MACSE, MSD/MGP Terna
- Případová studie: Regionální VPP s 5 000 PV + 500 BESS
Řada EnergyTech – 10 článků
| # | Položka | Stát |
|---|---|---|
| 1 | Smart Grid a IoT: Architektura pro elektrickou síť budoucnosti | Publikováno |
| 2 | Architektura DERMS: Agregace milionů distribuovaných zdrojů (jste zde) | Proud |
| 3 | Battery Management System: Řídicí algoritmy pro BESS | Další |
| 4 | Digitální dvojče elektrické sítě s Pythonem a Pandapower | Již brzy |
| 5 | Prognóza obnovitelné energie: ML pro FV a větrnou energii | Již brzy |
| 6 | EV Load Balancing: V2G a Smart Charging s OCPP | Již brzy |
| 7 | MQTT a InfluxDB pro energetickou telemetrii v reálném čase | Již brzy |
| 8 | IEC 61850: Komunikace v elektrické rozvodně | Již brzy |
| 9 | Software pro uhlíkové účetnictví: Měření a snižování emisí | Již brzy |
| 10 | Blockchain pro P2P obchodování s energií v CER | Již brzy |
Co je DERMS a jak je umístěn v užitkovém ekosystému
Před vstupem do technické architektury je nezbytné ujasnit si, co odlišuje DERMS od ostatních systémy energetického managementu, které energetické společnosti používají již desítky let. Terminologický zmatek v tom průmysl a vysoká a prodejci často používají tyto zkratky pro marketingové účely zaměnitelně.
Hierarchie systémů řízení
V ekosystému moderní veřejné služby existuje několik systémů, z nichž každý má specifické odpovědnosti:
| Systém | Akronym | Doména | Spravované zdroje | Typická latence |
|---|---|---|---|---|
| Systém energetického managementu | EMS | Převodovka (HV) | Velké elektrárny, propojení | Sekundy-minuty |
| VYPRŠÍ | VYPRŠÍ | Převod + distribuce | Vypínače, transformátory, vedení | 100 ms - 1 s |
| Pokročilý systém řízení distribuce | ADMS | Distribuce (MV/NN) | Rozvodná síť, kabiny | Sekundy |
| Distribuovaný systém řízení zdrojů energie | DERMS | Distribuce + koncový zákazník | FV, BESS, EV, DR, VPP | 100 ms - 5 minut |
| Domácí systém energetického managementu | HEMS | Rezidenční zákazník | Jednotlivá domácí zařízení | Sekundy-minuty |
DERMS zaujímá jedinečné postavení: je to první systém, který překročil hranici metru, zadáním domény koncového zákazníka. To vytváří právní důsledky (souhlas, ochrana osobních údajů), technické (interoperabilita s tisíci značkami/modely zařízení) a obchodní (kdo vlastní data? kdo se podělí o výtěžek?).
Referenční standard: IEEE 2030.xa OpenADR
Ekosystém DERMS řídí dvě rodiny standardů:
IEEE 2030.5 (Smart Energy Profile 2.0 / SEP2)
Publikováno v roce 2013 a aktualizováno v roce 2023 (IEEE 2030.5-2023, prosinec 2024), definuje protokol komunikace mezi utilitami a zařízeními distribuovanými zákazníkovi. Založeno na architektuře RESTful HTTP/HTTPS, podporuje zabezpečení TLS 1.2+. Pokrývá: odezvu na poptávku, řízení zátěže, ceny dynamický, DER management (fotovoltaické, akumulační, EV). A kalifornský mandát (článek 21) pro všechny nové fotovoltaické a akumulační systémy. V profilu 2023 zavedl funkce specifické pro DER.
OpenADR 2.0b
Open Automated Demand Response, vyvinuté OpenADR Alliance. Verze 2.0b a profil kompletní pro sofistikované servery a klienty (2.0a a pro jednoduchá zařízení). Používat XML/JSON přes HTTP, definuje Virtual Top Node (VTN - DERMS/utilita) a Virtual End Node (VEN - zařízení/agregátor). Podporuje push (VTN initia) a pull (VEN vyžaduje) režimy. V roce 2025 první certifikované produkty OpenADR 3.0 byly oznámeny (platforma E.ON SWITCH), ale operační referencí zůstává 2.0b pro velkou většinu globálních nasazení.
Typy DER: Bestiář distribuovaných zdrojů
DERMS musí řídit zoologickou zahradu s heterogenními technologiemi, z nichž každá má jiné fyzické vlastnosti, různá komunikační rozhraní a různá provozní omezení. Předpokladem je jejich důkladná znalost navrhnout účinný agregační systém.
| typ DER | typická kapacita | ovladatelnost | Komunikační latence | Hlavní protokol | Klíčová omezení |
|---|---|---|---|---|---|
| Rezidenční FV | 3-10 kWp | Omezení, rychlost rampy | 5-60 sekund | IEEE 2030.5, SunSpec | Záleží na ozáření |
| FV C&I (komerční a průmyslové) | 50-5000 kWp | Omezení, jalový výkon | 1-5 sekund | Modbus TCP, DNP3 | Smlouva o PPA, síťová omezení |
| BESS Rezidenční | 5-15 kWh / 3-10 kW | Vysoká (nabíjení/vybíjení/pohotovostní režim) | 100 ms - 2 sekundy | SunSpec, IEEE 2030.5 | SoC min/max, životní cykly |
| BESS C&I / Grid-Scale | 100 kWh - 1 GWh | Velmi vysoká, ms odezva | 50-500 ms | Modbus TCP, IEC 60870, IEC 61850 | Teplota, SoC, degradace |
| EV (Vehicle-to-Grid V2G) | 7-100 kW obousměrný | Vysoká, pokud je připojen a povolen | 1–10 sekund (OCPP 2.0.1) | OCPP 2.0.1, ISO 15118 | Uživatelský SoC, časy nabíjení |
| EV (chytré nabíjení V1G) | 3,7-22 kW ve světovém směru | Střední (pouze zkrácení) | 5-30 sekund | OCPP 1.6/2.0.1 | Uživatelské preference, cílový SoC |
| Odezva poptávky (průmyslové zatížení) | 50kW - 50MW | Vysoká, pokud je předem kvalifikována | 10-300 sekund | OpenADR 2.0b | Délka akce, zotavení |
| Tepelná čerpadla (HP) | 3-20 kW tepelný | Průměr (časový posun) | 30-300 sekund | Modbus, OpenADR | Tepelný komfort, nastavená hodnota |
| Mikrokogenerace (CHP) | 1-1000 kWe | Vysoká (rozšířená) | 1-30 sekund | Modbus TCP, OPC-UA | Tepelná účinnost, plyn |
Složitost heterogenity
Skutečný DERMS se musí propojit se stovkami různých modelů invertorů, BMS, kolon průmyslové nabíjení a regulátory. Každý výrobce implementuje protokoly trochu jinak, se specifickými chybami, nestandardními časovými limity a podmnožinami funkcí. Robustní vrstva ovladače s adaptérem vzor a první architektonickou prioritu, než vůbec začneme přemýšlet o optimalizaci.
Softwarová architektura DERMS: Vícevrstvý model
Moderní DERMS je rozdělen do čtyř odlišných vrstev, z nichž každá má přesně definované odpovědnosti a konkrétní technologie. Jasné oddělení mezi vrstvami je zásadní pro škálovatelnost a udržovatelnost systému.
# Architettura DERMS - Vista ad alto livello
+================================================================+
| LAYER 4: MARKET |
| Mercati Energia: MGP, MSD, MO, Capacity Market |
| DSO Flexibility Markets, Aggregatori terzi |
| Revenue stacking, Portfolio optimization |
+================================================================+
| |
Bid/Offer API Settlement data
| |
+================================================================+
| LAYER 3: PLATFORM (Cloud DERMS) |
| |
| +------------------+ +------------------+ |
| | Forecasting | | Dispatch Engine | |
| | (ML: FV, load, | | (Optimization: | |
| | EV availability)| | LP/MILP/MPC) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Aggregation | | Market Interface | |
| | Service (VPP | | (Bid builder, | |
| | portfolio mgmt) | | settlement) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Event Bus | | Time-Series DB | |
| | (Apache Kafka) | | (InfluxDB/ | |
| | | | TimescaleDB) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Device Registry | | API Gateway | |
| | (DER catalog, | | (REST, WebSocket,| |
| | metadata, caps) | | gRPC) | |
| +------------------+ +------------------+ |
+================================================================+
| |
Commands (dispatch) Telemetry (status)
| |
+================================================================+
| LAYER 2: EDGE |
| |
| +------------------+ +------------------+ |
| | Site Aggregator | | Protocol Gateway | |
| | (building/plant | | (Modbus->MQTT, | |
| | controller) | | SunSpec->JSON) | |
| +------------------+ +------------------+ |
| |
| Local optimization, failsafe, buffering, compression |
+================================================================+
| |
Device protocols (Modbus, SunSpec, OCPP, BACnet, OPC-UA)
| |
+================================================================+
| LAYER 1: FIELD |
| Inverter FV - BESS BMS - EV Charger - Smart Meter |
| Industrial Loads - CHP Controller - Heat Pump |
+================================================================+
Klíčové architektonické principy
Událostmi řízený s CQRS
Srdcem DERMS je sběrnice událostí (Apache Kafka ve výrobě), která odděluje příkazy zápisu (Strana příkazu - expedice příkazů) z přečtených dotazů (Strana dotazu - dashboard, reporting). Vzor CQRS (Command Query Responsibility Segregation) umožňuje škálovat tyto dvě cesty nezávisle: telemetrické dotazy jsou velmi časté (miliony zpráv/hodinu), zatímco příkazy expedice jsou méně časté, ale vyžadují záruky doručení (alespoň jednou nebo přesně jednou).
Odolnost na prvním místě
Vrstva Edge není jednoduché relé: má schopnost pracovat v režimu offline (ladná degradace) při přerušení připojení ke cloudu. Site Agregators provádějí místní optimalizaci s podmnožinou zjednodušených pravidel zajišťujících, že se baterie nevybíjí pod minimální SoC a že kritické zátěže zůstávají napájeny i bez dohledu cloudu DERMS.
Vzor adaptéru ovladače
Každý typ zařízení má svůj vlastní adaptér, který překládá nativní protokol (SunSpec Modbus, OpenADR, OCPP) ve standardizovaném interním datovém modelu (Device Shadow, inspirovaný AWS IoT). To je izolace zcela obchodní logiku od složitosti polních protokolů a umožňuje přidat nové typy DER, aniž by se dotýkaly jádra systému.
Virtuální elektrárna: Transformace DER na tržní aktivum
Virtuální elektrárna (VPP) je klíčovým konceptem, který dává agregaci DER ekonomickou hodnotu. VPP není fyzický závod: je to portfolio distribuovaných zdrojů, které jsou při pohledu zvenčí (z trhu s elektřinou nebo přenosové sítě), se chová jako virtuální elektrárna s ovladatelnými a předvídatelnými vlastnostmi.
Globální trh VPP vzrostl z 5,7 miliardy USD v roce 2025 a očekává se, že dosáhne 28,4 miliardy dolarů do roku 2035 (CAGR 17,4 %). Agregační a orchestrační software dominuje s 46% podílem na trhu. Celková kapacita VPP v Severní Americe dosáhla 37,5 GW v roce 2025, zatímco globální cíl pro rok 2030 překračuje 500 GW umožněný V2G.
Jak funguje agregace VPP
Proces agregace probíhá ve čtyřech po sobě jdoucích fázích, které se opakují každých 15 minut (typické plánovací období evropských trhů):
- Individuální prognózování: pro každý DER v portfoliu systém vypočítá dostupnost se očekává v nejbližších hodinách. U FV systému to závisí na očekávané radiaci; u BESS aktuální stav nabití (SoC) a již naplánované cykly; for an EV, from pravděpodobnost připojení (na základě historických uživatelských vzorců).
- Agregace portfolia: Jednotlivé prognózy jsou agregovány na úroveň VPP, s přihlédnutím k omezením sítě (řízení přetížení) a korelacím mezi zdroji. Výsledkem je „křivka nabídky“, která popisuje, jakou sílu může VPP poskytnout za každou cenu.
- Optimalizace a nabízení cen: optimalizační algoritmus (lineární programování nebo MILP) určuje optimální strategii nabídek na trzích (MGP, MSD, kapacitní trhy) maximalizace očekávaných výnosů portfolia.
- Odeslání v reálném čase: jakmile je zakázka zadána na trhu, Dispatch Engine posílá nastavené body do každého jednotlivého DER, respektuje fyzická omezení a vyvažuje souhrnná odezva s tržním cílem.
Implementace Pythonu: DERMS Aggregation Service
Vybudujeme kompletní službu agregace DER s FastAPI a optimalizací expedice založené na lineárním programování. Kód je strukturován do realistických modulů pro systém výroby.
Datový model DER
# models.py - Modello dati per le risorse distribuite
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import datetime
class DERType(Enum):
SOLAR_PV = "solar_pv"
BATTERY_STORAGE = "battery_storage"
EV_CHARGER = "ev_charger"
DEMAND_RESPONSE = "demand_response"
CHP = "chp"
HEAT_PUMP = "heat_pump"
class DERStatus(Enum):
ONLINE = "online"
OFFLINE = "offline"
DISPATCHING = "dispatching"
FAULT = "fault"
STANDBY = "standby"
@dataclass
class DERCapabilities:
"""capacità fisiche di una DER"""
max_power_kw: float # Potenza massima erogabile (kW)
min_power_kw: float # Potenza minima (0 per curtailment FV)
ramp_up_kw_per_sec: float # Velocita rampa salita (kW/s)
ramp_down_kw_per_sec: float # Velocita rampa discesa (kW/s)
# Solo per storage (BESS/EV)
capacity_kwh: Optional[float] = None
min_soc_pct: Optional[float] = None # SoC minimo (es. 10%)
max_soc_pct: Optional[float] = None # SoC massimo (es. 95%)
roundtrip_efficiency: Optional[float] = None # Efficienza ciclo (es. 0.92)
@dataclass
class DERTelemetry:
"""Stato in tempo reale di una DER"""
der_id: str
timestamp: datetime.datetime
active_power_kw: float # Potenza attuale (positiva = generazione)
reactive_power_kvar: float
voltage_v: float
current_a: float
status: DERStatus
# Solo per storage
soc_pct: Optional[float] = None
available_charge_kw: Optional[float] = None
available_discharge_kw: Optional[float] = None
# Solo per FV
irradiance_wm2: Optional[float] = None
temperature_c: Optional[float] = None
@dataclass
class DERAsset:
"""Registro completo di una DER nel portfolio"""
id: str
type: DERType
name: str
site_id: str # Sito fisico di appartenenza
grid_node_id: str # Nodo di rete (per vincoli topologici)
capabilities: DERCapabilities
protocol: str # "sunspec", "openadr", "ocpp", "modbus"
endpoint: str # URL/IP del dispositivo o del gateway
owner_id: str # Proprietario (utente o azienda)
aggregation_vpp_ids: list = field(default_factory=list) # VPP di appartenenza
# Telemetria più recente (aggiornata dal telemetry service)
last_telemetry: Optional[DERTelemetry] = None
Agregační služba s FastAPI
# aggregation_service.py - Servizio di aggregazione VPP
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, List, Optional
import asyncio
import logging
from datetime import datetime, timezone
# Import interni
from models import DERAsset, DERTelemetry, DERType, DERStatus
from dispatch_optimizer import DispatchOptimizer
from telemetry_store import TelemetryStore
logger = logging.getLogger(__name__)
app = FastAPI(title="DERMS Aggregation Service", version="2.1.0")
# --- Pydantic schemas per la API ---
class VPPPortfolioResponse(BaseModel):
vpp_id: str
der_count: int
total_capacity_kw: float
available_capacity_kw: float
current_dispatch_kw: float
battery_soc_avg_pct: Optional[float]
timestamp: str
class DispatchRequest(BaseModel):
vpp_id: str
target_power_kw: float # Potenza target per la VPP (positiva = generazione)
duration_minutes: int # Durata del dispatch
priority: str = "normal" # "emergency" | "normal" | "economic"
max_deviation_pct: float = 5.0 # Tolleranza deviazione dal target
class DispatchSetpoint(BaseModel):
der_id: str
power_kw: float
duration_minutes: int
timestamp: str
class DispatchResponse(BaseModel):
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: List[DispatchSetpoint]
feasibility_score: float # 0.0 - 1.0 (1.0 = target pienamente raggiunto)
timestamp: str
# --- Registro in-memory (in produzione: database PostgreSQL + cache Redis) ---
_vpp_registry: Dict[str, List[str]] = {} # vpp_id -> [der_id]
_der_registry: Dict[str, DERAsset] = {} # der_id -> DERAsset
_telemetry_store = TelemetryStore()
_optimizer = DispatchOptimizer()
# --- Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "ok", "version": "2.1.0", "timestamp": datetime.now(timezone.utc).isoformat()}
@app.get("/api/v1/vpp/{vpp_id}/portfolio", response_model=VPPPortfolioResponse)
async def get_vpp_portfolio(vpp_id: str):
"""
Ritorna la snapshot aggregata dello stato corrente di una VPP.
Consolida telemetria di tutti i DER nel portfolio.
"""
if vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{vpp_id}' non trovata")
der_ids = _vpp_registry[vpp_id]
der_assets = [_der_registry[did] for did in der_ids if did in _der_registry]
if not der_assets:
raise HTTPException(status_code=503, detail="Nessun DER disponibile nel portfolio")
# Aggregazione metriche
total_capacity = sum(a.capabilities.max_power_kw for a in der_assets)
current_dispatch = 0.0
available_capacity = 0.0
storage_assets = []
for asset in der_assets:
telemetry = _telemetry_store.get_latest(asset.id)
if telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.DISPATCHING]:
current_dispatch += telemetry.active_power_kw
# capacità disponibile = differenza tra massimo e corrente
if telemetry.available_discharge_kw is not None:
available_capacity += telemetry.available_discharge_kw
else:
available_capacity += (asset.capabilities.max_power_kw - telemetry.active_power_kw)
# Raccogli SoC per storage
if telemetry.soc_pct is not None:
storage_assets.append(telemetry.soc_pct)
battery_soc_avg = (sum(storage_assets) / len(storage_assets)) if storage_assets else None
return VPPPortfolioResponse(
vpp_id=vpp_id,
der_count=len(der_assets),
total_capacity_kw=round(total_capacity, 2),
available_capacity_kw=round(max(0, available_capacity), 2),
current_dispatch_kw=round(current_dispatch, 2),
battery_soc_avg_pct=round(battery_soc_avg, 1) if battery_soc_avg else None,
timestamp=datetime.now(timezone.utc).isoformat()
)
@app.post("/api/v1/vpp/dispatch", response_model=DispatchResponse)
async def dispatch_vpp(request: DispatchRequest, background_tasks: BackgroundTasks):
"""
Esegue un dispatch ottimizzato della VPP verso il target di potenza richiesto.
Usa Linear Programming per distribuire il carico tra i DER disponibili.
"""
if request.vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{request.vpp_id}' non trovata")
# Recupero DER disponibili e telemetria aggiornata
der_ids = _vpp_registry[request.vpp_id]
available_ders = []
for der_id in der_ids:
asset = _der_registry.get(der_id)
telemetry = _telemetry_store.get_latest(der_id)
if asset and telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.STANDBY]:
available_ders.append((asset, telemetry))
if not available_ders:
raise HTTPException(status_code=503, detail="Nessun DER disponibile per il dispatch")
# Ottimizzazione del dispatch tramite LP
result = _optimizer.optimize_dispatch(
ders=available_ders,
target_power_kw=request.target_power_kw,
duration_minutes=request.duration_minutes
)
# Invio setpoint in background (async, non bloccante)
background_tasks.add_task(
_send_setpoints_to_ders,
setpoints=result.setpoints,
dispatch_id=result.dispatch_id
)
return result
async def _send_setpoints_to_ders(setpoints: List[DispatchSetpoint], dispatch_id: str):
"""Invia i setpoint a ogni DER in parallelo tramite il rispettivo adapter."""
logger.info(f"Inizio dispatch {dispatch_id} - {len(setpoints)} setpoint")
tasks = [_send_single_setpoint(sp) for sp in setpoints]
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.warning(f"Dispatch {dispatch_id}: {len(errors)} errori su {len(setpoints)} setpoint")
logger.info(f"Dispatch {dispatch_id} completato")
async def _send_single_setpoint(setpoint: DispatchSetpoint):
"""Stub: in produzione chiama l'adapter specifico del protocollo del DER."""
asset = _der_registry.get(setpoint.der_id)
if not asset:
raise ValueError(f"DER {setpoint.der_id} non trovato nel registro")
logger.debug(f"Setpoint {setpoint.der_id}: {setpoint.power_kw} kW per {setpoint.duration_minutes} min")
# In produzione: chiamata all'adapter (SunSpec/OpenADR/OCPP)
await asyncio.sleep(0.1) # Simulazione latenza rete
Dispatch Optimizer s lineárním programováním (PuLP)
Srdcem optimalizace je problém lineárního programování (LP), který minimalizuje odchylku z cíle výkonu s ohledem na fyzická omezení každého DER:
# dispatch_optimizer.py - Ottimizzazione LP per dispatch DER
import pulp
import uuid
import logging
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import List, Tuple
from models import DERAsset, DERTelemetry, DERType, DERStatus
logger = logging.getLogger(__name__)
@dataclass
class OptimizationResult:
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: list
feasibility_score: float
timestamp: str
solver_status: str
solve_time_ms: float
class DispatchOptimizer:
"""
Ottimizzatore LP per dispatch di portfolio DER.
Problema: data una richiesta di potenza target P_target,
trovare i setpoint p_i per ogni DER i nel portfolio
che minimizzino |sum(p_i) - P_target|, rispettando:
- p_i_min <= p_i <= p_i_max per ogni DER
- Vincoli SoC per storage (BESS/EV)
- Vincoli ramp rate
- Vincoli di topologia di rete (opzionale)
"""
def optimize_dispatch(
self,
ders: List[Tuple[DERAsset, DERTelemetry]],
target_power_kw: float,
duration_minutes: int,
vpp_id: str = "vpp-001"
) -> OptimizationResult:
import time
start_time = time.time()
dispatch_id = f"disp-{uuid.uuid4().hex[:8]}"
# Costruzione del problema LP con PuLP
prob = pulp.LpProblem(
name=f"DER_Dispatch_{dispatch_id}",
sense=pulp.LpMinimize
)
# Variabili decisionali: setpoint per ogni DER (kW)
# Vincolate tra min e max fisico del dispositivo
p_vars = {}
for asset, telemetry in ders:
# Calcola limiti effettivi considerando SoC per storage
p_min, p_max = self._compute_effective_limits(asset, telemetry, duration_minutes)
var_name = f"p_{asset.id.replace('-', '_')}"
p_vars[asset.id] = pulp.LpVariable(
name=var_name,
lowBound=p_min,
upBound=p_max,
cat=pulp.constants.LpContinuous
)
# Variabile di slack per la deviazione dal target (non-negativa)
slack_pos = pulp.LpVariable("slack_pos", lowBound=0) # Eccesso rispetto target
slack_neg = pulp.LpVariable("slack_neg", lowBound=0) # Deficit rispetto target
# Funzione obiettivo: minimizzare la deviazione assoluta dal target
# Pesi differenziati: penalizza di più il deficit (mancata fornitura)
prob += 1.5 * slack_neg + 1.0 * slack_pos, "MinimizeDeviation"
# Vincolo di bilanciamento della potenza
total_power = pulp.lpSum(p_vars[asset.id] for asset, _ in ders)
prob += (total_power - target_power_kw == slack_pos - slack_neg), "PowerBalance"
# Risoluzione (COIN-BC solver, open source)
solver = pulp.COIN_CMD(msg=False, timeLimit=5.0)
status = prob.solve(solver)
solve_time_ms = (time.time() - start_time) * 1000
solver_status = pulp.LpStatus[prob.status]
# Costruzione setpoint dal risultato
setpoints = []
achieved_power = 0.0
if status in [pulp.LpStatusOptimal := 1, -1]: # Optimal or Infeasible
for asset, telemetry in ders:
if asset.id in p_vars:
p_val = pulp.value(p_vars[asset.id]) or 0.0
achieved_power += p_val
if abs(p_val) > 0.1: # Ignora setpoint trascurabili
setpoints.append({
"der_id": asset.id,
"power_kw": round(p_val, 2),
"duration_minutes": duration_minutes,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Calcolo feasibility score: 1.0 se target raggiunto, < 1.0 se parziale
if abs(target_power_kw) > 0.1:
deviation = abs(achieved_power - target_power_kw) / abs(target_power_kw)
feasibility_score = max(0.0, 1.0 - deviation)
else:
feasibility_score = 1.0
logger.info(
f"Dispatch {dispatch_id}: target={target_power_kw}kW, "
f"achieved={achieved_power:.1f}kW, score={feasibility_score:.3f}, "
f"solver={solver_status}, time={solve_time_ms:.0f}ms"
)
return OptimizationResult(
dispatch_id=dispatch_id,
vpp_id=vpp_id,
target_power_kw=target_power_kw,
achieved_power_kw=round(achieved_power, 2),
setpoints=setpoints,
feasibility_score=round(feasibility_score, 4),
timestamp=datetime.now(timezone.utc).isoformat(),
solver_status=solver_status,
solve_time_ms=round(solve_time_ms, 1)
)
def _compute_effective_limits(
self,
asset: DERAsset,
telemetry: DERTelemetry,
duration_minutes: int
) -> Tuple[float, float]:
"""
Calcola i limiti effettivi di potenza per un DER considerando:
- Limiti fisici dichiarati
- SoC corrente per storage (quanta energia residua disponibile)
- Temperatura (semplificato)
"""
p_min = asset.capabilities.min_power_kw
p_max = asset.capabilities.max_power_kw
# Vincoli aggiuntivi per storage (BESS o EV)
if asset.capabilities.capacity_kwh and telemetry.soc_pct is not None:
cap_kwh = asset.capabilities.capacity_kwh
soc = telemetry.soc_pct / 100.0
min_soc = (asset.capabilities.min_soc_pct or 10.0) / 100.0
max_soc = (asset.capabilities.max_soc_pct or 95.0) / 100.0
efficiency = asset.capabilities.roundtrip_efficiency or 0.92
duration_h = duration_minutes / 60.0
# Energia disponibile in scarica (discharge -> positivo)
energy_available_discharge_kwh = (soc - min_soc) * cap_kwh * efficiency
p_max_soc = energy_available_discharge_kwh / duration_h if duration_h > 0 else 0
p_max = min(p_max, p_max_soc)
# Energia disponibile in carica (charge -> negativo = consumo)
energy_available_charge_kwh = (max_soc - soc) * cap_kwh / efficiency
p_min_soc = -(energy_available_charge_kwh / duration_h) if duration_h > 0 else 0
p_min = max(p_min, p_min_soc)
return p_min, p_max
Komunikační protokoly: Růženec standardů
Volba komunikačního protokolu hluboce ovlivňuje latenci, škálovatelnost a náklady integrace DERMS. Neexistuje žádný univerzální protokol: používá se každá úroveň hierarchie protokoly optimalizované pro vaše specifické potřeby.
| Protokol | Vrstvy | Doprava | Typická latence | Škálovatelnost | Hlavní případ použití |
|---|---|---|---|---|---|
| OpenADR 2.0b | Platforma → Místo | HTTP/XML nebo JSON | 1-30 sekund | Průměr (průzkum) | Reakce na poptávku, události DR |
| IEEE 2030.5 (SEP2) | Platforma → Zařízení | HTTPS/REST | 1-60 sekund | Vysoká (škálovatelný REST) | FV, sklad, bytový EV |
| SunSpec Modbus TCP | Edge → Zařízení | TCP/Modbus | 50-500 ms | Nízká (sekvenční dotazování) | FV střídač, BESS C&I |
| OCPP 2.0.1 | Platforma → Nabíječka | WebSocket/JSON | 100 ms-5s | Vysoká (WebSocket) | EV sloupy |
| MQTT | Edge → Platforma | TCP (TLS) | 10-500 ms | Velmi vysoká (makléř) | Obecná telemetrie IoT |
| IEC 60870-5-104 | SCADA → RTU | TCP | 50-200 ms | Průměrný | Rozvodny, starší RTU |
| DNP3 | SCADA → Pole | Serial/TCP | 100 ms-1s | Nízký | Starší nástroje SCADA |
| IEC 61850 GOOSE | Rozvodna | Ethernet (multicast) | < 4 ms | Vysoká (LAN) | Ochrany, SE automatizace |
OpenADR 2.0b: architektura VTN/VEN
# openadr_adapter.py - Client OpenADR 2.0b (VEN - Virtual End Node)
# Implementazione semplificata per illustrare il flusso di comunicazione
import httpx
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class DREvent:
"""Evento di Demand Response ricevuto dal VTN (utility/DERMS)"""
event_id: str
program_id: str
signal_name: str # "SIMPLE" | "ELECTRICITY_PRICE" | "LOAD_DISPATCH"
signal_type: str # "LEVEL" | "PRICE" | "X-LOAD_DISPATCH"
signal_value: float # Valore del segnale (es. livello 1/2/3 o prezzo EUR/MWh)
dtstart: str # ISO 8601 - inizio evento
duration_minutes: int
randomize_start_minutes: int = 0 # Randomizzazione per evitare picchi sincroni
class OpenADRVENClient:
"""
Virtual End Node (VEN): rappresenta un sito/aggregatore che riceve
eventi DR dal Virtual Top Node (VTN) del DERMS o della utility.
Flusso tipico OpenADR 2.0b in modalità PULL:
1. VEN -> VTN: oadrRequestEvent (richiede eventi disponibili)
2. VTN -> VEN: oadrDistributeEvent (lista eventi attivi)
3. VEN -> VTN: oadrCreatedEvent (conferma ricezione con optIn/optOut)
4. VEN -> VTN: oadrUpdateReport (report su energia effettivamente modificata)
"""
def __init__(self, vtn_url: str, ven_id: str, ven_name: str):
self.vtn_url = vtn_url.rstrip("/")
self.ven_id = ven_id
self.ven_name = ven_name
self._client = httpx.AsyncClient(timeout=30.0)
self._registered = False
self._active_events: dict = {}
async def register(self) -> bool:
"""Registrazione del VEN sul VTN - obbligatoria prima di ricevere eventi."""
payload = self._build_register_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiRegisterParty",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
# Parsing della risposta oadrCreatedParty
registration_id = self._extract_registration_id(root)
if registration_id:
self._registered = True
logger.info(f"VEN {self.ven_id} registrato con ID: {registration_id}")
return True
except Exception as e:
logger.error(f"Errore registrazione VEN: {e}")
return False
async def poll_events(self) -> list:
"""Polling degli eventi DR disponibili sul VTN."""
if not self._registered:
raise RuntimeError("VEN non registrato. Chiamare register() prima.")
payload = self._build_request_event_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
events = self._parse_distribute_event(root)
logger.info(f"VEN {self.ven_id}: ricevuti {len(events)} eventi DR")
return events
except Exception as e:
logger.error(f"Errore polling eventi: {e}")
return []
async def confirm_event(self, event_id: str, opt_in: bool = True) -> bool:
"""Conferma ricezione evento e comunicazione optIn/optOut."""
status = "optIn" if opt_in else "optOut"
payload = self._build_created_event_payload(event_id, status)
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
logger.info(f"Evento {event_id}: {status} confermato")
return True
except Exception as e:
logger.error(f"Errore conferma evento {event_id}: {e}")
return False
async def run_polling_loop(self, poll_interval_seconds: int = 60):
"""Loop di polling continuo - eseguito come task asyncio."""
logger.info(f"Avvio polling loop VEN {self.ven_id} ogni {poll_interval_seconds}s")
while True:
events = await self.poll_events()
for event in events:
if event.event_id not in self._active_events:
self._active_events[event.event_id] = event
# OptIn automatico (in produzione: logica di accettazione business)
await self.confirm_event(event.event_id, opt_in=True)
logger.info(
f"Nuovo evento DR: {event.event_id} | "
f"Segnale: {event.signal_name}={event.signal_value} | "
f"Durata: {event.duration_minutes} min"
)
await asyncio.sleep(poll_interval_seconds)
def _build_register_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRegisterReport specificationID="TELEMETRY_STATUS">
<ei:venID>{self.ven_id}</ei:venID>
</oadrRegisterReport>
</oadrSignedObject>
</oadrPayload>"""
def _build_request_event_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRequestEvent>
<ei:eiRequestEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:replyLimit>10</ei:replyLimit>
</ei:eiRequestEvent>
</oadrRequestEvent>
</oadrSignedObject>
</oadrPayload>"""
def _build_created_event_payload(self, event_id: str, status: str) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrCreatedEvent>
<ei:eiCreatedEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:eventResponses>
<ei:eventResponse>
<ei:responseCode>200</ei:responseCode>
<ei:requestID>{event_id}</ei:requestID>
<ei:qualifiedEventID>
<ei:eventID>{event_id}</ei:eventID>
<ei:modificationNumber>0</ei:modificationNumber>
</ei:qualifiedEventID>
<ei:optType>{status}</ei:optType>
</ei:eventResponse>
</ei:eventResponses>
</ei:eiCreatedEvent>
</oadrCreatedEvent>
</oadrSignedObject>
</oadrPayload>"""
def _parse_distribute_event(self, root: ET.Element) -> list:
"""Parser semplificato per oadrDistributeEvent."""
events = []
# In produzione: parsing completo con namespace XML OpenADR
# Qui simuliamo la struttura per chiarezza
return events
def _extract_registration_id(self, root: ET.Element) -> Optional[str]:
"""Estrae l'ID di registrazione dalla risposta VTN."""
return "reg-001" # Semplificato
Škálovatelnost: Od 1 000 do 1 000 000 DER
Škálovatelnost je nejkritičtější technickou výzvou moderních DERMS. Spravujte 1 000 DER a na dosah jakýkoli dobře navržený systém. Spravujte 1 000 000 DER s nižšími než sekundovými latencemi odeslání vyžaduje radikálně odlišnou architekturu inspirovanou velkoobjemovými systémy v průmyslu finanční a sociální média.
Čísla škálovatelnosti
Telemetrická zátěž plně funkční
- 1 000 DER: ~60 000 zpráv/hodinu (dotazování každých 60 s) – zvládnutelné pomocí jediné mikroslužby
- 100 000 DER: ~6 000 000 zpráv/hodinu (100 000 msg/min) – potřebuje sharding a Kafka
- 1 000 000 DER: ~60 000 000 zpráv/hodinu – vyhrazená architektura streamování událostí
Při průměrném užitečném zatížení 200 bajtů na zprávu generuje přibližně 1 milion DER 3,3 GB/hodinu nezpracované telemetrie, před kompresí (která obvykle vede k 300-400 MB/hod).
Kafkova architektura pro vysokou škálovatelnost DERMS
# kafka_derms_config.py - Configurazione Kafka per DERMS scalabile
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import json
import logging
from dataclasses import asdict
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# === TOPIC DESIGN ===
# Strategia: topic separati per tipo di dato, partitionati per DER ID
# La key del messaggio = der_id garantisce che tutti i messaggi dello stesso
# DER vadano alla stessa partizione (ordering garantito per dispositivo)
KAFKA_TOPICS = {
# Telemetria (alta frequenza, alta velocità)
"der.telemetry.raw": {
"partitions": 48, # 48 partizioni per parallelismo elevato
"replication_factor": 3,
"retention_ms": 86400000, # 24 ore (poi su time-series DB)
"compression_type": "lz4", # LZ4 per compressione veloce
"config": {"cleanup.policy": "delete"}
},
# Comandi di dispatch (bassa frequenza, alta affidabilità)
"der.dispatch.commands": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000, # 7 giorni
"compression_type": "gzip",
"config": {"cleanup.policy": "delete", "min.insync.replicas": "2"}
},
# Conferme dispatch (ack dai dispositivi)
"der.dispatch.acks": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000,
"config": {"cleanup.policy": "delete"}
},
# Aggregati VPP (output dell'aggregation service)
"vpp.portfolio.snapshots": {
"partitions": 4,
"replication_factor": 3,
"retention_ms": 2592000000, # 30 giorni
"config": {"cleanup.policy": "compact"} # Log compaction: mantieni ultima snapshot
},
# Alert e fault detection
"der.alerts": {
"partitions": 6,
"replication_factor": 3,
"retention_ms": 2592000000,
"config": {"cleanup.policy": "delete"}
}
}
class DERMSTelemetryProducer:
"""
Producer Kafka per la telemetria DER.
Usato dai Gateway Edge per pubblicare telemetria verso il cloud.
"""
def __init__(self, bootstrap_servers: str):
self._producer = Producer({
"bootstrap.servers": bootstrap_servers,
"client.id": "derms-telemetry-producer",
# Affidabilità: ACK da tutti i broker in-sync
"acks": "all",
# Performance: batching aggressivo per throughput
"linger.ms": 20,
"batch.size": 65536,
"compression.type": "lz4",
# Retry per fault tolerance
"retries": 5,
"retry.backoff.ms": 200,
"enable.idempotence": True # Exactly-once semantics
})
def publish_telemetry(self, der_id: str, telemetry: dict) -> None:
"""
Pubblica telemetria su Kafka.
La key = der_id garantisce ordering per dispositivo.
"""
payload = json.dumps({
**telemetry,
"_published_at": datetime.now(timezone.utc).isoformat()
}).encode("utf-8")
self._producer.produce(
topic="der.telemetry.raw",
key=der_id.encode("utf-8"),
value=payload,
on_delivery=self._delivery_callback
)
self._producer.poll(0) # Non-blocking flush
def flush(self, timeout: float = 10.0) -> None:
"""Flush dei messaggi in coda prima dello shutdown."""
pending = self._producer.flush(timeout=timeout)
if pending > 0:
logger.warning(f"{pending} messaggi non ancora consegnati dopo flush")
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Errore consegna messaggio: {err}")
else:
logger.debug(f"Messaggio consegnato: topic={msg.topic()}, partition={msg.partition()}")
class DERMSDispatchConsumer:
"""
Consumer Kafka per i comandi di dispatch.
Ogni Site Aggregator consuma dal topic dispatch.commands
i setpoint relativi ai propri DER.
"""
def __init__(self, bootstrap_servers: str, group_id: str, site_id: str):
self.site_id = site_id
self._consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"client.id": f"site-aggregator-{site_id}",
"auto.offset.reset": "latest", # Solo messaggi recenti (no backlog storico)
"enable.auto.commit": False, # Commit manuale dopo elaborazione
"max.poll.interval.ms": 30000,
"session.timeout.ms": 10000
})
self._consumer.subscribe(["der.dispatch.commands"])
def process_commands(self, timeout_seconds: float = 1.0):
"""Poll e processa comandi di dispatch per questo sito."""
msg = self._consumer.poll(timeout=timeout_seconds)
if msg is None:
return None
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
return None
logger.error(f"Errore consumer: {msg.error()}")
return None
command = json.loads(msg.value().decode("utf-8"))
# Filtra solo i comandi per i DER di questo sito
if command.get("site_id") == self.site_id:
logger.info(
f"Sito {self.site_id}: ricevuto dispatch "
f"der={command['der_id']} power={command['power_kw']}kW"
)
# Commit esplicito dopo elaborazione riuscita (at-least-once)
self._consumer.commit(msg)
return command
return None
CQRS a Event Sourcing v DERMS
Ve vysoce škálovatelném DERMS vzor CQRS odděluje:
Příkazová strana: odesílání příkazů na Kafka (neměnné, pouze pro připojení)
- každá odeslaná požadovaná hodnota se stane trvalou událostí v protokolu.
Strana dotazu: Zhmotněné projekce v Redis (mezipaměť aktuálního stavu
každého DER) a v InfluxDB/TimescaleDB (časové řady pro prognózování a M&V).
Tento vzor umožňuje v případě chyb přepočítat stav systému od začátku
projekce, jednoduše přečtením protokolu událostí (Sourcing událostí).
Odezva poptávky: Teorie a realizace
Demand Response (DR) je schopnost modifikovat spotřebu elektřiny v reakci na signály sítě nebo trhu. Je to jedna z nejziskovějších služeb, které může VPP nabídnout, a v dané době sám o sobě jeden z nejsložitějších na správnou implementaci kvůli požadavkům na měření a ověřování (M&V).
Typy DR programů
| DR program | Signál | Dodací lhůta | Typická doba trvání | Typická odměna (IT) |
|---|---|---|---|---|
| Rychlá rezervace (FR) | Frekvence sítě (automatická) | < 1 sekunda | 15 minut | ~20-30 EUR/MW/hod |
| Sekundární rezerva (RS) | signál AGC Terna | Sekundy | 15 min - hodin | ~15-25 EUR/MW/hod |
| Terciální rezervace (RT) | Terna explicitní odeslání | 15 minut | 1-4 hodiny | ~5-15 EUR/MWh |
| Zůstatek (MB) | Nabídka na trhu v reálném čase | 5-30 minut | 15 min - hodin | Tržní cena (proměnná) |
| Přerušitelnost DR | Volání operátora | 15-30 minut | 1-4 hodiny | ~30 000-50 000 EUR/MW/rok |
| Usnesení ARERA 300/2017 | Signál GSE (pobídka CER) | Zápis | Variabilní | Motivační prémie GSE |
Výpočet základní linie a M&V
# mv_service.py - Measurement & Verification per Demand Response
# Calcola la riduzione effettiva di carico rispetto alla baseline
import numpy as np
from typing import List, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class MVService:
"""
Measurement & Verification (M&V) per programmi Demand Response.
Metodo: CBL (Customer Baseline Load) - approccio standard FERC/ENTSO-E
La baseline e calcolata come media degli N giorni simili più recenti
prima dell'evento, escludendo giorni con altri eventi DR.
"""
def __init__(self, n_baseline_days: int = 10, exclude_top_bottom: bool = True):
self.n_baseline_days = n_baseline_days
self.exclude_top_bottom = exclude_top_bottom # High-5 / Low-5 exclusion
def calculate_baseline(
self,
site_id: str,
event_date: datetime,
event_hour: int,
historical_consumption: dict # {date_str: {hour: kw}}
) -> dict:
"""
Calcola la Customer Baseline Load (CBL) per il sito.
Algoritmo CBL con High-5/Low-5 exclusion (CAISO/PJM standard):
1. Seleziona N giorni simili recenti (stessa tipologia giorno: lavorativo/festivo)
2. Esclude il top 20% e il bottom 20% dei giorni per consumo nell'ora evento
3. Media i restanti giorni
"""
target_weekday = event_date.weekday() # 0=Lunedi, 6=Domenica
is_target_workday = target_weekday < 5 # Lavorativo vs weekend
# Raccolta dati storici compatibili
similar_days = []
check_date = event_date - timedelta(days=1)
while len(similar_days) < self.n_baseline_days and check_date > event_date - timedelta(days=60):
date_str = check_date.strftime("%Y-%m-%d")
is_check_workday = check_date.weekday() < 5
# Stesso tipo di giorno (lavorativo/festivo)
if is_check_workday == is_target_workday and date_str in historical_consumption:
day_data = historical_consumption[date_str]
if event_hour in day_data:
similar_days.append({
"date": date_str,
"consumption_kw": day_data[event_hour]
})
check_date -= timedelta(days=1)
if len(similar_days) < 3:
logger.warning(f"Dati insufficienti per baseline sito {site_id}: solo {len(similar_days)} giorni")
return {"baseline_kw": None, "method": "insufficient_data", "days_used": len(similar_days)}
consumptions = [d["consumption_kw"] for d in similar_days]
# High-5/Low-5 exclusion (standard CAISO)
if self.exclude_top_bottom and len(consumptions) >= 10:
n_exclude = max(1, len(consumptions) // 5) # 20% per lato
sorted_consumptions = sorted(consumptions)
filtered_consumptions = sorted_consumptions[n_exclude:-n_exclude]
else:
filtered_consumptions = consumptions
baseline_kw = np.mean(filtered_consumptions)
return {
"site_id": site_id,
"baseline_kw": round(baseline_kw, 2),
"method": "CBL_HighLow_Exclusion",
"days_analyzed": len(similar_days),
"days_used": len(filtered_consumptions),
"std_dev_kw": round(np.std(filtered_consumptions), 2)
}
def calculate_demand_reduction(
self,
site_id: str,
baseline_kw: float,
actual_consumption_kw: float,
event_duration_hours: float
) -> dict:
"""
Calcola la riduzione di carico e l'energia risparmiata.
Risultato usato per il settlement del programma DR.
"""
reduction_kw = max(0, baseline_kw - actual_consumption_kw)
reduction_pct = (reduction_kw / baseline_kw * 100) if baseline_kw > 0 else 0
energy_reduced_kwh = reduction_kw * event_duration_hours
return {
"site_id": site_id,
"baseline_kw": baseline_kw,
"actual_kw": actual_consumption_kw,
"reduction_kw": round(reduction_kw, 2),
"reduction_pct": round(reduction_pct, 1),
"energy_reduced_kwh": round(energy_reduced_kwh, 3),
"verified": reduction_pct >= 5.0 # Soglia minima per settlement
}
Italský kontext: CER, GSE a trh doplňkových služeb
Itálie představuje jeden z nejzajímavějších trhů v Evropě pro DERMS a VPP, a to díky a rychle se vyvíjející regulační prostředí a jedna z nejvyšších fotovoltaických základen v Evropě (přibližně 37 GW fotovoltaiky instalované na konci roku 2024, cíl 80 GW do roku 2030).
Společenství obnovitelné energie (CER)
Vyhláška MASE č. 127 ze dne 16. května 2025, zveřejněné 25. června 2025, zavedlo důležité novinky pro CER, rozšíření pobídek i pro obce do 50 000 obyvatel. CER představují italská laboratoř pro distribuovanou agregaci DER:
Struktura pobídek CER (následující legislativní vyhláška 199/2021 a ministerská vyhláška 2025)
- Motivační sazba: rozpoznáno na sdílené energii, závisí na geografické oblasti a velikosti systému. Pro systémy < 200 kWp: 80-110 EUR/MWh (střed-sever), 90-120 EUR/MWh (jih a ostrovy)
- Poplatek ARERA: valorizace vlastní spotřebované energie ~8 EUR/MWh (náhrada tarifních složek)
- Trvání: 20 let od data uvedení do provozu
- Povolený výkon: až 1 MW pro jednu elektrárnu (možnost více elektráren ve stejném CER)
- Geografický požadavek: odběrná místa musí být připojena pod stejnou primární rozvodnou
Italský trh doplňkových služeb (MSD/MGP)
Terna, italský TSO, řídí trhy, na kterých mohou VPP nabídnout flexibilitu. v roce 2025 díky postupnému zavádění UVAM (Mixed Virtual Units) a UVAC (Virtual Units). Consumption Enabled), agregované distribuované zdroje se mohou také účastnit MSD:
| Trh | Akronym | Typologie | Horizont | VPP/UVAM přístup |
|---|---|---|---|---|
| Den před trhem | MGP | Energie na den | D-1 9:00 | Ano (přes BSP) |
| Vnitrodenní trh | MI | Intradenní energie | D-0 na 6 sezeních | Ano (přes BSP) |
| Ex-ante trh doplňkových služeb | MSD ex-ante | Rezerva, vyrovnávání | D-1 až D-0 | Ano (UVAM povoleno) |
| Vyrovnávací trh | MB | Vyrovnávání v reálném čase | D-0 v reálném čase | Ano (UVAM s latencí < 15 min) |
| Rychlá rezerva | FR | Ultra rychlá rezerva | Automatický | Pouze BESS s latencí < 1s |
| MACSE (mechanismus nákupu úložné kapacity) | MACSE | skladovací kapacita | 15 let | Skladování pouze na mřížce |
MACSE v roce 2025
První aukce MACSE, která se konala v září 2025, poskytla v oblastech kapacitu 10 GWh Střed, jih a ostrovy s 15letými smlouvami o mýtném za průměrné ceny kolem 13 000 EUR/MWh/rok. Tento mechanismus zaručuje stabilní výnosy pro velké sítě BESS systémy, ale ne např přístupné malým agregovaným VPP. U obytných/komerčních VPP trasa hlavní zůstává MSD přes UVAM.
PNRR a investice do flexibility
Přechod PNRR 5.0 vyčlenil značné prostředky na digitalizaci energetiky a energetická společenství. Kromě opatření pro CER plán zahrnuje investice do modernizace distribučních sítí (2G chytré měřiče, automatizace rozvoden) které představují infrastrukturu umožňující novou generaci DERMS.
Případová studie: Italský regionální VPP (5 000 FV + 500 BESS)
Analyzujeme konkrétní případ návrhu a dimenzování regionální VPP v Itálii, na základě realistických parametrů italského trhu 2025.
Referenční scénář
Portfolio VPP "SunFlex Puglia"
- Rezidenční FV: 4 500 systémů, střední velikost 5 kWp, celkem 22,5 MWp
- FV C&I: 500 systémů, průměrná velikost 80 kWp, celkem 40 MWp
- Rezidenční BESS: 450 systémů, střední velikost 10 kWh/5 kW, celkem 4,5 MWh/2,25 MW
- BESS C&I: 50 systémů, střední velikost 500 kWh/250 kW, celkem 25 MWh/12,5 MW
- Celková kapacita: 62,5 MWp FVE + 29,5 MWh/14,75 MW BESS
- Zeměpisná oblast: Puglia (jižní oblast – vysoká ozáření, vysoká penetrace PV)
Toky tržeb VPP
| Stream tržeb | Trh | Síla/energie | Odhadovaný příjem | Poznámky |
|---|---|---|---|---|
| Prodej fotovoltaické energie | MGP den dopředu | 62,5 MW (špička), ~1 750 ekv. hodin/rok | ~5,2 mil. EUR/rok | Při průměrné spotové ceně ~48 EUR/MWh |
| Rychlá rezervace BESS C&I | FR Terna | 12,5 MW (24/7) | ~2,7 mil. EUR/rok | ~25 EUR/MW/hod x 8 760 hodin |
| MSD vyvážení | MSD/MB | Ekvivalent 5 MW (BESS + DR) | ~0,8 mil. EUR/rok | Pay-as-bid, vysoká variabilita |
| Krácení FV (doplňkové) | MSD ex-ante | K dispozici 20MW omezení | ~0,4 mil. EUR/rok | Vylepšení redukce |
| Pobídka CER (5 CER po 1 MW) | GSE - CER | 5 MWp v konfiguraci CER | ~0,6 mil. EUR/rok | Jižní motivační tarif: ~120 EUR/MWh |
| CELKOVÝ | ~9,7 mil. EUR/rok | Hrubá částka, před náklady na platformu a síť |
Zásobník technologií platformy
# docker-compose.yml - Stack DERMS per VPP regionale
# Configurazione di sviluppo/staging (produzione su Kubernetes)
version: "3.9"
services:
# ========================
# INGESTION LAYER
# ========================
# Broker MQTT per telemetria edge
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT non sicuro (solo LAN interna)
- "8883:8883" # MQTT over TLS (produzione)
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- mosquitto-data:/mosquitto/data
# Apache Kafka (broker eventi principale)
kafka:
image: confluentinc/cp-kafka:7.6.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 3 in produzione
KAFKA_LOG_RETENTION_HOURS: 168 # 7 giorni
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# ========================
# PROCESSING LAYER
# ========================
# DERMS Aggregation Service (FastAPI)
aggregation-service:
build: ./services/aggregation
ports:
- "8080:8080"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
INFLUXDB_URL: http://influxdb:8086
INFLUXDB_TOKEN: {{INFLUXDB_TOKEN}}
INFLUXDB_ORG: sunflex-puglia
INFLUXDB_BUCKET: der-telemetry
depends_on:
- kafka
- redis
- influxdb
# Dispatch Optimizer Service
dispatch-optimizer:
build: ./services/dispatch
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
SOLVER: COIN_CMD # o CPLEX in produzione per portfolio grandi
depends_on:
- kafka
- redis
# Forecasting Service (ML - produzione FV + carico)
forecasting-service:
build: ./services/forecasting
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
INFLUXDB_URL: http://influxdb:8086
MODEL_REGISTRY_URL: http://mlflow:5000
WEATHER_API_KEY: {{OPENMETEO_API_KEY}}
depends_on:
- kafka
- influxdb
- mlflow
# OpenADR VTN (Virtual Top Node - invia eventi DR ai siti)
openadr-vtn:
build: ./services/openadr-vtn
ports:
- "8081:8081"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
POSTGRES_URL: postgresql://postgres:5432/derms
depends_on:
- kafka
- postgres
# ========================
# STORAGE LAYER
# ========================
# Time-Series Database per telemetria
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
volumes:
- influxdb-data:/var/lib/influxdb2
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_ORG: sunflex-puglia
DOCKER_INFLUXDB_INIT_BUCKET: der-telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 30d
# Cache per stato corrente DER (Device Shadow)
redis:
image: redis:7.2-alpine
command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
# Database relazionale per asset registry, events, settlement
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: derms
POSTGRES_USER: derms_user
POSTGRES_PASSWORD: {{POSTGRES_PASSWORD}}
volumes:
- postgres-data:/var/lib/postgresql/data
# ========================
# OBSERVABILITY
# ========================
# MLflow per tracking modelli forecasting
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0 --backend-store-uri postgresql://mlflow:5432/mlflow
# Grafana per dashboard operativo
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
volumes:
influxdb-data:
postgres-data:
redis-data:
grafana-data:
mosquitto-data:
VPP Provozní KPI a SLA
| KPI | Cíl | Opatření | Dopad, pokud není respektován |
|---|---|---|---|
| Latence odeslání (95. percentil) | < 500 ms | Čas od příkazu k nastavené hodnotě na zařízení | Nekvalifikuje se do Fast Reserve Terna |
| Svěžest telemetrie | < 60 sekund | Maximální stáří telemetrických dat pro odeslání | Optimalizace na základě zastaralých dat |
| Dostupnost DER (online sazba) | > 95 % | Procento DER dosažitelné kdykoli | Snížení kapacity VPP, tržní sankce |
| Přesnost odeslání | > 90% skóre proveditelnosti | Průměrná odchylka od cíle odeslání | Sankce na trhu MSD/MB |
| Předpověď MAPE (FV 1h dopředu) | < 8 % | Střední absolutní procentní chyba | Ztráty z nerovnováhy na trhu |
| Doba provozu platformy | > 99,5 % | Dostupnost cloudu DERMS | Ztráta závazků flexibility |
| Kybernetická bezpečnost - MTTR | < 4 hodiny | Průměrná doba odezvy na bezpečnostní incident | Požadavek NIS2 (platný od října 2024) |
Osvědčené postupy a anti-vzory v DERMS
Nejlepší postupy
1. Vzor stínu zařízení
Vždy mějte aktualizovaný "stín" pro každý DER v mezipaměti (Redis): obsahuje nejnovější stav známé zařízení (SoC, aktuální výkon, stav) s časovým razítkem. Odeslání nečeká na telemetrie v reálném čase: používá stín, který je aktualizován asynchronně. Tím se snižuje latence odeslání od sekund do milisekund.
2. Půvabná degradace v úrovních
Definujte jasné provozní úrovně: NORMAL (cloud + edge), DEGRADED (pouze okraj, optimalizace zjednodušené místní), MINIMÁLNÍ (pouze bezpečnostní ochrany - žádné ekonomické expedice). Systém musí vždy vědět, na jaké úrovni se nachází, a sdělit to trhům prostřednictvím notifikací aktualizovaná dostupnost.
3. Impotency v Dispatch Commands
Každý příkaz k odeslání musí mít jedinečné ID. Adaptéry zařízení musí implementovat idempotency: přijetí stejného příkazu dvakrát (pro opakování) nesmí způsobit dvě odeslání. Pro deduplikaci použijte protokol posledních příkazů s TTL.
4. Oddělení mezi plánováním a v reálném čase
Plánování (plánování tržních nabídek, 24 hodin předem) využívá optimalizační modely složité a pomalé (MILP s minutami doby řešení). Odesílání v reálném čase používá zjednodušené LP které se vyřeší v milisekundách. Nikdy nemíchejte tyto dvě cesty ve stejném procesu.
Anti-vzory, kterým je třeba se vyhnout
Anti-Pattern 1: Kaskádové synchronní dotazování
Nejběžnější vzor v první generaci DERMS: centrální server dotazuje každý DER v pořadí. S 1 000 DER a 10 sekundami časového limitu na zařízení je cyklus dotazování dokončen trvá 10 000 sekund (téměř 3 hodiny!). Řešení: paralelní dotazování s poolem připojení + řízené událostmi pro zařízení, která podporují push.
Anti-Pattern 2: Neomezená optimalizace SoC
Agresivní odesílání bez respektování omezení SoC baterií vede k cyklům nabíjení/vybíjení hluboké, které rychle degradují buněčné balíčky (30-50% zkrácení životnosti během několika měsíců). Každý optimalizátor musí vždy obsahovat omezení SoC jako tvrdá omezení, nikoli měkká.
Anti-Pattern 3: Relační databáze pro telemetrii
Použití PostgreSQL nebo MySQL k ukládání milionů telemetrických bodů za sekundu způsobuje rychlé problémy s výkonem a neudržitelné náklady na úložiště. Databáze časových řad (InfluxDB, TimescaleDB, QuestDB) komprimovat data 10-50x ve srovnání s relačními databázemi a podpora optimalizovaných dočasných dotazů (funkce oken, automatické převzorkování).
Anti-Pattern 4: Ignorování topologie sítě
Agregace DER bez uvážení fyzické topologie distribuční sítě může vést k situace, kdy vyslání VPP způsobí místní přetížení, narušení napětí popř přetížení transformátorů. Dalším nezbytným krokem je integrace s ADMS PDS pro zralou DERMS. Evropský projekt ATTEST (2021-2024) definoval standardy pro tuto integraci.
Zabezpečení a dodržování předpisů pro DERMS
DERMS řídí kritickou infrastrukturu: úspěšný kybernetický útok může způsobit výpadky lokalizovat nebo destabilizovat síť. Směrnice NIS2 (implementovaná v Itálii legislativním nařízením 138/2024, s účinností od října 2024) klasifikuje DERMS jako „provozovatele nezbytných služeb“ s výhradou přísné povinnosti v oblasti kybernetické bezpečnosti.
Klíčové bezpečnostní požadavky pro DERMS
- Ověření zařízení: Každý DER se ověřuje pomocí certifikátů X.509 (PKI) – žádné sdílené statické přihlašovací údaje
- Šifrování při přenosu: TLS 1.3 povinné pro veškerou komunikaci – včetně starších protokolů (MQTT přes TLS, HTTPS pro OpenADR/IEEE 2030.5)
- Síť s nulovou důvěrou: žádné zařízení a ve výchozím nastavení "důvěryhodné" - každý požadavek je ověřen a autorizován
- Granulovaný RBAC: Operátoři, agregátoři a vlastníci DER mají přísně odlišená oprávnění
- Neměnný protokol auditu: každý příkaz k odeslání je zaprotokolován s digitálním podpisem (neodmítnutelný pro vyřízení)
- Integrace SIEM: detekce anomálií v reálném čase na telemetrických vzorech a příkazech
- MTTR < 4 hodiny: Požadavek NIS2 pro reakci na incidenty
- Segmentace sítě: fyzické/logické oddělení mezi OT (polní protokoly) a IT (cloud DERMS)
Závěry a další kroky
Architektura moderního DERMS je jednou z nejsložitějších výzev softwarového inženýrství energetický sektor: vyžaduje integraci heterogenních protokolů vyvinutých v různých desetiletích (Modbus z roku 1979, OpenADR z roku 2009, ISO 15118 z roku 2022), rozsah od tisíců do milionů distribuovaná zařízení udržující subsekundové latence a fungují v regulačním prostředí neustále se vyvíjející.
Klíčové body k zapamatování:
- DERMS není SCADA: funguje v doméně koncového zákazníka, překračuje hranici metru, se všemi právními důsledky a důsledky pro správu souhlasu, které z toho vyplývají.
- Vícevrstvá architektura (Field / Edge / Platform / Market) a povinné oddělení odpovědností – není možné
- Kafka + CQRS + Event Sourcing a de facto zásobník na škálování nad 100 000 DER – architektury relačních databází neobstojí
- Optimalizace odeslání přes LP/MILP je matematicky dobře definovaná, ale musí vždy respektovat fyzická omezení (SoC, rychlost rampy), jako jsou tvrdá omezení
- Italský kontext (CER, UVAM, MACSE, NIS2) se rychle vyvíjí: konkurenční výhoda je postavena společně s regulačními a technickými dovednostmi
Další článek ze série EnergyTech se ponoří do Battery Management System (BMS): řídicí algoritmy pro úložné systémy (BESS), z odhadu stavu nabití (SoC). s Kalmanovým filtrem pro tepelnou ochranu a vyvážení článků. Nezbytné čtení pro kdokoli, kdo pracuje s úložnými systémy integrovanými do VPP.
Zdroje a statistiky
- Specifikace OpenADR 2.0b: openadr.org (stažení zdarma po registraci)
- IEEE 2030.5-2023: IEEE Xplore (placené), bezplatný přehled na smartgrid.ieee.org
- Provozní řád CACER/CER: gse.it (Příloha 1, červenec 2025)
- Dokumentace Terna UVAM: terna.it, Oddíl expedice
- PuLP (řešič Python LP): coin-or.github.io/pulp
- Konfluentní klient Kafka Python: github.com/confluentinc/confluent-kafka-python
- FERC Order 2222 (americký benchmark pro agregaci DER): ferc.gov
- Projekt ATTEST EU (koordinace DSO-TSO): atest-project.eu
Související články
- série MLOps: pro nasazení FV předpovědních modelů ve výrobě integrované do DERMS - viz série MLOps na tomto blogu
- AI Engineering / řada RAG: LLM za provozní pomoc operátorům VPP (dotaz v přirozeném jazyce na tržní data, inteligentní upozornění)
- Řada PostgreSQL AI: pgvector pro vyhledávání podobnosti vzorců spotřeby historické (užitečné pro CBL a prognózy)







