Arhitectura DERMS: agregarea a milioane de resurse distribuite
În 2025, pentru prima dată în istorie, mai mult de jumătate din energia electrică produsă în Europa va proveni din din surse regenerabile. O realizare extraordinară, care însă aduce cu sine o provocare la fel de extraordinară: cum se administrează o rețea de energie electrică în care producția nu mai este concentrată în câteva centrale mari, ci distribuite în milioane de sisteme fotovoltaice pe acoperișuri, acumulatori în garaje, vehicule electrice conectat la priză, pompe de căldură inteligente și micro-cogeneratoare industriale.
Răspunsul la această provocare are un nume: DERME, Sistem de management al resurselor energetice distribuite. Este o platformă software care agregă, monitorizează, optimizează și coordonează în timp real mii de - sau milioane - de resurse energetice distribuite (DER), transformându-le în un activ flexibil și controlabil pentru rețea. Fără un DERMS, creșterea surselor regenerabile distribuite riscă să destabilizeze rețeaua în loc să o decarboneze.
Piața DERMS este în plină expansiune: estimările pentru 2025 variază între 1,1 $ și 1,42 miliarde $ in functie de sursa, cu previziuni de crestere pana la 2,2 miliarde de dolari până în 2030 la un CAGR cu 14-16%. Dar adevărata revoluție are loc din punct de vedere arhitectural: scalarea de la 1.000 la 1.000.000 de dispozitive distribuit, păstrând latența sub 500 ms pentru expedierea în timp real, este o problemă de inginerie software pe care foarte puțini l-au rezolvat în producție.
În acest articol ne aprofundăm în întreaga arhitectură a unui DERMS modern: de la protocoalele de comunicare câmp (OpenADR 2.0b, IEEE 2030.5, SunSpec Modbus) către platformele cloud bazate pe evenimente cu Kafka, de la matematica optimizării dispecerului (programare liniară cu PuLP) către piața serviciilor auxiliare italiană (MSD/MGP din Terna). Cu cod Python funcțional și un studiu de caz real al VPP regional italian.
Ce veți învăța în acest articol
- Definirea și poziționarea DERMS în ecosistemul de utilități (diferențe cu EMS, SCADA, ADMS)
- Tipuri de DER: PV rezidențial, BESS, EV/V2G, răspuns la cerere, cogenerare
- Arhitectură cu mai multe straturi: câmp, margine, platformă, piață
- Centrală electrică virtuală: cum să agregați mii de DER într-un singur activ de pe piață
- Implementare Python: serviciu FastAPI + optimizare dispecer cu PuLP
- Protocoale de comunicație: OpenADR 2.0b, IEEE 2030.5, MQTT, SunSpec Modbus
- Scalabilitate bazată pe evenimente cu Apache Kafka și CQRS pentru 1 milion de DER
- Răspuns la cerere: programe DR, M&V, evenimente OpenADR
- Context italian: CER, GSE, MACSE, MSD/MGP Terna
- Studiu de caz: VPP regional cu 5.000 PV + 500 BESS
Seria EnergyTech - 10 articole
| # | Articol | Stat |
|---|---|---|
| 1 | Smart Grid și IoT: Arhitectură pentru rețeaua electrică a viitorului | Publicat |
| 2 | Arhitectura DERMS: agregarea a milioane de resurse distribuite (sunteți aici) | Actual |
| 3 | Sistem de management al bateriei: algoritmi de control pentru BESS | Următorul |
| 4 | Digital Twin al rețelei electrice cu Python și Pandapower | În curând |
| 5 | Prognoza energiei regenerabile: ML pentru PV și eolian | În curând |
| 6 | Echilibrarea sarcinii EV: V2G și încărcare inteligentă cu OCPP | În curând |
| 7 | MQTT și InfluxDB pentru telemetrie energetică în timp real | În curând |
| 8 | IEC 61850: Comunicarea în stația electrică | În curând |
| 9 | Software de contabilizare a carbonului: Măsurarea și reducerea emisiilor | În curând |
| 10 | Blockchain pentru tranzacționarea energiei P2P în CER | În curând |
Ce este un DERMS și cum este poziționat în ecosistemul de utilități
Înainte de a intra în arhitectura tehnică, este esențial să clarificăm ce deosebește un DERMS de alții sisteme de management al energiei pe care utilitățile le folosesc de zeci de ani. Confuzia terminologică din aceasta industrie și mare, iar vânzătorii folosesc adesea aceste acronime în mod interschimbabil în scopuri de marketing.
Ierarhia sistemelor de management
În ecosistemul unei utilități moderne coexistă mai multe sisteme, fiecare cu responsabilități specifice:
| Sistem | Acronim | Domeniu | Resurse gestionate | Latența tipică |
|---|---|---|---|---|
| Sistem de management al energiei | EMS | Transmisie (HV) | Centrale mari, interconexiuni | Secunde-minute |
| EXPIRĂ | EXPIRĂ | Transmisie + Distributie | Întrerupătoare, transformatoare, linii | 100ms - 1s |
| Sistem avansat de management al distribuției | ADMS | Distribuție (MT/LV) | Rețea de distribuție, cabine | secunde |
| Sistem de management al resurselor energetice distribuite | DERME | Distributie + Client final | FV, BESS, EV, DR, VPP | 100 ms - 5 minute |
| Sistem de management al energiei la domiciliu | HEMS | Client rezidential | Dispozitive pentru o singură casă | Secunde-minute |
DERMS ocupă o poziție unică: este primul sistem care traversează limita contorului, intrarea în domeniul clientului final. Acest lucru creează implicații juridice (consimțământ, confidențialitatea datelor), tehnic (interoperabilitate cu mii de mărci/modele de dispozitive) și de afaceri (cine deține datele? cine împarte veniturile?).
Standard de referință: IEEE 2030.x și OpenADR
Două familii de standarde conduc ecosistemul DERMS:
IEEE 2030.5 (Smart Energy Profile 2.0 / SEP2)
Publicat în 2013 și actualizat în 2023 (IEEE 2030.5-2023, decembrie 2024), definește protocolul comunicarea între utilitățile și dispozitivele distribuite clientului. Bazat pe arhitectura RESTful pe HTTP/HTTPS, acceptă TLS 1.2+ pentru securitate. Acoperă: răspuns la cerere, control al sarcinii, prețuri dinamic, management DER (fotovoltaic, stocare, EV). Și mandatul California (Regula 21) pentru toată lumea noile sisteme fotovoltaice și de stocare. A introdus caracteristici specifice DER în profilul 2023.
OpenADR 2.0b
Open Automated Demand Response, dezvoltat de OpenADR Alliance. Versiunea 2.0b și profilul complet pentru servere și clienți sofisticați (2.0a și pentru dispozitive simple). Utilizați XML/JSON peste HTTP, definește un Virtual Top Node (VTN - DERMS/utilitate) și Virtual End Node (VEN - dispozitivul/agregatorul). Suportă modurile push (VTN initia) și pull (VEN necesită). În 2025, primele produse certificate S-au anunțat OpenADR 3.0 (Platforma E.ON SWITCH), dar 2.0b rămâne referința operațională pentru marea majoritate a implementărilor globale.
Tipuri de DER: Bestiarul resurselor distribuite
Un DERMS trebuie să gestioneze o grădină zoologică cu tehnologii eterogene, fiecare cu caracteristici fizice diferite, diferite interfeţe de comunicare şi diferite constrângeri operaţionale. Cunoașterea lor în detaliu este o condiție prealabilă pentru a proiecta un sistem eficient de agregare.
| tip DER | capacitate tipică | Controlabilitate | Latența comunicării | Protocolul principal | Constrângeri cheie |
|---|---|---|---|---|---|
| PV rezidential | 3-10 kWp | Reducere, rata de rampă | 5-60 de secunde | IEEE 2030.5, SunSpec | Depinde de iradiere |
| FV C&I (comercial și industrial) | 50-5.000 kWp | Reducere, putere reactivă | 1-5 secunde | Modbus TCP, DNP3 | Contract PPA, constrângeri de rețea |
| BESS Rezidențial | 5-15 kWh / 3-10 kW | Ridicat (încărcare/descărcare/standby) | 100 ms - 2 secunde | SunSpec, IEEE 2030.5 | SoC min/max, cicluri de viață |
| BESS C&I / Grid-Scale | 100 kWh - 1 GWh | Foarte mare, răspunsul ms | 50-500 ms | Modbus TCP, IEC 60870, IEC 61850 | Temperatura, SoC, degradare |
| EV (V2G de la vehicul la rețea) | 7-100 kW bidirectional | Mare dacă este conectat și activat | 1-10 secunde (OCPP 2.0.1) | OCPP 2.0.1, ISO 15118 | SoC utilizator, timpi de încărcare |
| EV (încărcare inteligentă V1G) | 3,7-22 kW mondial-direcțional | Medie (doar reducerea) | 5-30 de secunde | OCPP 1.6/2.0.1 | Preferințele utilizatorului, SoC țintă |
| Răspuns la cerere (încărcări industriale) | 50kW - 50MW | Mare dacă este precalificat | 10-300 de secunde | OpenADR 2.0b | Durata evenimentului, recuperare |
| Pompe de căldură (CP) | 3-20 kW termic | Medie (decalaj de timp) | 30-300 de secunde | Modbus, OpenADR | Confort termic, punct de referință |
| Micro-cogenerare (CHP) | 1-1.000 kWe | Înalt (rampable) | 1-30 de secunde | Modbus TCP, OPC-UA | Eficiență termică, gaz |
Complexitatea eterogenității
Un adevărat DERMS trebuie să interfațeze cu sute de modele diferite de invertoare, BMS, coloane încărcare industrială și controlere. Fiecare producător implementează protocoale ușor diferit, cu erori specifice, timeout-uri non-standard și subseturi de funcționalități. Un strat de driver robust cu adaptor model și prima prioritate arhitecturală înainte de a se gândi măcar la optimizare.
Arhitectura software DERMS: modelul multistrat
Un DERMS modern este împărțit în patru straturi distincte, fiecare cu responsabilități bine definite și tehnologii specifice. Separarea clară între straturi este fundamentală pentru scalabilitate și mentenabilitatea sistemului.
# Architettura DERMS - Vista ad alto livello
+================================================================+
| LAYER 4: MARKET |
| Mercati Energia: MGP, MSD, MO, Capacity Market |
| DSO Flexibility Markets, Aggregatori terzi |
| Revenue stacking, Portfolio optimization |
+================================================================+
| |
Bid/Offer API Settlement data
| |
+================================================================+
| LAYER 3: PLATFORM (Cloud DERMS) |
| |
| +------------------+ +------------------+ |
| | Forecasting | | Dispatch Engine | |
| | (ML: FV, load, | | (Optimization: | |
| | EV availability)| | LP/MILP/MPC) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Aggregation | | Market Interface | |
| | Service (VPP | | (Bid builder, | |
| | portfolio mgmt) | | settlement) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Event Bus | | Time-Series DB | |
| | (Apache Kafka) | | (InfluxDB/ | |
| | | | TimescaleDB) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Device Registry | | API Gateway | |
| | (DER catalog, | | (REST, WebSocket,| |
| | metadata, caps) | | gRPC) | |
| +------------------+ +------------------+ |
+================================================================+
| |
Commands (dispatch) Telemetry (status)
| |
+================================================================+
| LAYER 2: EDGE |
| |
| +------------------+ +------------------+ |
| | Site Aggregator | | Protocol Gateway | |
| | (building/plant | | (Modbus->MQTT, | |
| | controller) | | SunSpec->JSON) | |
| +------------------+ +------------------+ |
| |
| Local optimization, failsafe, buffering, compression |
+================================================================+
| |
Device protocols (Modbus, SunSpec, OCPP, BACnet, OPC-UA)
| |
+================================================================+
| LAYER 1: FIELD |
| Inverter FV - BESS BMS - EV Charger - Smart Meter |
| Industrial Loads - CHP Controller - Heat Pump |
+================================================================+
Principii cheie arhitecturale
Controlat de evenimente cu CQRS
Inima DERMS este o magistrală de evenimente (Apache Kafka în producție) care separă comenzile de scriere (Partea de comandă - comenzi de expediere) din interogări de citire (Partea de interogare - tablou de bord, raportare). Modelul CQRS (Command Query Responsibility Segregation) vă permite să scalați cele două căi independent: interogările de telemetrie sunt cu frecvență foarte mare (milioane de mesaje/oră), în timp ce comenzile expedierile sunt mai puțin frecvente, dar necesită garanții de livrare (cel puțin o dată sau exact o dată).
Reziliența pe marginea întâi
Stratul Edge nu este un simplu releu: are capacitatea de a funcționa în modul offline (degradare grațioasă) atunci când conexiunea la cloud este întreruptă. Agregatoarele de site efectuează optimizarea locală cu un subset de reguli simplificate, asigurându-se că bateria nu se descarcă sub SoC minim și că încărcăturile critice rămân alimentate chiar și fără supravegherea cloud-ului DERMS.
Model adaptor driver
Fiecare tip de dispozitiv are propriul adaptor care traduce protocolul nativ (SunSpec Modbus, OpenADR, OCPP) într-un model de date intern standardizat (Device Shadow, inspirat de AWS IoT). Acest lucru este izolator complet logica de afaceri din complexitatea protocoalelor de teren și vă permite să adăugați noi tipuri de DER fără a atinge nucleul sistemului.
Centrală electrică virtuală: Transformarea DER-urilor într-un activ de piață
Centrala electrică virtuală (VPP) este conceptul cheie care dă valoare economică agregării DER. Un VPP nu este o plantă fizică: este un portofoliu de resurse distribuite care, văzute din exterior (de pe piața de energie electrică sau din rețeaua de transport), se comportă ca o centrală virtuală cu caracteristici controlabile și previzibile.
Piața globală de VPP a crescut de la 5,7 miliarde USD în 2025 și se preconizează că va ajunge 28,4 miliarde de dolari până în 2035 (CAGR 17,4%). Software-ul de agregare și orchestrare domina cu 46% din cota de piata. Capacitatea VPP agregată în America de Nord a atins 37,5 GW în 2025, în timp ce ținta globală pentru 2030 depășește 500 GW activată de V2G.
Cum funcționează agregarea VPP
Procesul de agregare are loc în patru faze succesive care se repetă la fiecare 15 minute (perioada tipică de programare a piețelor europene):
- Prognoza individuală: pentru fiecare DER din portofoliu, sistemul calculează disponibilitate estimată în următoarele ore. Pentru un sistem fotovoltaic depinde de radiația așteptată; pentru un BESS, starea curentă de încărcare (SoC) și ciclurile deja planificate; pentru un EV, de la probabilitatea de a fi conectat (pe baza modelelor istorice ale utilizatorilor).
- Agregarea portofoliului: Prognozele individuale sunt agregate la nivel a VPP, ținând cont de constrângerile rețelei (managementul congestionării) și de corelațiile dintre resurse. Rezultatul este o „curbă licitată” care descrie câtă putere poate furniza VPP la fiecare preț.
- Optimizare și licitare: un algoritm de optimizare (programare liniară sau MILP) determină strategia optimă de licitare pe piețe (MGP, MSD, piețe de capacitate) maximizarea veniturilor aşteptate ale portofoliului.
- Expediere in timp real: odată ce contractul a fost atribuit pe piață, Dispatch Engine trimite puncte de referință fiecărui DER individual, respectând constrângerile fizice și echilibrarea răspunsul agregat cu piața țintă.
Implementarea Python: Serviciul de agregare DERMS
Construim un serviciu complet de agregare DER cu FastAPI și optimizare a expedierii bazată pe programare liniară. Codul este structurat în module realiste pentru un sistem producție.
Model de date DER
# models.py - Modello dati per le risorse distribuite
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import datetime
class DERType(Enum):
SOLAR_PV = "solar_pv"
BATTERY_STORAGE = "battery_storage"
EV_CHARGER = "ev_charger"
DEMAND_RESPONSE = "demand_response"
CHP = "chp"
HEAT_PUMP = "heat_pump"
class DERStatus(Enum):
ONLINE = "online"
OFFLINE = "offline"
DISPATCHING = "dispatching"
FAULT = "fault"
STANDBY = "standby"
@dataclass
class DERCapabilities:
"""capacità fisiche di una DER"""
max_power_kw: float # Potenza massima erogabile (kW)
min_power_kw: float # Potenza minima (0 per curtailment FV)
ramp_up_kw_per_sec: float # Velocita rampa salita (kW/s)
ramp_down_kw_per_sec: float # Velocita rampa discesa (kW/s)
# Solo per storage (BESS/EV)
capacity_kwh: Optional[float] = None
min_soc_pct: Optional[float] = None # SoC minimo (es. 10%)
max_soc_pct: Optional[float] = None # SoC massimo (es. 95%)
roundtrip_efficiency: Optional[float] = None # Efficienza ciclo (es. 0.92)
@dataclass
class DERTelemetry:
"""Stato in tempo reale di una DER"""
der_id: str
timestamp: datetime.datetime
active_power_kw: float # Potenza attuale (positiva = generazione)
reactive_power_kvar: float
voltage_v: float
current_a: float
status: DERStatus
# Solo per storage
soc_pct: Optional[float] = None
available_charge_kw: Optional[float] = None
available_discharge_kw: Optional[float] = None
# Solo per FV
irradiance_wm2: Optional[float] = None
temperature_c: Optional[float] = None
@dataclass
class DERAsset:
"""Registro completo di una DER nel portfolio"""
id: str
type: DERType
name: str
site_id: str # Sito fisico di appartenenza
grid_node_id: str # Nodo di rete (per vincoli topologici)
capabilities: DERCapabilities
protocol: str # "sunspec", "openadr", "ocpp", "modbus"
endpoint: str # URL/IP del dispositivo o del gateway
owner_id: str # Proprietario (utente o azienda)
aggregation_vpp_ids: list = field(default_factory=list) # VPP di appartenenza
# Telemetria più recente (aggiornata dal telemetry service)
last_telemetry: Optional[DERTelemetry] = None
Serviciu de agregare cu FastAPI
# aggregation_service.py - Servizio di aggregazione VPP
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, List, Optional
import asyncio
import logging
from datetime import datetime, timezone
# Import interni
from models import DERAsset, DERTelemetry, DERType, DERStatus
from dispatch_optimizer import DispatchOptimizer
from telemetry_store import TelemetryStore
logger = logging.getLogger(__name__)
app = FastAPI(title="DERMS Aggregation Service", version="2.1.0")
# --- Pydantic schemas per la API ---
class VPPPortfolioResponse(BaseModel):
vpp_id: str
der_count: int
total_capacity_kw: float
available_capacity_kw: float
current_dispatch_kw: float
battery_soc_avg_pct: Optional[float]
timestamp: str
class DispatchRequest(BaseModel):
vpp_id: str
target_power_kw: float # Potenza target per la VPP (positiva = generazione)
duration_minutes: int # Durata del dispatch
priority: str = "normal" # "emergency" | "normal" | "economic"
max_deviation_pct: float = 5.0 # Tolleranza deviazione dal target
class DispatchSetpoint(BaseModel):
der_id: str
power_kw: float
duration_minutes: int
timestamp: str
class DispatchResponse(BaseModel):
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: List[DispatchSetpoint]
feasibility_score: float # 0.0 - 1.0 (1.0 = target pienamente raggiunto)
timestamp: str
# --- Registro in-memory (in produzione: database PostgreSQL + cache Redis) ---
_vpp_registry: Dict[str, List[str]] = {} # vpp_id -> [der_id]
_der_registry: Dict[str, DERAsset] = {} # der_id -> DERAsset
_telemetry_store = TelemetryStore()
_optimizer = DispatchOptimizer()
# --- Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "ok", "version": "2.1.0", "timestamp": datetime.now(timezone.utc).isoformat()}
@app.get("/api/v1/vpp/{vpp_id}/portfolio", response_model=VPPPortfolioResponse)
async def get_vpp_portfolio(vpp_id: str):
"""
Ritorna la snapshot aggregata dello stato corrente di una VPP.
Consolida telemetria di tutti i DER nel portfolio.
"""
if vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{vpp_id}' non trovata")
der_ids = _vpp_registry[vpp_id]
der_assets = [_der_registry[did] for did in der_ids if did in _der_registry]
if not der_assets:
raise HTTPException(status_code=503, detail="Nessun DER disponibile nel portfolio")
# Aggregazione metriche
total_capacity = sum(a.capabilities.max_power_kw for a in der_assets)
current_dispatch = 0.0
available_capacity = 0.0
storage_assets = []
for asset in der_assets:
telemetry = _telemetry_store.get_latest(asset.id)
if telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.DISPATCHING]:
current_dispatch += telemetry.active_power_kw
# capacità disponibile = differenza tra massimo e corrente
if telemetry.available_discharge_kw is not None:
available_capacity += telemetry.available_discharge_kw
else:
available_capacity += (asset.capabilities.max_power_kw - telemetry.active_power_kw)
# Raccogli SoC per storage
if telemetry.soc_pct is not None:
storage_assets.append(telemetry.soc_pct)
battery_soc_avg = (sum(storage_assets) / len(storage_assets)) if storage_assets else None
return VPPPortfolioResponse(
vpp_id=vpp_id,
der_count=len(der_assets),
total_capacity_kw=round(total_capacity, 2),
available_capacity_kw=round(max(0, available_capacity), 2),
current_dispatch_kw=round(current_dispatch, 2),
battery_soc_avg_pct=round(battery_soc_avg, 1) if battery_soc_avg else None,
timestamp=datetime.now(timezone.utc).isoformat()
)
@app.post("/api/v1/vpp/dispatch", response_model=DispatchResponse)
async def dispatch_vpp(request: DispatchRequest, background_tasks: BackgroundTasks):
"""
Esegue un dispatch ottimizzato della VPP verso il target di potenza richiesto.
Usa Linear Programming per distribuire il carico tra i DER disponibili.
"""
if request.vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{request.vpp_id}' non trovata")
# Recupero DER disponibili e telemetria aggiornata
der_ids = _vpp_registry[request.vpp_id]
available_ders = []
for der_id in der_ids:
asset = _der_registry.get(der_id)
telemetry = _telemetry_store.get_latest(der_id)
if asset and telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.STANDBY]:
available_ders.append((asset, telemetry))
if not available_ders:
raise HTTPException(status_code=503, detail="Nessun DER disponibile per il dispatch")
# Ottimizzazione del dispatch tramite LP
result = _optimizer.optimize_dispatch(
ders=available_ders,
target_power_kw=request.target_power_kw,
duration_minutes=request.duration_minutes
)
# Invio setpoint in background (async, non bloccante)
background_tasks.add_task(
_send_setpoints_to_ders,
setpoints=result.setpoints,
dispatch_id=result.dispatch_id
)
return result
async def _send_setpoints_to_ders(setpoints: List[DispatchSetpoint], dispatch_id: str):
"""Invia i setpoint a ogni DER in parallelo tramite il rispettivo adapter."""
logger.info(f"Inizio dispatch {dispatch_id} - {len(setpoints)} setpoint")
tasks = [_send_single_setpoint(sp) for sp in setpoints]
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.warning(f"Dispatch {dispatch_id}: {len(errors)} errori su {len(setpoints)} setpoint")
logger.info(f"Dispatch {dispatch_id} completato")
async def _send_single_setpoint(setpoint: DispatchSetpoint):
"""Stub: in produzione chiama l'adapter specifico del protocollo del DER."""
asset = _der_registry.get(setpoint.der_id)
if not asset:
raise ValueError(f"DER {setpoint.der_id} non trovato nel registro")
logger.debug(f"Setpoint {setpoint.der_id}: {setpoint.power_kw} kW per {setpoint.duration_minutes} min")
# In produzione: chiamata all'adapter (SunSpec/OpenADR/OCPP)
await asyncio.sleep(0.1) # Simulazione latenza rete
Optimizator de expediere cu programare liniară (PuLP)
Inima optimizării este o problemă de programare liniară (LP) care minimizează abaterea din ținta de putere respectând constrângerile fizice ale fiecărui DER:
# dispatch_optimizer.py - Ottimizzazione LP per dispatch DER
import pulp
import uuid
import logging
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import List, Tuple
from models import DERAsset, DERTelemetry, DERType, DERStatus
logger = logging.getLogger(__name__)
@dataclass
class OptimizationResult:
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: list
feasibility_score: float
timestamp: str
solver_status: str
solve_time_ms: float
class DispatchOptimizer:
"""
Ottimizzatore LP per dispatch di portfolio DER.
Problema: data una richiesta di potenza target P_target,
trovare i setpoint p_i per ogni DER i nel portfolio
che minimizzino |sum(p_i) - P_target|, rispettando:
- p_i_min <= p_i <= p_i_max per ogni DER
- Vincoli SoC per storage (BESS/EV)
- Vincoli ramp rate
- Vincoli di topologia di rete (opzionale)
"""
def optimize_dispatch(
self,
ders: List[Tuple[DERAsset, DERTelemetry]],
target_power_kw: float,
duration_minutes: int,
vpp_id: str = "vpp-001"
) -> OptimizationResult:
import time
start_time = time.time()
dispatch_id = f"disp-{uuid.uuid4().hex[:8]}"
# Costruzione del problema LP con PuLP
prob = pulp.LpProblem(
name=f"DER_Dispatch_{dispatch_id}",
sense=pulp.LpMinimize
)
# Variabili decisionali: setpoint per ogni DER (kW)
# Vincolate tra min e max fisico del dispositivo
p_vars = {}
for asset, telemetry in ders:
# Calcola limiti effettivi considerando SoC per storage
p_min, p_max = self._compute_effective_limits(asset, telemetry, duration_minutes)
var_name = f"p_{asset.id.replace('-', '_')}"
p_vars[asset.id] = pulp.LpVariable(
name=var_name,
lowBound=p_min,
upBound=p_max,
cat=pulp.constants.LpContinuous
)
# Variabile di slack per la deviazione dal target (non-negativa)
slack_pos = pulp.LpVariable("slack_pos", lowBound=0) # Eccesso rispetto target
slack_neg = pulp.LpVariable("slack_neg", lowBound=0) # Deficit rispetto target
# Funzione obiettivo: minimizzare la deviazione assoluta dal target
# Pesi differenziati: penalizza di più il deficit (mancata fornitura)
prob += 1.5 * slack_neg + 1.0 * slack_pos, "MinimizeDeviation"
# Vincolo di bilanciamento della potenza
total_power = pulp.lpSum(p_vars[asset.id] for asset, _ in ders)
prob += (total_power - target_power_kw == slack_pos - slack_neg), "PowerBalance"
# Risoluzione (COIN-BC solver, open source)
solver = pulp.COIN_CMD(msg=False, timeLimit=5.0)
status = prob.solve(solver)
solve_time_ms = (time.time() - start_time) * 1000
solver_status = pulp.LpStatus[prob.status]
# Costruzione setpoint dal risultato
setpoints = []
achieved_power = 0.0
if status in [pulp.LpStatusOptimal := 1, -1]: # Optimal or Infeasible
for asset, telemetry in ders:
if asset.id in p_vars:
p_val = pulp.value(p_vars[asset.id]) or 0.0
achieved_power += p_val
if abs(p_val) > 0.1: # Ignora setpoint trascurabili
setpoints.append({
"der_id": asset.id,
"power_kw": round(p_val, 2),
"duration_minutes": duration_minutes,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Calcolo feasibility score: 1.0 se target raggiunto, < 1.0 se parziale
if abs(target_power_kw) > 0.1:
deviation = abs(achieved_power - target_power_kw) / abs(target_power_kw)
feasibility_score = max(0.0, 1.0 - deviation)
else:
feasibility_score = 1.0
logger.info(
f"Dispatch {dispatch_id}: target={target_power_kw}kW, "
f"achieved={achieved_power:.1f}kW, score={feasibility_score:.3f}, "
f"solver={solver_status}, time={solve_time_ms:.0f}ms"
)
return OptimizationResult(
dispatch_id=dispatch_id,
vpp_id=vpp_id,
target_power_kw=target_power_kw,
achieved_power_kw=round(achieved_power, 2),
setpoints=setpoints,
feasibility_score=round(feasibility_score, 4),
timestamp=datetime.now(timezone.utc).isoformat(),
solver_status=solver_status,
solve_time_ms=round(solve_time_ms, 1)
)
def _compute_effective_limits(
self,
asset: DERAsset,
telemetry: DERTelemetry,
duration_minutes: int
) -> Tuple[float, float]:
"""
Calcola i limiti effettivi di potenza per un DER considerando:
- Limiti fisici dichiarati
- SoC corrente per storage (quanta energia residua disponibile)
- Temperatura (semplificato)
"""
p_min = asset.capabilities.min_power_kw
p_max = asset.capabilities.max_power_kw
# Vincoli aggiuntivi per storage (BESS o EV)
if asset.capabilities.capacity_kwh and telemetry.soc_pct is not None:
cap_kwh = asset.capabilities.capacity_kwh
soc = telemetry.soc_pct / 100.0
min_soc = (asset.capabilities.min_soc_pct or 10.0) / 100.0
max_soc = (asset.capabilities.max_soc_pct or 95.0) / 100.0
efficiency = asset.capabilities.roundtrip_efficiency or 0.92
duration_h = duration_minutes / 60.0
# Energia disponibile in scarica (discharge -> positivo)
energy_available_discharge_kwh = (soc - min_soc) * cap_kwh * efficiency
p_max_soc = energy_available_discharge_kwh / duration_h if duration_h > 0 else 0
p_max = min(p_max, p_max_soc)
# Energia disponibile in carica (charge -> negativo = consumo)
energy_available_charge_kwh = (max_soc - soc) * cap_kwh / efficiency
p_min_soc = -(energy_available_charge_kwh / duration_h) if duration_h > 0 else 0
p_min = max(p_min, p_min_soc)
return p_min, p_max
Protocoale de comunicare: Rozariul standardelor
Alegerea protocolului de comunicare influențează profund latența, scalabilitatea și costul de integrare DERMS. Nu există un protocol universal: fiecare nivel al ierarhiei folosește protocoale optimizate pentru nevoile dumneavoastră specifice.
| Protocol | Straturi | Transport | Latența tipică | Scalabilitate | Caz de utilizare principal |
|---|---|---|---|---|---|
| OpenADR 2.0b | Platformă → Site | HTTP/XML sau JSON | 1-30 de secunde | Medie (sondaj) | Demand Response, evenimente DR |
| IEEE 2030.5 (SEP2) | Platformă → Dispozitiv | HTTPS/REST | 1-60 de secunde | Ridicat (REST scalabil) | PV, depozitare, EV rezidential |
| SunSpec Modbus TCP | Edge → Dispozitiv | TCP/Modbus | 50-500 ms | Scăzut (sondare secvenţială) | Invertor PV, BESS C&I |
| OCPP 2.0.1 | Platformă → Încărcător | WebSocket/JSON | 100 ms-5s | Ridicat (WebSocket) | coloane EV |
| MQTT | Edge → Platformă | TCP (TLS) | 10-500 ms | Foarte mare (broker) | Telemetrie IoT generică |
| IEC 60870-5-104 | SCADA → RTU | TCP | 50-200 ms | Medie | Substații, RTU vechi |
| DNP3 | SCADA → Câmp | Serial/TCP | 100 ms-1s | Scăzut | Utilitare SCADA vechi |
| IEC 61850 GÂSĂ | Substație | Ethernet (multicast) | < 4 ms | Ridicat (LAN) | Protecții, automatizări SE |
OpenADR 2.0b: arhitectură VTN/VEN
# openadr_adapter.py - Client OpenADR 2.0b (VEN - Virtual End Node)
# Implementazione semplificata per illustrare il flusso di comunicazione
import httpx
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class DREvent:
"""Evento di Demand Response ricevuto dal VTN (utility/DERMS)"""
event_id: str
program_id: str
signal_name: str # "SIMPLE" | "ELECTRICITY_PRICE" | "LOAD_DISPATCH"
signal_type: str # "LEVEL" | "PRICE" | "X-LOAD_DISPATCH"
signal_value: float # Valore del segnale (es. livello 1/2/3 o prezzo EUR/MWh)
dtstart: str # ISO 8601 - inizio evento
duration_minutes: int
randomize_start_minutes: int = 0 # Randomizzazione per evitare picchi sincroni
class OpenADRVENClient:
"""
Virtual End Node (VEN): rappresenta un sito/aggregatore che riceve
eventi DR dal Virtual Top Node (VTN) del DERMS o della utility.
Flusso tipico OpenADR 2.0b in modalità PULL:
1. VEN -> VTN: oadrRequestEvent (richiede eventi disponibili)
2. VTN -> VEN: oadrDistributeEvent (lista eventi attivi)
3. VEN -> VTN: oadrCreatedEvent (conferma ricezione con optIn/optOut)
4. VEN -> VTN: oadrUpdateReport (report su energia effettivamente modificata)
"""
def __init__(self, vtn_url: str, ven_id: str, ven_name: str):
self.vtn_url = vtn_url.rstrip("/")
self.ven_id = ven_id
self.ven_name = ven_name
self._client = httpx.AsyncClient(timeout=30.0)
self._registered = False
self._active_events: dict = {}
async def register(self) -> bool:
"""Registrazione del VEN sul VTN - obbligatoria prima di ricevere eventi."""
payload = self._build_register_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiRegisterParty",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
# Parsing della risposta oadrCreatedParty
registration_id = self._extract_registration_id(root)
if registration_id:
self._registered = True
logger.info(f"VEN {self.ven_id} registrato con ID: {registration_id}")
return True
except Exception as e:
logger.error(f"Errore registrazione VEN: {e}")
return False
async def poll_events(self) -> list:
"""Polling degli eventi DR disponibili sul VTN."""
if not self._registered:
raise RuntimeError("VEN non registrato. Chiamare register() prima.")
payload = self._build_request_event_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
events = self._parse_distribute_event(root)
logger.info(f"VEN {self.ven_id}: ricevuti {len(events)} eventi DR")
return events
except Exception as e:
logger.error(f"Errore polling eventi: {e}")
return []
async def confirm_event(self, event_id: str, opt_in: bool = True) -> bool:
"""Conferma ricezione evento e comunicazione optIn/optOut."""
status = "optIn" if opt_in else "optOut"
payload = self._build_created_event_payload(event_id, status)
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
logger.info(f"Evento {event_id}: {status} confermato")
return True
except Exception as e:
logger.error(f"Errore conferma evento {event_id}: {e}")
return False
async def run_polling_loop(self, poll_interval_seconds: int = 60):
"""Loop di polling continuo - eseguito come task asyncio."""
logger.info(f"Avvio polling loop VEN {self.ven_id} ogni {poll_interval_seconds}s")
while True:
events = await self.poll_events()
for event in events:
if event.event_id not in self._active_events:
self._active_events[event.event_id] = event
# OptIn automatico (in produzione: logica di accettazione business)
await self.confirm_event(event.event_id, opt_in=True)
logger.info(
f"Nuovo evento DR: {event.event_id} | "
f"Segnale: {event.signal_name}={event.signal_value} | "
f"Durata: {event.duration_minutes} min"
)
await asyncio.sleep(poll_interval_seconds)
def _build_register_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRegisterReport specificationID="TELEMETRY_STATUS">
<ei:venID>{self.ven_id}</ei:venID>
</oadrRegisterReport>
</oadrSignedObject>
</oadrPayload>"""
def _build_request_event_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRequestEvent>
<ei:eiRequestEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:replyLimit>10</ei:replyLimit>
</ei:eiRequestEvent>
</oadrRequestEvent>
</oadrSignedObject>
</oadrPayload>"""
def _build_created_event_payload(self, event_id: str, status: str) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrCreatedEvent>
<ei:eiCreatedEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:eventResponses>
<ei:eventResponse>
<ei:responseCode>200</ei:responseCode>
<ei:requestID>{event_id}</ei:requestID>
<ei:qualifiedEventID>
<ei:eventID>{event_id}</ei:eventID>
<ei:modificationNumber>0</ei:modificationNumber>
</ei:qualifiedEventID>
<ei:optType>{status}</ei:optType>
</ei:eventResponse>
</ei:eventResponses>
</ei:eiCreatedEvent>
</oadrCreatedEvent>
</oadrSignedObject>
</oadrPayload>"""
def _parse_distribute_event(self, root: ET.Element) -> list:
"""Parser semplificato per oadrDistributeEvent."""
events = []
# In produzione: parsing completo con namespace XML OpenADR
# Qui simuliamo la struttura per chiarezza
return events
def _extract_registration_id(self, root: ET.Element) -> Optional[str]:
"""Estrae l'ID di registrazione dalla risposta VTN."""
return "reg-001" # Semplificato
Scalabilitate: de la 1.000 la 1.000.000 DER
Scalabilitatea este cea mai critică provocare tehnică a DERMS-ului modern. Gestionați 1.000 de DER și la îndemână orice sistem bine conceput. Gestionați 1.000.000 de DER cu latențe de expediere sub secunde necesită o arhitectură radical diferită, inspirată de sistemele de mare volum din industrie financiar și social media.
Numerele de scalabilitate
Sarcina de telemetrie complet operațională
- 1.000 DER: ~60.000 de mesaje/oră (sondaj la fiecare 60 de secunde) - gestionabil cu un singur microserviciu
- 100.000 DER: ~6.000.000 de mesaje/oră (100.000 msg/min) - necesită sharding și Kafka
- 1.000.000 DER: ~60.000.000 de mesaje/oră - arhitectură de streaming dedicată evenimentelor
Cu o sarcină utilă medie de 200 de octeți per mesaj, aproximativ 1 milion de DER generează 3,3 GB/oră de telemetrie brută, înainte de compresie (care de obicei are ca rezultat 300-400 MB/oră).
Arhitectura Kafka pentru DERMS de înaltă scalabilitate
# kafka_derms_config.py - Configurazione Kafka per DERMS scalabile
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import json
import logging
from dataclasses import asdict
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# === TOPIC DESIGN ===
# Strategia: topic separati per tipo di dato, partitionati per DER ID
# La key del messaggio = der_id garantisce che tutti i messaggi dello stesso
# DER vadano alla stessa partizione (ordering garantito per dispositivo)
KAFKA_TOPICS = {
# Telemetria (alta frequenza, alta velocità)
"der.telemetry.raw": {
"partitions": 48, # 48 partizioni per parallelismo elevato
"replication_factor": 3,
"retention_ms": 86400000, # 24 ore (poi su time-series DB)
"compression_type": "lz4", # LZ4 per compressione veloce
"config": {"cleanup.policy": "delete"}
},
# Comandi di dispatch (bassa frequenza, alta affidabilità)
"der.dispatch.commands": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000, # 7 giorni
"compression_type": "gzip",
"config": {"cleanup.policy": "delete", "min.insync.replicas": "2"}
},
# Conferme dispatch (ack dai dispositivi)
"der.dispatch.acks": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000,
"config": {"cleanup.policy": "delete"}
},
# Aggregati VPP (output dell'aggregation service)
"vpp.portfolio.snapshots": {
"partitions": 4,
"replication_factor": 3,
"retention_ms": 2592000000, # 30 giorni
"config": {"cleanup.policy": "compact"} # Log compaction: mantieni ultima snapshot
},
# Alert e fault detection
"der.alerts": {
"partitions": 6,
"replication_factor": 3,
"retention_ms": 2592000000,
"config": {"cleanup.policy": "delete"}
}
}
class DERMSTelemetryProducer:
"""
Producer Kafka per la telemetria DER.
Usato dai Gateway Edge per pubblicare telemetria verso il cloud.
"""
def __init__(self, bootstrap_servers: str):
self._producer = Producer({
"bootstrap.servers": bootstrap_servers,
"client.id": "derms-telemetry-producer",
# Affidabilità: ACK da tutti i broker in-sync
"acks": "all",
# Performance: batching aggressivo per throughput
"linger.ms": 20,
"batch.size": 65536,
"compression.type": "lz4",
# Retry per fault tolerance
"retries": 5,
"retry.backoff.ms": 200,
"enable.idempotence": True # Exactly-once semantics
})
def publish_telemetry(self, der_id: str, telemetry: dict) -> None:
"""
Pubblica telemetria su Kafka.
La key = der_id garantisce ordering per dispositivo.
"""
payload = json.dumps({
**telemetry,
"_published_at": datetime.now(timezone.utc).isoformat()
}).encode("utf-8")
self._producer.produce(
topic="der.telemetry.raw",
key=der_id.encode("utf-8"),
value=payload,
on_delivery=self._delivery_callback
)
self._producer.poll(0) # Non-blocking flush
def flush(self, timeout: float = 10.0) -> None:
"""Flush dei messaggi in coda prima dello shutdown."""
pending = self._producer.flush(timeout=timeout)
if pending > 0:
logger.warning(f"{pending} messaggi non ancora consegnati dopo flush")
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Errore consegna messaggio: {err}")
else:
logger.debug(f"Messaggio consegnato: topic={msg.topic()}, partition={msg.partition()}")
class DERMSDispatchConsumer:
"""
Consumer Kafka per i comandi di dispatch.
Ogni Site Aggregator consuma dal topic dispatch.commands
i setpoint relativi ai propri DER.
"""
def __init__(self, bootstrap_servers: str, group_id: str, site_id: str):
self.site_id = site_id
self._consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"client.id": f"site-aggregator-{site_id}",
"auto.offset.reset": "latest", # Solo messaggi recenti (no backlog storico)
"enable.auto.commit": False, # Commit manuale dopo elaborazione
"max.poll.interval.ms": 30000,
"session.timeout.ms": 10000
})
self._consumer.subscribe(["der.dispatch.commands"])
def process_commands(self, timeout_seconds: float = 1.0):
"""Poll e processa comandi di dispatch per questo sito."""
msg = self._consumer.poll(timeout=timeout_seconds)
if msg is None:
return None
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
return None
logger.error(f"Errore consumer: {msg.error()}")
return None
command = json.loads(msg.value().decode("utf-8"))
# Filtra solo i comandi per i DER di questo sito
if command.get("site_id") == self.site_id:
logger.info(
f"Sito {self.site_id}: ricevuto dispatch "
f"der={command['der_id']} power={command['power_kw']}kW"
)
# Commit esplicito dopo elaborazione riuscita (at-least-once)
self._consumer.commit(msg)
return command
return None
CQRS și Event Sourcing în DERMS
Într-un DERMS extrem de scalabil, modelul CQRS separă:
Partea de comandă: trimite comenzi pe Kafka (imuabil, numai pentru adăugare)
- fiecare setpoint trimis devine un eveniment permanent în jurnal.
Partea de interogare: proiecții materializate în Redis (cache-ul de stare curentă
a fiecărui DER) și în InfluxDB/TimescaleDB (seri temporale pentru prognoză și M&V).
Acest model vă permite să recalculați starea sistemului de la zero în cazul unor erori
proiecții, pur și simplu prin recitirea jurnalului de evenimente (Event Sourcing).
Răspunsul la cerere: teorie și implementare
Răspunsul la cerere (DR) este capacitatea de a modifica consumul de energie electrică ca răspuns la semnale ale rețelei sau pieței. Este unul dintre cele mai profitabile servicii pe care le poate oferi un VPP, și la momentul respectiv în sine unul dintre cele mai complexe de implementat corect datorită cerințelor de măsurare și Verificare (M&V).
Tipuri de programe DR
| program DR | Semnal | Perioada de graţie | Durata tipică | Remunerarea tipică (IT) |
|---|---|---|---|---|
| Rezervă rapidă (FR) | Frecvența rețelei (automat) | < 1 secundă | 15 minute | ~20-30 EUR/MW/oră |
| Rezervă secundară (RS) | Semnal AGC Terna | secunde | 15 min - ore | ~15-25 EUR/MW/oră |
| Rezerva tertiara (RT) | Terna expediere explicită | 15 minute | 1-4 ore | ~5-15 EUR/MWh |
| Sold (MB) | Oferta pe piata in timp real | 5-30 minute | 15 min - ore | Prețul pieței (variabil) |
| DR Interruptibilitate | Apelul operatorului | 15-30 minute | 1-4 ore | ~30.000-50.000 EUR/MW/an |
| Rezoluția ARERA 300/2017 | Semnal GSE (stimulent CER) | Minute | Variabilă | Primă de stimulare GSE |
Calcul de referință și M&V
# mv_service.py - Measurement & Verification per Demand Response
# Calcola la riduzione effettiva di carico rispetto alla baseline
import numpy as np
from typing import List, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class MVService:
"""
Measurement & Verification (M&V) per programmi Demand Response.
Metodo: CBL (Customer Baseline Load) - approccio standard FERC/ENTSO-E
La baseline e calcolata come media degli N giorni simili più recenti
prima dell'evento, escludendo giorni con altri eventi DR.
"""
def __init__(self, n_baseline_days: int = 10, exclude_top_bottom: bool = True):
self.n_baseline_days = n_baseline_days
self.exclude_top_bottom = exclude_top_bottom # High-5 / Low-5 exclusion
def calculate_baseline(
self,
site_id: str,
event_date: datetime,
event_hour: int,
historical_consumption: dict # {date_str: {hour: kw}}
) -> dict:
"""
Calcola la Customer Baseline Load (CBL) per il sito.
Algoritmo CBL con High-5/Low-5 exclusion (CAISO/PJM standard):
1. Seleziona N giorni simili recenti (stessa tipologia giorno: lavorativo/festivo)
2. Esclude il top 20% e il bottom 20% dei giorni per consumo nell'ora evento
3. Media i restanti giorni
"""
target_weekday = event_date.weekday() # 0=Lunedi, 6=Domenica
is_target_workday = target_weekday < 5 # Lavorativo vs weekend
# Raccolta dati storici compatibili
similar_days = []
check_date = event_date - timedelta(days=1)
while len(similar_days) < self.n_baseline_days and check_date > event_date - timedelta(days=60):
date_str = check_date.strftime("%Y-%m-%d")
is_check_workday = check_date.weekday() < 5
# Stesso tipo di giorno (lavorativo/festivo)
if is_check_workday == is_target_workday and date_str in historical_consumption:
day_data = historical_consumption[date_str]
if event_hour in day_data:
similar_days.append({
"date": date_str,
"consumption_kw": day_data[event_hour]
})
check_date -= timedelta(days=1)
if len(similar_days) < 3:
logger.warning(f"Dati insufficienti per baseline sito {site_id}: solo {len(similar_days)} giorni")
return {"baseline_kw": None, "method": "insufficient_data", "days_used": len(similar_days)}
consumptions = [d["consumption_kw"] for d in similar_days]
# High-5/Low-5 exclusion (standard CAISO)
if self.exclude_top_bottom and len(consumptions) >= 10:
n_exclude = max(1, len(consumptions) // 5) # 20% per lato
sorted_consumptions = sorted(consumptions)
filtered_consumptions = sorted_consumptions[n_exclude:-n_exclude]
else:
filtered_consumptions = consumptions
baseline_kw = np.mean(filtered_consumptions)
return {
"site_id": site_id,
"baseline_kw": round(baseline_kw, 2),
"method": "CBL_HighLow_Exclusion",
"days_analyzed": len(similar_days),
"days_used": len(filtered_consumptions),
"std_dev_kw": round(np.std(filtered_consumptions), 2)
}
def calculate_demand_reduction(
self,
site_id: str,
baseline_kw: float,
actual_consumption_kw: float,
event_duration_hours: float
) -> dict:
"""
Calcola la riduzione di carico e l'energia risparmiata.
Risultato usato per il settlement del programma DR.
"""
reduction_kw = max(0, baseline_kw - actual_consumption_kw)
reduction_pct = (reduction_kw / baseline_kw * 100) if baseline_kw > 0 else 0
energy_reduced_kwh = reduction_kw * event_duration_hours
return {
"site_id": site_id,
"baseline_kw": baseline_kw,
"actual_kw": actual_consumption_kw,
"reduction_kw": round(reduction_kw, 2),
"reduction_pct": round(reduction_pct, 1),
"energy_reduced_kwh": round(energy_reduced_kwh, 3),
"verified": reduction_pct >= 5.0 # Soglia minima per settlement
}
Contextul italian: CER, GSE și piața serviciilor auxiliare
Italia reprezintă una dintre cele mai interesante piețe din Europa pentru DERMS și VPP, datorită a mediu de reglementare în evoluție rapidă și una dintre cele mai înalte baze instalate fotovoltaice din Europa (aproximativ 37 GW de fotovoltaic instalat la sfârșitul anului 2024, țintă 80 GW până în 2030).
Comunități de energie regenerabilă (CER)
Decretul MASE nr. 127 din 16 mai 2025, publicată la 25 iunie 2025, a introdus importante noutăți pentru CER, extinderea stimulentelor la municipalitățile cu până la 50.000 de locuitori. CER-urile reprezintă laboratorul italian pentru agregarea distribuită a DER:
Structura de stimulare CER (Decretul post-legislativ 199/2021 și Decretul Ministerial 2025)
- Rata de stimulare: recunoscut pe energie partajată, depinde de aria geografică și de dimensiunea sistemului. Pentru sisteme < 200 kWp: 80-110 EUR/MWh (Centrul-Nord), 90-120 EUR/MWh (Sud și Insule)
- taxa ARERA: valorificarea energiei autoconsumate ~8 EUR/MWh (rambursarea componentelor tarifare)
- Durată: 20 de ani de la data intrării în funcţiune
- Putere permisă: până la 1 MW pentru o singură centrală (posibilitatea de a mai multe centrale în același CER)
- Cerință geografică: punctele de consum trebuie racordate sub aceeasi statie primara
Piața italiană de servicii auxiliare (MSD/MGP)
Terna, OTS-ul italian, gestionează piețele pe care VPP-urile pot oferi flexibilitate. În 2025, datorită introducerii progresive a UVAM (Mixed Virtual Units) și UVAC (Virtual Units Consumul activat), resursele distribuite agregate pot participa și la MSD:
| Piaţă | Acronim | Tipologie | Orizont | Acces VPP/UVAM |
|---|---|---|---|---|
| Cu o zi înainte de piață | MGP | Energia pentru ziua următoare | D-1 9am | Da (prin BSP) |
| Piața intrazilnică | MI | Energie intrazilnică | D-0 la 6 sedinte | Da (prin BSP) |
| Piața serviciilor auxiliare ex-ante | MSD ex-ante | Rezervă, echilibrare | D-1 până la D-0 | Da (UVAM activat) |
| Piața de echilibrare | MB | Echilibrare în timp real | D-0 în timp real | Da (UVAM cu latență < 15 min) |
| Rezervă rapidă | FR | Rezervă ultra-rapidă | Automat | Doar BESS cu latență < 1s |
| MACSE (Storage Capacity Procurement Mechanism) | MACSE | capacitatea de stocare | 15 ani | Stocare doar la scară de rețea |
MACSE în 2025
Prima licitație MACSE, desfășurată în septembrie 2025, a atribuit o capacitate de 10 GWh în zone Centru, Sud și Insule cu contracte de taxare pe 15 ani la prețuri medii de aproximativ 13.000 EUR/MWh/an. Acest mecanism garantează venituri stabile pentru sistemele BESS la scară mare de rețea, dar nu e accesibile VPP-urilor agregate mici. Pentru VPP-uri rezidențiale/comerciale, ruta principalul rămâne MSD prin UVAM.
PNRR și investiții în flexibilitate
Tranziția PNRR 5.0 a alocat resurse semnificative pentru digitalizarea energiei și comunități energetice. Pe lângă măsurile pentru CER, planul include investiții pentru modernizarea rețelelor de distribuție (contoare inteligente 2G, automatizare substații) care ele constituie infrastructura de sprijin pentru DERMS de nouă generație.
Studiu de caz: VPP regional italian (5.000 FV + 500 BESS)
Analizăm un caz concret de proiectare și dimensionare a unui VPP regional în Italia, pe baza parametrilor realiști ai pieței italiene 2025.
Scenariu de referință
Portofoliul VPP „SunFlex Puglia”
- PV rezidential: 4.500 sisteme, dimensiune medie 5 kWp, total 22,5 MWp
- FV C&I: 500 sisteme, dimensiune medie 80 kWp, total 40 MWp
- BESS rezidential: 450 sisteme, dimensiune medie 10 kWh/5 kW, total 4,5 MWh/2,25 MW
- BESS C&I: 50 sisteme, dimensiune medie 500 kWh/250 kW, total 25 MWh/12,5 MW
- Capacitate totală: 62,5 MWp PV + 29,5 MWh/14,75 MW BESS
- Zona geografică: Puglia (zona de sud - iradiere mare, penetrare PV mare)
Fluxuri de venituri VPP
| Fluxul de venituri | Piaţă | Putere/Energie | Venitul estimat | Note |
|---|---|---|---|---|
| Vânzări de energie fotovoltaică | Ziua MGP urmează | 62,5 MW (vârf), ~1.750 echiv. ore/an | ~5,2 M EUR/an | La preț mediu spot ~48 EUR/MWh |
| Rezervă rapidă BESS C&I | FR Terna | 12,5 MW (24/7) | ~2,7 M EUR/an | ~25 EUR/MW/oră x 8.760 ore |
| Echilibrare MSD | MSD/MB | 5 MW echivalent (BESS + DR) | ~0,8 milioane EUR/an | Plată ca licitație, variabilitate ridicată |
| Reducerea VF (accesiră) | MSD ex-ante | Reducere de 20 MW disponibilă | ~0,4 milioane EUR/an | Îmbunătățirea reducerii |
| Stimulent CER (5 CER de 1 MW) | GSE - CER | 5 MWp în configurație CER | ~0,6 M EUR/an | Tarif stimulativ pentru sud: ~120 EUR/MWh |
| TOTAL | ~9,7 milioane EUR/an | Brut, înainte de costurile platformei și rețelei |
Stiva de tehnologie a platformei
# docker-compose.yml - Stack DERMS per VPP regionale
# Configurazione di sviluppo/staging (produzione su Kubernetes)
version: "3.9"
services:
# ========================
# INGESTION LAYER
# ========================
# Broker MQTT per telemetria edge
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT non sicuro (solo LAN interna)
- "8883:8883" # MQTT over TLS (produzione)
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- mosquitto-data:/mosquitto/data
# Apache Kafka (broker eventi principale)
kafka:
image: confluentinc/cp-kafka:7.6.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 3 in produzione
KAFKA_LOG_RETENTION_HOURS: 168 # 7 giorni
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# ========================
# PROCESSING LAYER
# ========================
# DERMS Aggregation Service (FastAPI)
aggregation-service:
build: ./services/aggregation
ports:
- "8080:8080"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
INFLUXDB_URL: http://influxdb:8086
INFLUXDB_TOKEN: {{INFLUXDB_TOKEN}}
INFLUXDB_ORG: sunflex-puglia
INFLUXDB_BUCKET: der-telemetry
depends_on:
- kafka
- redis
- influxdb
# Dispatch Optimizer Service
dispatch-optimizer:
build: ./services/dispatch
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
SOLVER: COIN_CMD # o CPLEX in produzione per portfolio grandi
depends_on:
- kafka
- redis
# Forecasting Service (ML - produzione FV + carico)
forecasting-service:
build: ./services/forecasting
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
INFLUXDB_URL: http://influxdb:8086
MODEL_REGISTRY_URL: http://mlflow:5000
WEATHER_API_KEY: {{OPENMETEO_API_KEY}}
depends_on:
- kafka
- influxdb
- mlflow
# OpenADR VTN (Virtual Top Node - invia eventi DR ai siti)
openadr-vtn:
build: ./services/openadr-vtn
ports:
- "8081:8081"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
POSTGRES_URL: postgresql://postgres:5432/derms
depends_on:
- kafka
- postgres
# ========================
# STORAGE LAYER
# ========================
# Time-Series Database per telemetria
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
volumes:
- influxdb-data:/var/lib/influxdb2
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_ORG: sunflex-puglia
DOCKER_INFLUXDB_INIT_BUCKET: der-telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 30d
# Cache per stato corrente DER (Device Shadow)
redis:
image: redis:7.2-alpine
command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
# Database relazionale per asset registry, events, settlement
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: derms
POSTGRES_USER: derms_user
POSTGRES_PASSWORD: {{POSTGRES_PASSWORD}}
volumes:
- postgres-data:/var/lib/postgresql/data
# ========================
# OBSERVABILITY
# ========================
# MLflow per tracking modelli forecasting
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0 --backend-store-uri postgresql://mlflow:5432/mlflow
# Grafana per dashboard operativo
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
volumes:
influxdb-data:
postgres-data:
redis-data:
grafana-data:
mosquitto-data:
KPI-uri și SLA-uri operaționale VPP
| KPI-uri | Ţintă | Măsură | Impact dacă nu este respectat |
|---|---|---|---|
| Latența expedierii (percentila 95) | < 500 ms | Timpul de la comandă la punctul de referință pe dispozitiv | Nu se califică pentru Fast Reserve Terna |
| Prospețimea telemetriei | < 60 de secunde | Vârsta maximă a datelor de telemetrie pentru expediere | Optimizare bazată pe date învechite |
| Disponibilitate DER (tarifa online) | > 95% | Procent DER accesibil în orice moment | Reducerea capacității VPP, penalități de piață |
| Precizia expedierii | > 90% scor de fezabilitate | Abatere medie de la ținta de expediere | Penalități pe piața MSD/MB |
| Prognoza MAPE (FV cu 1h înainte) | < 8% | Eroare procentuală absolută medie | Pierderi de dezechilibru pe piață |
| Perioada de funcționare a platformei | > 99,5% | Disponibilitatea cloud DERMS | Pierderea obligațiilor de flexibilitate |
| Securitate cibernetică - MTTR | < 4 ore | Timp mediu de răspuns la incidentul de securitate | Cerința NIS2 (în vigoare din octombrie 2024) |
Cele mai bune practici și anti-modele în DERMS
Cele mai bune practici
1. Model de umbră a dispozitivului
Păstrați întotdeauna o „umbră” actualizată pentru fiecare DER din cache (Redis): conține cea mai recentă stare dispozitiv cunoscut (SoC, putere curentă, stare) cu marcaj de timp. Expedierea nu așteaptă telemetrie în timp real: folosește umbra, care este actualizată asincron. Acest lucru reduce latența de expediere de la secunde la milisecunde.
2. Degradare grațioasă în niveluri
Definiți niveluri clare de operare: NORMAL (nor + margine), DEGRADAT (numai marginea, optimizare local simplificat), MINIMAL (numai protectii de siguranta - fara expediere economica). Sistemul trebuie să știe întotdeauna la ce nivel se află și să o comunice piețelor prin notificări disponibilitate actualizată.
3. Idempotenta în comenzile de expediere
Fiecare comandă de expediere trebuie să aibă un ID unic. Adaptoarele pentru dispozitive trebuie implementate idempotenta: primirea aceleiași comenzi de două ori (pentru reîncercare) nu trebuie să provoace două dispeceri. Utilizați un jurnal al comenzilor recente cu TTL pentru deduplicare.
4. Separarea între programare și timp real
Programarea (planificarea ofertelor pieței, cu 24 de ore înainte) folosește modele de optimizare complex și lent (MILP cu minute de timp de rezolvare). Expedierea în timp real utilizează LP-uri simplificate care se rezolvă în milisecunde. Nu amestecați niciodată aceste două căi în același proces.
Anti-modele de evitat
Anti-Pattern 1: Sondaj sincron în cascadă
Cel mai comun model în DERMS de prima generație: serverul central sondajează fiecare DER în succesiune. Cu 1.000 de DER și 10 secunde de timeout per dispozitiv, ciclul de sondare este complet durează 10.000 de secunde (aproape 3 ore!). Soluție: sondaj paralel cu pool de conexiuni + controlat de evenimente pentru dispozitivele care acceptă push.
Anti-Pattern 2: Optimizare neconstrânsă SoC
Expedierea agresivă fără respectarea constrângerilor SoC ale bateriilor duce la cicluri de încărcare/descărcare cele profunde care degradează rapid pachetele de celule (reducere de 30-50% a duratei de viață utilă în câteva luni). Fiecare optimizator trebuie să includă întotdeauna constrângeri SoC ca constrângeri dure, nu soft.
Anti-Pattern 3: Baza de date relațională pentru telemetrie
Folosirea PostgreSQL sau MySQL pentru a stoca milioane de puncte de telemetrie pe secundă cauzează rapid probleme de performanță și costuri de stocare nesustenabile. Baze de date cu serii de timp (InfluxDB, TimescaleDB, QuestDB) comprimă datele de 10-50x în comparație cu bazele de date relaționale și suportă interogări temporale optimizate (funcții de fereastră, eșantionare automată).
Anti-Pattern 4: Ignorarea topologiei rețelei
Agregarea DER fără a lua în considerare topologia fizică a rețelei de distribuție poate duce la situații în care expedierea unui VPP provoacă congestie locală, încălcări ale tensiunii sau suprasarcini la transformatoare. Integrarea cu ADMS al DSO este următorul pas necesar pentru DERME mature. Proiectul european ATTEST (2021-2024) a definit standarde pentru această integrare.
Securitate și conformitate pentru DERMS
DERMS controlează infrastructura critică: un atac cibernetic de succes poate provoca întreruperi localizat sau destabiliza rețeaua. Directiva NIS2 (implementată în Italia prin Decretul Legislativ 138/2024, în vigoare din octombrie 2024) clasifică DERMS drept „operatori de servicii esențiale” sub rezerva obligații stricte de securitate cibernetică.
Cerințe cheie de securitate pentru DERMS
- Autentificarea dispozitivului: Fiecare DER se autentifică cu certificate X.509 (PKI) - fără acreditări statice partajate
- Criptare în tranzit: TLS 1.3 obligatoriu pentru toate comunicațiile - inclusiv protocoalele vechi (MQTT peste TLS, HTTPS pentru OpenADR/IEEE 2030.5)
- Rețeaua Zero Trust: niciun dispozitiv și „de încredere” în mod implicit - fiecare cerere este autentificată și autorizată
- RBAC granular: Operatorii, agregatorii și proprietarii DER au permisiuni strict diferențiate
- Jurnal de audit imuabil: fiecare comandă de expediere este înregistrată cu o semnătură digitală (nerefudibilă pentru decontare)
- Integrare SIEM: detectarea în timp real a anomaliilor privind modelele și comenzile de telemetrie
- MTTR < 4 ore: Cerința NIS2 pentru răspunsul la incident
- Segmentarea rețelei: separare fizică/logică între OT (protocoale de câmp) și IT (DERMS în cloud)
Concluzii și pașii următori
Arhitectura unui DERMS modern este una dintre cele mai complexe provocări de inginerie software din sectorul energetic: necesită integrarea protocoalelor eterogene dezvoltate în diferite decenii (Modbus din 1979, OpenADR din 2009, ISO 15118 din 2022), scala de la mii la milioane de dispozitive distribuite care mențin latențe sub secunde și funcționează într-un mediu de reglementare în continuă evoluție.
Puncte cheie de reținut:
- DERMS nu este un SCADA: funcționează în domeniul clientului final, trecând granița contorului, cu toate implicațiile legale și de gestionare a consimțământului care decurg din acesta.
- Arhitectură cu mai multe straturi (Field / Edge / Platform / Market) și separarea obligatorie a responsabilităților - nu o opțiune
- Kafka + CQRS + Event Sourcing și stiva de facto la scară dincolo de 100.000 de DER - arhitecturile de baze de date relaționale nu rezistă
- Optimizarea expedierii prin LP/MILP este bine definită din punct de vedere matematic, dar trebuie să respecte întotdeauna constrângerile fizice (SoC, rata de rampă), cum ar fi constrângerile dure
- Contextul italian (CER, UVAM, MACSE, NIS2) evoluează rapid: avantajul competitiv este construit împreună cu abilitățile de reglementare și tehnice.
Următorul articol din seria EnergyTech analizează Sistem de management al bateriei (BMS): algoritmi de control pentru sisteme de stocare (BESS), din estimarea stării de încărcare (SoC). cu filtru Kalman pentru protectie termica si echilibrarea celulelor. O lectură esențială pentru oricine lucrează cu sisteme de stocare integrate în VPP-uri.
Resurse și perspective
- Specificații OpenADR 2.0b: openadr.org (descărcare gratuită la înregistrare)
- IEEE 2030.5-2023: IEEE Xplore (plătit), prezentare generală gratuită pe smartgrid.ieee.org
- Reguli operaționale CACER/CER: gse.it (Anexa 1, iulie 2025)
- Documentația Terna UVAM: terna.it, secțiunea de expediere
- PuLP (solutor Python LP): coin-or.github.io/pulp
- Client Kafka Python confluent: github.com/confluentinc/confluent-kafka-python
- Ordinul FERC 2222 (referință SUA pentru agregarea DER): ferc.gov
- Proiect ATTEST UE (coordonare DSO-TSO): attest-project.eu
Articole înrudite
- Seria MLOps: pentru implementarea modelelor de prognoză PV în producție integrat în DERMS - vezi seria MLOps pe acest blog
- Seria AI Engineering / RAG: LLM pentru asistenta operationala a operatorilor VPP (interogare în limbaj natural pe datele pieței, alerte inteligente)
- Seria PostgreSQL AI: pgvector pentru căutarea de similaritate pe modelele de consum istoric (util pentru CBL și prognoză)







