Architektura DERMS: agregacja milionów rozproszonych zasobów
W 2025 r. po raz pierwszy w historii ponad połowa energii elektrycznej produkowanej w Europie będzie pochodzić ze źródeł odnawialnych. Niezwykłe osiągnięcie, które jednak niesie ze sobą równie niezwykłe wyzwanie: jak zarządzać siecią elektroenergetyczną, w której produkcja nie jest już skupiona w kilku dużych elektrowniach, ale dystrybuowane w milionach systemów fotowoltaicznych na dachach, akumulatorach w garażach, pojazdach elektrycznych podłączone do gniazdka, inteligentne pompy ciepła i mikrokogeneratory przemysłowe.
Odpowiedź na to wyzwanie ma swoją nazwę: DERMY, Rozproszony System Zarządzania Zasobami Energii. Jest to platforma oprogramowania, która agreguje, monitoruje, optymalizuje i koordynuje w czasie rzeczywistym tysiące - lub miliony - Rozproszonych Zasobów Energii (DER), przekształcając je w elastyczny i kontrolowalny element sieci. Bez DERMS rozwój rozproszonych odnawialnych źródeł energii grozi to destabilizacją sieci zamiast jej dekarbonizacji.
Rynek DERMS przeżywa rozkwit: szacunki na rok 2025 wahają się pomiędzy 1,1 i 1,42 miliarda dolarów w zależności od źródła, z prognozami wzrostu do 2,2 miliarda dolarów do 2030 roku w CAGR o 14-16%. Ale prawdziwa rewolucja ma miejsce od strony architektonicznej: skalowanie od 1 000 do 1 000 000 urządzeń rozproszone, utrzymujące opóźnienia poniżej 500 ms w przypadku wysyłki w czasie rzeczywistym, to problem inżynieryjny oprogramowanie, które bardzo niewielu rozwiązało w środowisku produkcyjnym.
W tym artykule zagłębiamy się w całą architekturę współczesnego DERMS: od protokołów komunikacyjnych dziedzinie (OpenADR 2.0b, IEEE 2030.5, SunSpec Modbus) do sterowanych zdarzeniami platform chmurowych z Kafką, od matematyka optymalizacji wysyłek (programowanie liniowe z PuLP) na rynek usług pomocniczych Włoski (MSD/MGP z Terny). Z działającym kodem Pythona i prawdziwym studium przypadku włoskiego regionalnego VPP.
Czego dowiesz się w tym artykule
- Definicja i umiejscowienie DERMS w ekosystemie użytkowym (różnice z EMS, SCADA, ADMS)
- Rodzaje DER: PV dla budynków mieszkalnych, BESS, EV/V2G, odpowiedź zapotrzebowania, kogeneracja
- Architektura wielowarstwowa: Field, Edge, Platform, Market
- Wirtualna elektrownia: jak połączyć tysiące DER w jeden zasób rynkowy
- Implementacja Pythona: usługa FastAPI + optymalizacja wysyłek za pomocą PuLP
- Protokoły komunikacyjne: OpenADR 2.0b, IEEE 2030.5, MQTT, SunSpec Modbus
- Skalowalność sterowana zdarzeniami za pomocą Apache Kafka i CQRS dla 1 miliona DER
- Odpowiedź na popyt: programy DR, M&V, wydarzenia OpenADR
- Kontekst włoski: CER, GSE, MACSE, MSD/MGP Terna
- Studium przypadku: Regionalne VPP z 5000 PV + 500 BESS
Seria EnergyTech - 10 artykułów
| # | Przedmiot | Państwo |
|---|---|---|
| 1 | Inteligentna sieć i IoT: architektura sieci elektroenergetycznej przyszłości | Opublikowany |
| 2 | Architektura DERMS: agregacja milionów rozproszonych zasobów (tutaj jesteś) | Aktualny |
| 3 | System zarządzania akumulatorami: Algorytmy sterowania dla BESS | Następny |
| 4 | Cyfrowy bliźniak sieci elektroenergetycznej z Pythonem i Pandapower | Już wkrótce |
| 5 | Prognozowanie energii odnawialnej: ML dla fotowoltaiki i wiatru | Już wkrótce |
| 6 | Równoważenie obciążenia pojazdów elektrycznych: V2G i inteligentne ładowanie za pomocą OCPP | Już wkrótce |
| 7 | MQTT i InfluxDB do telemetrii energii w czasie rzeczywistym | Już wkrótce |
| 8 | IEC 61850: Komunikacja w podstacji elektrycznej | Już wkrótce |
| 9 | Oprogramowanie do rozliczania emisji dwutlenku węgla: pomiar i redukcja emisji | Już wkrótce |
| 10 | Blockchain do handlu energią P2P w jednostkach CER | Już wkrótce |
Co to jest DERMS i jak jest umiejscowiony w ekosystemie użyteczności publicznej
Przed przystąpieniem do architektury technicznej konieczne jest wyjaśnienie, co odróżnia DERMS od innych systemy zarządzania energią, z których przedsiębiorstwa użyteczności publicznej korzystają od dziesięcioleci. Zamieszanie terminologiczne w tym branżowe i wysokie, a dostawcy często używają tych akronimów zamiennie w celach marketingowych.
Hierarchia systemów zarządzania
W ekosystemie nowoczesnego przedsiębiorstwa użyteczności publicznej współistnieje wiele systemów, z których każdy ma określone obowiązki:
| System | Akronim | Domena | Zarządzane zasoby | Typowe opóźnienie |
|---|---|---|---|---|
| System zarządzania energią | EMS | Transmisja (WN) | Duże elektrownie, połączenia międzysystemowe | Sekundy-minuty |
| WYGASA | WYGASA | Transmisja + Dystrybucja | Przełączniki, transformatory, linie | 100 ms - 1 s |
| Zaawansowany system zarządzania dystrybucją | ADMS | Dystrybucja (SN/NN) | Sieć dystrybucyjna, kabiny | Towary drugiej jakości |
| Rozproszony system zarządzania zasobami energii | DERMY | Dystrybucja + Klient końcowy | FV, BESS, EV, DR, VPP | 100 ms - 5 minut |
| Domowy system zarządzania energią | HEMS | Klient mieszkalny | Pojedyncze urządzenia domowe | Sekundy-minuty |
DERMS zajmuje wyjątkową pozycję: jest pierwszym systemem, który przekroczył granicę licznika, wejście na domenę klienta końcowego. Rodzi to konsekwencje prawne (zgoda, prywatność danych), techniczne (współpraca z tysiącami marek/modeli urządzeń) i biznesowe (kto jest właścicielem dane? kto dzieli się dochodami?).
Standard referencyjny: IEEE 2030.x i OpenADR
Dwie rodziny standardów napędzają ekosystem DERMS:
IEEE 2030.5 (profil inteligentnej energii 2.0 / SEP2)
Opublikowany w 2013 r. i zaktualizowany w 2023 r. (IEEE 2030.5-2023, grudzień 2024 r.) definiuje protokół komunikacja pomiędzy mediami i urządzeniami dystrybuowanymi do klienta. Oparty na architekturze RESTful na HTTP/HTTPS, obsługuje TLS 1.2+ ze względów bezpieczeństwa. Obejmuje: reakcję na popyt, kontrolę obciążenia, ustalanie cen dynamiczna, zarządzanie DER (fotowoltaika, magazynowanie, EV). I mandat kalifornijski (Zasada 21) dla wszystkich nowe systemy PV i magazynowania. W profilu 2023 wprowadzono funkcje specyficzne dla DER.
OpenADR 2.0b
Open Automated Demand Response, opracowany przez OpenADR Alliance. Wersja 2.0b i profil kompletny dla zaawansowanych serwerów i klientów (2.0a i dla prostych urządzeń). Używaj XML/JSON przez HTTP, definiuje wirtualny węzeł górny (VTN – DERMS/narzędzie) i wirtualny węzeł końcowy (VEN – urządzenie/agregator). Obsługuje tryby push (inicjacja VTN) i pull (wymaga VEN). W 2025 roku pierwsze certyfikowane produkty Ogłoszono OpenADR 3.0 (platforma E.ON SWITCH), ale 2.0b pozostaje operacyjnym odniesieniem w zdecydowanej większości wdrożeń na całym świecie.
Rodzaje DER: Bestiariusz rozproszonych zasobów
DERMS musi zarządzać zoo heterogenicznych technologii, z których każda ma inną charakterystykę fizyczną, różne interfejsy komunikacyjne i różne ograniczenia operacyjne. Warunkiem jest ich dokładne poznanie zaprojektować efektywny system agregacji.
| typ DER | typowa pojemność | Sterowanie | Opóźnienie komunikacji | Główny protokół | Kluczowe ograniczenia |
|---|---|---|---|---|---|
| Fotowoltaika mieszkaniowa | 3-10 kWp | Ograniczenie, prędkość rampowa | 5-60 sekund | IEEE 2030.5, specyfikacja Sun | To zależy od napromieniowania |
| FV C&I (komercyjne i przemysłowe) | 50-5 000 kWp | Ograniczenie mocy biernej | 1-5 sekund | Modbus TCP, DNP3 | Umowa PPA, ograniczenia sieciowe |
| Mieszkanie BESS | 5-15 kWh / 3-10 kW | Wysoki (ładowanie/rozładowanie/czuwanie) | 100 ms - 2 sekundy | Specyfikacja Sun, IEEE 2030.5 | SoC min/max, cykle życia |
| BESS C&I / Skala siatki | 100 kWh - 1 GWh | Bardzo wysoki, odpowiedź pani | 50-500ms | Modbus TCP, IEC 60870, IEC 61850 | Temperatura, SoC, degradacja |
| EV (pojazd-sieć V2G) | 7-100 kW dwukierunkowy | Wysoki, jeśli jest podłączony i włączony | 1-10 sekund (OCPP 2.0.1) | OCPP 2.0.1, ISO 15118 | SoC użytkownika, czasy ładowania |
| EV (inteligentne ładowanie V1G) | 3,7-22 kW w kierunku światowym | Średni (tylko ograniczenie) | 5-30 sekund | OCPP 1.6/2.0.1 | Preferencje użytkownika, docelowy SoC |
| Odpowiedź na zapotrzebowanie (obciążenia przemysłowe) | 50kW - 50MW | Wysoki, jeśli został wstępnie zakwalifikowany | 10-300 sekund | OpenADR 2.0b | Czas trwania zdarzenia, powrót do zdrowia |
| Pompy ciepła (HP) | 3-20 kW cieplna | Średnia (przesunięcie w czasie) | 30-300 sekund | Modbus, OpenADR | Komfort cieplny, nastawa |
| Mikrokogeneracja (CHP) | 1-1000 kWe | Wysoka (z możliwością rampy) | 1-30 sekund | Modbus TCP, OPC-UA | Sprawność cieplna, gaz |
Złożoność heterogeniczności
Prawdziwy DERMS musi współpracować z setkami różnych modeli falowników, BMS, kolumn przemysłowe ładowanie i sterowniki. Każdy producent implementuje protokoły nieco inaczej, z określonymi błędami, niestandardowymi limitami czasu i podzbiorami funkcjonalności. Solidna warstwa sterownika z adapterem wzór i pierwszy priorytet architektoniczny, zanim jeszcze pomyślano o optymalizacji.
Architektura oprogramowania DERMS: model wielowarstwowy
Nowoczesny DERMS jest podzielony na cztery odrębne warstwy, każda z dobrze określonymi obowiązkami i konkretnych technologii. Wyraźny podział pomiędzy warstwami ma fundamentalne znaczenie dla skalowalności i łatwość konserwacji systemu.
# Architettura DERMS - Vista ad alto livello
+================================================================+
| LAYER 4: MARKET |
| Mercati Energia: MGP, MSD, MO, Capacity Market |
| DSO Flexibility Markets, Aggregatori terzi |
| Revenue stacking, Portfolio optimization |
+================================================================+
| |
Bid/Offer API Settlement data
| |
+================================================================+
| LAYER 3: PLATFORM (Cloud DERMS) |
| |
| +------------------+ +------------------+ |
| | Forecasting | | Dispatch Engine | |
| | (ML: FV, load, | | (Optimization: | |
| | EV availability)| | LP/MILP/MPC) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Aggregation | | Market Interface | |
| | Service (VPP | | (Bid builder, | |
| | portfolio mgmt) | | settlement) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Event Bus | | Time-Series DB | |
| | (Apache Kafka) | | (InfluxDB/ | |
| | | | TimescaleDB) | |
| +------------------+ +------------------+ |
| |
| +------------------+ +------------------+ |
| | Device Registry | | API Gateway | |
| | (DER catalog, | | (REST, WebSocket,| |
| | metadata, caps) | | gRPC) | |
| +------------------+ +------------------+ |
+================================================================+
| |
Commands (dispatch) Telemetry (status)
| |
+================================================================+
| LAYER 2: EDGE |
| |
| +------------------+ +------------------+ |
| | Site Aggregator | | Protocol Gateway | |
| | (building/plant | | (Modbus->MQTT, | |
| | controller) | | SunSpec->JSON) | |
| +------------------+ +------------------+ |
| |
| Local optimization, failsafe, buffering, compression |
+================================================================+
| |
Device protocols (Modbus, SunSpec, OCPP, BACnet, OPC-UA)
| |
+================================================================+
| LAYER 1: FIELD |
| Inverter FV - BESS BMS - EV Charger - Smart Meter |
| Industrial Loads - CHP Controller - Heat Pump |
+================================================================+
Kluczowe zasady architektoniczne
Sterowanie zdarzeniami za pomocą CQRS
Sercem DERMS jest magistrala zdarzeń (w produkcji Apache Kafka), która oddziela polecenia zapisu (Strona dowodzenia – wysyłanie zamówień) z zapytań o odczyt (strona zapytań – dashboard, raportowanie). Wzór CQRS (Segregacja odpowiedzialności za zapytania poleceń) umożliwia niezależne skalowanie obu ścieżek: zapytania telemetryczne mają bardzo dużą częstotliwość (miliony wiadomości/godzinę), natomiast polecenia wysyłki są rzadsze, ale wymagają gwarancji dostawy (przynajmniej raz lub dokładnie raz).
Odporność na krawędzi
Warstwa Edge nie jest prostym przekaźnikiem: ma możliwość pracy w trybie offline (łagodna degradacja) gdy połączenie z chmurą zostanie przerwane. Agregatory witryn wykonują optymalizację lokalną z podzbiorem uproszczonych zasad, zapewniających, że bateria nie rozładuje się poniżej minimalnego SoC oraz że obciążenia krytyczne pozostają zasilane nawet bez nadzoru chmury DERMS.
Wzór adaptera sterownika
Każdy typ urządzenia ma swój własny adapter, który tłumaczy natywny protokół (SunSpec Modbus, OpenADR, OCPP) w ustandaryzowanym wewnętrznym modelu danych (Device Shadow, inspirowany AWS IoT). To jest izolowanie całkowicie logikę biznesową od złożoności protokołów terenowych i pozwala na dodawanie nowe typy DER bez dotykania rdzenia systemu.
Wirtualna elektrownia: przekształcanie DER w aktywa rynkowe
Kluczową koncepcją nadającą wartość ekonomiczną agregacji DER jest wirtualna elektrownia (VPP). VPP nie jest fizyczną fabryką: jest to portfel rozproszonych zasobów, patrząc z zewnątrz (z rynku energii elektrycznej lub sieci przesyłowej), zachowuje się jak wirtualna elektrownia o kontrolowanych i przewidywalnych cechach.
Globalny rynek VPP wzrósł z 5,7 miliarda dolarów w 2025 r. i przewiduje się, że osiągnie ten poziom 28,4 miliarda dolarów do 2035 roku (CAGR 17,4%). Oprogramowanie do agregacji i orkiestracji dominuje z 46% udziałem w rynku. Osiągnięto łączną wydajność VPP w Ameryce Północnej 37,5 GW w 2025 r., zaś globalny cel na 2030 r. przekracza 500 GW, które umożliwi V2G.
Jak działa agregacja VPP
Proces agregacji przebiega w czterech kolejnych fazach, które powtarzają się co 15 minut (typowy okres rozliczeniowy rynków europejskich):
- Prognozowanie indywidualne: dla każdego DER w portfelu system oblicza dostępność spodziewana w ciągu najbliższych kilku godzin. W przypadku systemu fotowoltaicznego zależy to od oczekiwanego promieniowania; w przypadku BESS – aktualny stan naładowania (SoC) i już zaplanowane cykle; dla pojazdu elektrycznego, od prawdopodobieństwo połączenia (w oparciu o historyczne wzorce użytkowników).
- Agregacja portfela: Poszczególne prognozy są agregowane do poziomu VPP, biorąc pod uwagę ograniczenia sieciowe (zarządzanie ograniczeniami) i korelacje pomiędzy zasobami. Wynikiem jest „krzywa oferty”, która opisuje, jaką moc może zapewnić VPP przy każdej cenie.
- Optymalizacja i licytacja: algorytm optymalizacji (programowanie liniowe lub MILP) określa optymalną strategię licytacji na rynkach (MGP, MSD, rynki mocy) maksymalizacja oczekiwanych przychodów z portfela.
- Wysyłka w czasie rzeczywistym: po przyznaniu zamówienia na rynku, Dispatch Engine wysyła nastawy do każdego pojedynczego DER, przestrzegając ograniczeń fizycznych i równoważąc zagregowaną reakcję z celem rynkowym.
Implementacja Pythona: Usługa agregacji DERMS
Budujemy kompletną usługę agregacji DER z FastAPI i optymalizacją wysyłek w oparciu o programowanie liniowe. Kod składa się z realistycznych modułów dla systemu produkcja.
Model danych DER
# models.py - Modello dati per le risorse distribuite
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import datetime
class DERType(Enum):
SOLAR_PV = "solar_pv"
BATTERY_STORAGE = "battery_storage"
EV_CHARGER = "ev_charger"
DEMAND_RESPONSE = "demand_response"
CHP = "chp"
HEAT_PUMP = "heat_pump"
class DERStatus(Enum):
ONLINE = "online"
OFFLINE = "offline"
DISPATCHING = "dispatching"
FAULT = "fault"
STANDBY = "standby"
@dataclass
class DERCapabilities:
"""capacità fisiche di una DER"""
max_power_kw: float # Potenza massima erogabile (kW)
min_power_kw: float # Potenza minima (0 per curtailment FV)
ramp_up_kw_per_sec: float # Velocita rampa salita (kW/s)
ramp_down_kw_per_sec: float # Velocita rampa discesa (kW/s)
# Solo per storage (BESS/EV)
capacity_kwh: Optional[float] = None
min_soc_pct: Optional[float] = None # SoC minimo (es. 10%)
max_soc_pct: Optional[float] = None # SoC massimo (es. 95%)
roundtrip_efficiency: Optional[float] = None # Efficienza ciclo (es. 0.92)
@dataclass
class DERTelemetry:
"""Stato in tempo reale di una DER"""
der_id: str
timestamp: datetime.datetime
active_power_kw: float # Potenza attuale (positiva = generazione)
reactive_power_kvar: float
voltage_v: float
current_a: float
status: DERStatus
# Solo per storage
soc_pct: Optional[float] = None
available_charge_kw: Optional[float] = None
available_discharge_kw: Optional[float] = None
# Solo per FV
irradiance_wm2: Optional[float] = None
temperature_c: Optional[float] = None
@dataclass
class DERAsset:
"""Registro completo di una DER nel portfolio"""
id: str
type: DERType
name: str
site_id: str # Sito fisico di appartenenza
grid_node_id: str # Nodo di rete (per vincoli topologici)
capabilities: DERCapabilities
protocol: str # "sunspec", "openadr", "ocpp", "modbus"
endpoint: str # URL/IP del dispositivo o del gateway
owner_id: str # Proprietario (utente o azienda)
aggregation_vpp_ids: list = field(default_factory=list) # VPP di appartenenza
# Telemetria più recente (aggiornata dal telemetry service)
last_telemetry: Optional[DERTelemetry] = None
Usługa agregacji z FastAPI
# aggregation_service.py - Servizio di aggregazione VPP
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, List, Optional
import asyncio
import logging
from datetime import datetime, timezone
# Import interni
from models import DERAsset, DERTelemetry, DERType, DERStatus
from dispatch_optimizer import DispatchOptimizer
from telemetry_store import TelemetryStore
logger = logging.getLogger(__name__)
app = FastAPI(title="DERMS Aggregation Service", version="2.1.0")
# --- Pydantic schemas per la API ---
class VPPPortfolioResponse(BaseModel):
vpp_id: str
der_count: int
total_capacity_kw: float
available_capacity_kw: float
current_dispatch_kw: float
battery_soc_avg_pct: Optional[float]
timestamp: str
class DispatchRequest(BaseModel):
vpp_id: str
target_power_kw: float # Potenza target per la VPP (positiva = generazione)
duration_minutes: int # Durata del dispatch
priority: str = "normal" # "emergency" | "normal" | "economic"
max_deviation_pct: float = 5.0 # Tolleranza deviazione dal target
class DispatchSetpoint(BaseModel):
der_id: str
power_kw: float
duration_minutes: int
timestamp: str
class DispatchResponse(BaseModel):
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: List[DispatchSetpoint]
feasibility_score: float # 0.0 - 1.0 (1.0 = target pienamente raggiunto)
timestamp: str
# --- Registro in-memory (in produzione: database PostgreSQL + cache Redis) ---
_vpp_registry: Dict[str, List[str]] = {} # vpp_id -> [der_id]
_der_registry: Dict[str, DERAsset] = {} # der_id -> DERAsset
_telemetry_store = TelemetryStore()
_optimizer = DispatchOptimizer()
# --- Endpoints ---
@app.get("/health")
async def health_check():
return {"status": "ok", "version": "2.1.0", "timestamp": datetime.now(timezone.utc).isoformat()}
@app.get("/api/v1/vpp/{vpp_id}/portfolio", response_model=VPPPortfolioResponse)
async def get_vpp_portfolio(vpp_id: str):
"""
Ritorna la snapshot aggregata dello stato corrente di una VPP.
Consolida telemetria di tutti i DER nel portfolio.
"""
if vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{vpp_id}' non trovata")
der_ids = _vpp_registry[vpp_id]
der_assets = [_der_registry[did] for did in der_ids if did in _der_registry]
if not der_assets:
raise HTTPException(status_code=503, detail="Nessun DER disponibile nel portfolio")
# Aggregazione metriche
total_capacity = sum(a.capabilities.max_power_kw for a in der_assets)
current_dispatch = 0.0
available_capacity = 0.0
storage_assets = []
for asset in der_assets:
telemetry = _telemetry_store.get_latest(asset.id)
if telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.DISPATCHING]:
current_dispatch += telemetry.active_power_kw
# capacità disponibile = differenza tra massimo e corrente
if telemetry.available_discharge_kw is not None:
available_capacity += telemetry.available_discharge_kw
else:
available_capacity += (asset.capabilities.max_power_kw - telemetry.active_power_kw)
# Raccogli SoC per storage
if telemetry.soc_pct is not None:
storage_assets.append(telemetry.soc_pct)
battery_soc_avg = (sum(storage_assets) / len(storage_assets)) if storage_assets else None
return VPPPortfolioResponse(
vpp_id=vpp_id,
der_count=len(der_assets),
total_capacity_kw=round(total_capacity, 2),
available_capacity_kw=round(max(0, available_capacity), 2),
current_dispatch_kw=round(current_dispatch, 2),
battery_soc_avg_pct=round(battery_soc_avg, 1) if battery_soc_avg else None,
timestamp=datetime.now(timezone.utc).isoformat()
)
@app.post("/api/v1/vpp/dispatch", response_model=DispatchResponse)
async def dispatch_vpp(request: DispatchRequest, background_tasks: BackgroundTasks):
"""
Esegue un dispatch ottimizzato della VPP verso il target di potenza richiesto.
Usa Linear Programming per distribuire il carico tra i DER disponibili.
"""
if request.vpp_id not in _vpp_registry:
raise HTTPException(status_code=404, detail=f"VPP '{request.vpp_id}' non trovata")
# Recupero DER disponibili e telemetria aggiornata
der_ids = _vpp_registry[request.vpp_id]
available_ders = []
for der_id in der_ids:
asset = _der_registry.get(der_id)
telemetry = _telemetry_store.get_latest(der_id)
if asset and telemetry and telemetry.status in [DERStatus.ONLINE, DERStatus.STANDBY]:
available_ders.append((asset, telemetry))
if not available_ders:
raise HTTPException(status_code=503, detail="Nessun DER disponibile per il dispatch")
# Ottimizzazione del dispatch tramite LP
result = _optimizer.optimize_dispatch(
ders=available_ders,
target_power_kw=request.target_power_kw,
duration_minutes=request.duration_minutes
)
# Invio setpoint in background (async, non bloccante)
background_tasks.add_task(
_send_setpoints_to_ders,
setpoints=result.setpoints,
dispatch_id=result.dispatch_id
)
return result
async def _send_setpoints_to_ders(setpoints: List[DispatchSetpoint], dispatch_id: str):
"""Invia i setpoint a ogni DER in parallelo tramite il rispettivo adapter."""
logger.info(f"Inizio dispatch {dispatch_id} - {len(setpoints)} setpoint")
tasks = [_send_single_setpoint(sp) for sp in setpoints]
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.warning(f"Dispatch {dispatch_id}: {len(errors)} errori su {len(setpoints)} setpoint")
logger.info(f"Dispatch {dispatch_id} completato")
async def _send_single_setpoint(setpoint: DispatchSetpoint):
"""Stub: in produzione chiama l'adapter specifico del protocollo del DER."""
asset = _der_registry.get(setpoint.der_id)
if not asset:
raise ValueError(f"DER {setpoint.der_id} non trovato nel registro")
logger.debug(f"Setpoint {setpoint.der_id}: {setpoint.power_kw} kW per {setpoint.duration_minutes} min")
# In produzione: chiamata all'adapter (SunSpec/OpenADR/OCPP)
await asyncio.sleep(0.1) # Simulazione latenza rete
Optymalizator wysyłki z programowaniem liniowym (PuLP)
Sercem optymalizacji jest problem programowania liniowego (LP), który minimalizuje odchylenie od docelowej mocy, z uwzględnieniem ograniczeń fizycznych każdego DER:
# dispatch_optimizer.py - Ottimizzazione LP per dispatch DER
import pulp
import uuid
import logging
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import List, Tuple
from models import DERAsset, DERTelemetry, DERType, DERStatus
logger = logging.getLogger(__name__)
@dataclass
class OptimizationResult:
dispatch_id: str
vpp_id: str
target_power_kw: float
achieved_power_kw: float
setpoints: list
feasibility_score: float
timestamp: str
solver_status: str
solve_time_ms: float
class DispatchOptimizer:
"""
Ottimizzatore LP per dispatch di portfolio DER.
Problema: data una richiesta di potenza target P_target,
trovare i setpoint p_i per ogni DER i nel portfolio
che minimizzino |sum(p_i) - P_target|, rispettando:
- p_i_min <= p_i <= p_i_max per ogni DER
- Vincoli SoC per storage (BESS/EV)
- Vincoli ramp rate
- Vincoli di topologia di rete (opzionale)
"""
def optimize_dispatch(
self,
ders: List[Tuple[DERAsset, DERTelemetry]],
target_power_kw: float,
duration_minutes: int,
vpp_id: str = "vpp-001"
) -> OptimizationResult:
import time
start_time = time.time()
dispatch_id = f"disp-{uuid.uuid4().hex[:8]}"
# Costruzione del problema LP con PuLP
prob = pulp.LpProblem(
name=f"DER_Dispatch_{dispatch_id}",
sense=pulp.LpMinimize
)
# Variabili decisionali: setpoint per ogni DER (kW)
# Vincolate tra min e max fisico del dispositivo
p_vars = {}
for asset, telemetry in ders:
# Calcola limiti effettivi considerando SoC per storage
p_min, p_max = self._compute_effective_limits(asset, telemetry, duration_minutes)
var_name = f"p_{asset.id.replace('-', '_')}"
p_vars[asset.id] = pulp.LpVariable(
name=var_name,
lowBound=p_min,
upBound=p_max,
cat=pulp.constants.LpContinuous
)
# Variabile di slack per la deviazione dal target (non-negativa)
slack_pos = pulp.LpVariable("slack_pos", lowBound=0) # Eccesso rispetto target
slack_neg = pulp.LpVariable("slack_neg", lowBound=0) # Deficit rispetto target
# Funzione obiettivo: minimizzare la deviazione assoluta dal target
# Pesi differenziati: penalizza di più il deficit (mancata fornitura)
prob += 1.5 * slack_neg + 1.0 * slack_pos, "MinimizeDeviation"
# Vincolo di bilanciamento della potenza
total_power = pulp.lpSum(p_vars[asset.id] for asset, _ in ders)
prob += (total_power - target_power_kw == slack_pos - slack_neg), "PowerBalance"
# Risoluzione (COIN-BC solver, open source)
solver = pulp.COIN_CMD(msg=False, timeLimit=5.0)
status = prob.solve(solver)
solve_time_ms = (time.time() - start_time) * 1000
solver_status = pulp.LpStatus[prob.status]
# Costruzione setpoint dal risultato
setpoints = []
achieved_power = 0.0
if status in [pulp.LpStatusOptimal := 1, -1]: # Optimal or Infeasible
for asset, telemetry in ders:
if asset.id in p_vars:
p_val = pulp.value(p_vars[asset.id]) or 0.0
achieved_power += p_val
if abs(p_val) > 0.1: # Ignora setpoint trascurabili
setpoints.append({
"der_id": asset.id,
"power_kw": round(p_val, 2),
"duration_minutes": duration_minutes,
"timestamp": datetime.now(timezone.utc).isoformat()
})
# Calcolo feasibility score: 1.0 se target raggiunto, < 1.0 se parziale
if abs(target_power_kw) > 0.1:
deviation = abs(achieved_power - target_power_kw) / abs(target_power_kw)
feasibility_score = max(0.0, 1.0 - deviation)
else:
feasibility_score = 1.0
logger.info(
f"Dispatch {dispatch_id}: target={target_power_kw}kW, "
f"achieved={achieved_power:.1f}kW, score={feasibility_score:.3f}, "
f"solver={solver_status}, time={solve_time_ms:.0f}ms"
)
return OptimizationResult(
dispatch_id=dispatch_id,
vpp_id=vpp_id,
target_power_kw=target_power_kw,
achieved_power_kw=round(achieved_power, 2),
setpoints=setpoints,
feasibility_score=round(feasibility_score, 4),
timestamp=datetime.now(timezone.utc).isoformat(),
solver_status=solver_status,
solve_time_ms=round(solve_time_ms, 1)
)
def _compute_effective_limits(
self,
asset: DERAsset,
telemetry: DERTelemetry,
duration_minutes: int
) -> Tuple[float, float]:
"""
Calcola i limiti effettivi di potenza per un DER considerando:
- Limiti fisici dichiarati
- SoC corrente per storage (quanta energia residua disponibile)
- Temperatura (semplificato)
"""
p_min = asset.capabilities.min_power_kw
p_max = asset.capabilities.max_power_kw
# Vincoli aggiuntivi per storage (BESS o EV)
if asset.capabilities.capacity_kwh and telemetry.soc_pct is not None:
cap_kwh = asset.capabilities.capacity_kwh
soc = telemetry.soc_pct / 100.0
min_soc = (asset.capabilities.min_soc_pct or 10.0) / 100.0
max_soc = (asset.capabilities.max_soc_pct or 95.0) / 100.0
efficiency = asset.capabilities.roundtrip_efficiency or 0.92
duration_h = duration_minutes / 60.0
# Energia disponibile in scarica (discharge -> positivo)
energy_available_discharge_kwh = (soc - min_soc) * cap_kwh * efficiency
p_max_soc = energy_available_discharge_kwh / duration_h if duration_h > 0 else 0
p_max = min(p_max, p_max_soc)
# Energia disponibile in carica (charge -> negativo = consumo)
energy_available_charge_kwh = (max_soc - soc) * cap_kwh / efficiency
p_min_soc = -(energy_available_charge_kwh / duration_h) if duration_h > 0 else 0
p_min = max(p_min, p_min_soc)
return p_min, p_max
Protokoły komunikacyjne: Różaniec Standardów
Wybór protokołu komunikacyjnego ma ogromny wpływ na opóźnienia, skalowalność i koszt integracji DERMS. Nie ma uniwersalnego protokołu: używa się go na każdym poziomie hierarchii protokoły zoptymalizowane pod kątem Twoich konkretnych potrzeb.
| Protokół | Warstwy | Transport | Typowe opóźnienie | Skalowalność | Główny przypadek użycia |
|---|---|---|---|---|---|
| OpenADR 2.0b | Platforma → Witryna | HTTP/XML lub JSON | 1-30 sekund | Średnia (ankieta) | Odpowiedź na popyt, wydarzenia DR |
| IEEE 2030.5 (wrzesień 2) | Platforma → Urządzenie | HTTPS/REST | 1-60 sekund | Wysoki (skalowalny REST) | Fotowoltaika, magazynowanie, pojazdy elektryczne do użytku domowego |
| SunSpec Modbus TCP | Krawędź → Urządzenie | TCP/Modbusa | 50-500ms | Niski (odpytywanie sekwencyjne) | Falownik fotowoltaiczny, BESS C&I |
| OCPP 2.0.1 | Platforma → Ładowarka | WebSocket/JSON | 100 ms-5 s | Wysoki (WebSocket) | Kolumny EV |
| MQTT | Krawędź → Platforma | TCP (TLS) | 10-500 ms | Bardzo wysoki (broker) | Ogólna telemetria IoT |
| IEC 60870-5-104 | SCADA → RTU | TCP | 50-200ms | Przeciętny | Podstacje, starsze RTU |
| DNP3 | SCADA → Pole | Szeregowy/TCP | 100 ms-1 s | Niski | Starsze narzędzia SCADA |
| GĘŚ IEC 61850 | Podstacja | Ethernet (multimisja) | < 4 ms | Wysoka (LAN) | Zabezpieczenia, automatyka SE |
OpenADR 2.0b: architektura VTN/VEN
# openadr_adapter.py - Client OpenADR 2.0b (VEN - Virtual End Node)
# Implementazione semplificata per illustrare il flusso di comunicazione
import httpx
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class DREvent:
"""Evento di Demand Response ricevuto dal VTN (utility/DERMS)"""
event_id: str
program_id: str
signal_name: str # "SIMPLE" | "ELECTRICITY_PRICE" | "LOAD_DISPATCH"
signal_type: str # "LEVEL" | "PRICE" | "X-LOAD_DISPATCH"
signal_value: float # Valore del segnale (es. livello 1/2/3 o prezzo EUR/MWh)
dtstart: str # ISO 8601 - inizio evento
duration_minutes: int
randomize_start_minutes: int = 0 # Randomizzazione per evitare picchi sincroni
class OpenADRVENClient:
"""
Virtual End Node (VEN): rappresenta un sito/aggregatore che riceve
eventi DR dal Virtual Top Node (VTN) del DERMS o della utility.
Flusso tipico OpenADR 2.0b in modalità PULL:
1. VEN -> VTN: oadrRequestEvent (richiede eventi disponibili)
2. VTN -> VEN: oadrDistributeEvent (lista eventi attivi)
3. VEN -> VTN: oadrCreatedEvent (conferma ricezione con optIn/optOut)
4. VEN -> VTN: oadrUpdateReport (report su energia effettivamente modificata)
"""
def __init__(self, vtn_url: str, ven_id: str, ven_name: str):
self.vtn_url = vtn_url.rstrip("/")
self.ven_id = ven_id
self.ven_name = ven_name
self._client = httpx.AsyncClient(timeout=30.0)
self._registered = False
self._active_events: dict = {}
async def register(self) -> bool:
"""Registrazione del VEN sul VTN - obbligatoria prima di ricevere eventi."""
payload = self._build_register_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiRegisterParty",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
# Parsing della risposta oadrCreatedParty
registration_id = self._extract_registration_id(root)
if registration_id:
self._registered = True
logger.info(f"VEN {self.ven_id} registrato con ID: {registration_id}")
return True
except Exception as e:
logger.error(f"Errore registrazione VEN: {e}")
return False
async def poll_events(self) -> list:
"""Polling degli eventi DR disponibili sul VTN."""
if not self._registered:
raise RuntimeError("VEN non registrato. Chiamare register() prima.")
payload = self._build_request_event_payload()
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
root = ET.fromstring(response.text)
events = self._parse_distribute_event(root)
logger.info(f"VEN {self.ven_id}: ricevuti {len(events)} eventi DR")
return events
except Exception as e:
logger.error(f"Errore polling eventi: {e}")
return []
async def confirm_event(self, event_id: str, opt_in: bool = True) -> bool:
"""Conferma ricezione evento e comunicazione optIn/optOut."""
status = "optIn" if opt_in else "optOut"
payload = self._build_created_event_payload(event_id, status)
try:
response = await self._client.post(
f"{self.vtn_url}/OpenADR2/Simple/2.0b/EiEvent",
content=payload,
headers={"Content-Type": "application/xml"}
)
response.raise_for_status()
logger.info(f"Evento {event_id}: {status} confermato")
return True
except Exception as e:
logger.error(f"Errore conferma evento {event_id}: {e}")
return False
async def run_polling_loop(self, poll_interval_seconds: int = 60):
"""Loop di polling continuo - eseguito come task asyncio."""
logger.info(f"Avvio polling loop VEN {self.ven_id} ogni {poll_interval_seconds}s")
while True:
events = await self.poll_events()
for event in events:
if event.event_id not in self._active_events:
self._active_events[event.event_id] = event
# OptIn automatico (in produzione: logica di accettazione business)
await self.confirm_event(event.event_id, opt_in=True)
logger.info(
f"Nuovo evento DR: {event.event_id} | "
f"Segnale: {event.signal_name}={event.signal_value} | "
f"Durata: {event.duration_minutes} min"
)
await asyncio.sleep(poll_interval_seconds)
def _build_register_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRegisterReport specificationID="TELEMETRY_STATUS">
<ei:venID>{self.ven_id}</ei:venID>
</oadrRegisterReport>
</oadrSignedObject>
</oadrPayload>"""
def _build_request_event_payload(self) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrRequestEvent>
<ei:eiRequestEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:replyLimit>10</ei:replyLimit>
</ei:eiRequestEvent>
</oadrRequestEvent>
</oadrSignedObject>
</oadrPayload>"""
def _build_created_event_payload(self, event_id: str, status: str) -> str:
return f"""<?xml version="1.0" encoding="UTF-8"?>
<oadrPayload>
<oadrSignedObject>
<oadrCreatedEvent>
<ei:eiCreatedEvent>
<ei:venID>{self.ven_id}</ei:venID>
<ei:eventResponses>
<ei:eventResponse>
<ei:responseCode>200</ei:responseCode>
<ei:requestID>{event_id}</ei:requestID>
<ei:qualifiedEventID>
<ei:eventID>{event_id}</ei:eventID>
<ei:modificationNumber>0</ei:modificationNumber>
</ei:qualifiedEventID>
<ei:optType>{status}</ei:optType>
</ei:eventResponse>
</ei:eventResponses>
</ei:eiCreatedEvent>
</oadrCreatedEvent>
</oadrSignedObject>
</oadrPayload>"""
def _parse_distribute_event(self, root: ET.Element) -> list:
"""Parser semplificato per oadrDistributeEvent."""
events = []
# In produzione: parsing completo con namespace XML OpenADR
# Qui simuliamo la struttura per chiarezza
return events
def _extract_registration_id(self, root: ET.Element) -> Optional[str]:
"""Estrae l'ID di registrazione dalla risposta VTN."""
return "reg-001" # Semplificato
Skalowalność: od 1 000 do 1 000 000 DER
Skalowalność jest najważniejszym wyzwaniem technicznym współczesnych DERMS. Zarządzaj 1000 DER w zasięgu ręki każdy dobrze zaprojektowany system. Zarządzaj 1 000 000 DER z opóźnieniami w wysyłce poniżej sekundy wymaga radykalnie odmiennej architektury, inspirowanej systemami masowymi stosowanymi w przemyśle media finansowe i społecznościowe.
Liczby skalowalności
Obciążenie telemetryczne w pełni sprawne
- 1000 DER: ~60 000 wiadomości/godzinę (odpytywanie co 60 s) - zarządzanie za pomocą jednej mikrousługi
- 100 000 DER: ~6 000 000 wiadomości/godzinę (100 000 msg/min) - wymaga fragmentowania i Kafki
- 1 000 000 DER: ~60 000 000 wiadomości/godzinę - dedykowana architektura strumieniowania zdarzeń
Przy średnim ładunku wynoszącym 200 bajtów na wiadomość generuje się około 1 miliona DER 3,3 GB/godzinę surowej telemetrii, przed kompresją (co zwykle skutkuje 300-400 MB/godzinę).
Architektura Kafki dla DERMS o wysokiej skalowalności
# kafka_derms_config.py - Configurazione Kafka per DERMS scalabile
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import json
import logging
from dataclasses import asdict
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# === TOPIC DESIGN ===
# Strategia: topic separati per tipo di dato, partitionati per DER ID
# La key del messaggio = der_id garantisce che tutti i messaggi dello stesso
# DER vadano alla stessa partizione (ordering garantito per dispositivo)
KAFKA_TOPICS = {
# Telemetria (alta frequenza, alta velocità)
"der.telemetry.raw": {
"partitions": 48, # 48 partizioni per parallelismo elevato
"replication_factor": 3,
"retention_ms": 86400000, # 24 ore (poi su time-series DB)
"compression_type": "lz4", # LZ4 per compressione veloce
"config": {"cleanup.policy": "delete"}
},
# Comandi di dispatch (bassa frequenza, alta affidabilità)
"der.dispatch.commands": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000, # 7 giorni
"compression_type": "gzip",
"config": {"cleanup.policy": "delete", "min.insync.replicas": "2"}
},
# Conferme dispatch (ack dai dispositivi)
"der.dispatch.acks": {
"partitions": 12,
"replication_factor": 3,
"retention_ms": 604800000,
"config": {"cleanup.policy": "delete"}
},
# Aggregati VPP (output dell'aggregation service)
"vpp.portfolio.snapshots": {
"partitions": 4,
"replication_factor": 3,
"retention_ms": 2592000000, # 30 giorni
"config": {"cleanup.policy": "compact"} # Log compaction: mantieni ultima snapshot
},
# Alert e fault detection
"der.alerts": {
"partitions": 6,
"replication_factor": 3,
"retention_ms": 2592000000,
"config": {"cleanup.policy": "delete"}
}
}
class DERMSTelemetryProducer:
"""
Producer Kafka per la telemetria DER.
Usato dai Gateway Edge per pubblicare telemetria verso il cloud.
"""
def __init__(self, bootstrap_servers: str):
self._producer = Producer({
"bootstrap.servers": bootstrap_servers,
"client.id": "derms-telemetry-producer",
# Affidabilità: ACK da tutti i broker in-sync
"acks": "all",
# Performance: batching aggressivo per throughput
"linger.ms": 20,
"batch.size": 65536,
"compression.type": "lz4",
# Retry per fault tolerance
"retries": 5,
"retry.backoff.ms": 200,
"enable.idempotence": True # Exactly-once semantics
})
def publish_telemetry(self, der_id: str, telemetry: dict) -> None:
"""
Pubblica telemetria su Kafka.
La key = der_id garantisce ordering per dispositivo.
"""
payload = json.dumps({
**telemetry,
"_published_at": datetime.now(timezone.utc).isoformat()
}).encode("utf-8")
self._producer.produce(
topic="der.telemetry.raw",
key=der_id.encode("utf-8"),
value=payload,
on_delivery=self._delivery_callback
)
self._producer.poll(0) # Non-blocking flush
def flush(self, timeout: float = 10.0) -> None:
"""Flush dei messaggi in coda prima dello shutdown."""
pending = self._producer.flush(timeout=timeout)
if pending > 0:
logger.warning(f"{pending} messaggi non ancora consegnati dopo flush")
def _delivery_callback(self, err, msg):
if err:
logger.error(f"Errore consegna messaggio: {err}")
else:
logger.debug(f"Messaggio consegnato: topic={msg.topic()}, partition={msg.partition()}")
class DERMSDispatchConsumer:
"""
Consumer Kafka per i comandi di dispatch.
Ogni Site Aggregator consuma dal topic dispatch.commands
i setpoint relativi ai propri DER.
"""
def __init__(self, bootstrap_servers: str, group_id: str, site_id: str):
self.site_id = site_id
self._consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"client.id": f"site-aggregator-{site_id}",
"auto.offset.reset": "latest", # Solo messaggi recenti (no backlog storico)
"enable.auto.commit": False, # Commit manuale dopo elaborazione
"max.poll.interval.ms": 30000,
"session.timeout.ms": 10000
})
self._consumer.subscribe(["der.dispatch.commands"])
def process_commands(self, timeout_seconds: float = 1.0):
"""Poll e processa comandi di dispatch per questo sito."""
msg = self._consumer.poll(timeout=timeout_seconds)
if msg is None:
return None
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
return None
logger.error(f"Errore consumer: {msg.error()}")
return None
command = json.loads(msg.value().decode("utf-8"))
# Filtra solo i comandi per i DER di questo sito
if command.get("site_id") == self.site_id:
logger.info(
f"Sito {self.site_id}: ricevuto dispatch "
f"der={command['der_id']} power={command['power_kw']}kW"
)
# Commit esplicito dopo elaborazione riuscita (at-least-once)
self._consumer.commit(msg)
return command
return None
CQRS i pozyskiwanie zdarzeń w DERMS
W wysoce skalowalnym DERMS wzór CQRS oddziela:
Strona dowodzenia: polecenia wysyłania na platformie Kafka (niezmienne, tylko do dodawania)
- każda wysłana wartość zadana staje się trwałym zdarzeniem w logu.
Strona zapytania: Zmaterializowane projekcje w Redis (cache bieżącego stanu
każdego DER) oraz w InfluxDB/TimescaleDB (szereg czasowy do prognozowania i M&V).
Ten wzorzec pozwala na ponowne obliczenie stanu systemu od zera w przypadku błędów
prognoz, po prostu ponownie czytając dziennik zdarzeń (Event Sourcing).
Odpowiedź na popyt: teoria i wdrożenie
Reakcja zapotrzebowania (DR) to zdolność do modyfikowania zużycia energii elektrycznej w odpowiedzi na sygnały sieci lub rynku. Jest to jedna z najbardziej dochodowych usług, jakie może zaoferować VPP, i to w tamtym czasie sam w sobie jest jednym z najbardziej skomplikowanych do prawidłowego wdrożenia ze względu na wymagania pomiarowe i weryfikacja (M&V).
Rodzaje programów DR
| programu DR | Sygnał | Czas realizacji | Typowy czas trwania | Typowe wynagrodzenie (IT) |
|---|---|---|---|---|
| Szybka rezerwacja (FR) | Częstotliwość sieci (automatyczna) | < 1 sekunda | 15 minut | ~20-30 EUR/MW/godz |
| Rezerwa wtórna (RS) | Sygnał AGC Terna | Towary drugiej jakości | 15 min - godz | ~15-25 EUR/MW/godz |
| Rezerwa trzeciorzędna (RT) | Terna wyraźna wysyłka | 15 minut | 1-4 godziny | ~5-15 EUR/MWh |
| Saldo (MB) | Oferta na rynku czasu rzeczywistego | 5-30 minut | 15 min - godz | Cena rynkowa (zmienna) |
| Przerywalność DR | Telefon operatora | 15-30 minut | 1-4 godziny | ~30 000-50 000 EUR/MW/rok |
| Uchwała ARERA 300/2017 | Sygnał GSE (motywacja CER) | Protokół | Zmienny | Premia motywacyjna GSE |
Obliczenia bazowe i M&V
# mv_service.py - Measurement & Verification per Demand Response
# Calcola la riduzione effettiva di carico rispetto alla baseline
import numpy as np
from typing import List, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class MVService:
"""
Measurement & Verification (M&V) per programmi Demand Response.
Metodo: CBL (Customer Baseline Load) - approccio standard FERC/ENTSO-E
La baseline e calcolata come media degli N giorni simili più recenti
prima dell'evento, escludendo giorni con altri eventi DR.
"""
def __init__(self, n_baseline_days: int = 10, exclude_top_bottom: bool = True):
self.n_baseline_days = n_baseline_days
self.exclude_top_bottom = exclude_top_bottom # High-5 / Low-5 exclusion
def calculate_baseline(
self,
site_id: str,
event_date: datetime,
event_hour: int,
historical_consumption: dict # {date_str: {hour: kw}}
) -> dict:
"""
Calcola la Customer Baseline Load (CBL) per il sito.
Algoritmo CBL con High-5/Low-5 exclusion (CAISO/PJM standard):
1. Seleziona N giorni simili recenti (stessa tipologia giorno: lavorativo/festivo)
2. Esclude il top 20% e il bottom 20% dei giorni per consumo nell'ora evento
3. Media i restanti giorni
"""
target_weekday = event_date.weekday() # 0=Lunedi, 6=Domenica
is_target_workday = target_weekday < 5 # Lavorativo vs weekend
# Raccolta dati storici compatibili
similar_days = []
check_date = event_date - timedelta(days=1)
while len(similar_days) < self.n_baseline_days and check_date > event_date - timedelta(days=60):
date_str = check_date.strftime("%Y-%m-%d")
is_check_workday = check_date.weekday() < 5
# Stesso tipo di giorno (lavorativo/festivo)
if is_check_workday == is_target_workday and date_str in historical_consumption:
day_data = historical_consumption[date_str]
if event_hour in day_data:
similar_days.append({
"date": date_str,
"consumption_kw": day_data[event_hour]
})
check_date -= timedelta(days=1)
if len(similar_days) < 3:
logger.warning(f"Dati insufficienti per baseline sito {site_id}: solo {len(similar_days)} giorni")
return {"baseline_kw": None, "method": "insufficient_data", "days_used": len(similar_days)}
consumptions = [d["consumption_kw"] for d in similar_days]
# High-5/Low-5 exclusion (standard CAISO)
if self.exclude_top_bottom and len(consumptions) >= 10:
n_exclude = max(1, len(consumptions) // 5) # 20% per lato
sorted_consumptions = sorted(consumptions)
filtered_consumptions = sorted_consumptions[n_exclude:-n_exclude]
else:
filtered_consumptions = consumptions
baseline_kw = np.mean(filtered_consumptions)
return {
"site_id": site_id,
"baseline_kw": round(baseline_kw, 2),
"method": "CBL_HighLow_Exclusion",
"days_analyzed": len(similar_days),
"days_used": len(filtered_consumptions),
"std_dev_kw": round(np.std(filtered_consumptions), 2)
}
def calculate_demand_reduction(
self,
site_id: str,
baseline_kw: float,
actual_consumption_kw: float,
event_duration_hours: float
) -> dict:
"""
Calcola la riduzione di carico e l'energia risparmiata.
Risultato usato per il settlement del programma DR.
"""
reduction_kw = max(0, baseline_kw - actual_consumption_kw)
reduction_pct = (reduction_kw / baseline_kw * 100) if baseline_kw > 0 else 0
energy_reduced_kwh = reduction_kw * event_duration_hours
return {
"site_id": site_id,
"baseline_kw": baseline_kw,
"actual_kw": actual_consumption_kw,
"reduction_kw": round(reduction_kw, 2),
"reduction_pct": round(reduction_pct, 1),
"energy_reduced_kwh": round(energy_reduced_kwh, 3),
"verified": reduction_pct >= 5.0 # Soglia minima per settlement
}
Kontekst włoski: CER, GSE i rynek usług pomocniczych
Włochy reprezentują jeden z najciekawszych rynków w Europie dla DERMS i VPP, dzięki a szybko zmieniające się otoczenie regulacyjne i jedna z największych baz zainstalowanych fotowoltaiki w Europie (około 37 GW zainstalowanej fotowoltaiki na koniec 2024 r., docelowo 80 GW do 2030 r.).
Społeczności energii odnawialnej (CER)
Dekret MASE nr. Ustawa nr 127 z dnia 16 maja 2025 r., opublikowana w dniu 25 czerwca 2025 r., wprowadziła istotne nowości dla jednostek CER, rozszerzające zachęty na gminy do 50 000 mieszkańców. Jednostki CER reprezentują włoskie laboratorium rozproszonej agregacji DER:
Struktura motywacyjna CER (dekret postlegislacyjny 199/2021 i dekret ministerialny 2025)
- Stawka motywacyjna: uznawany za energię współdzieloną, zależy od obszaru geograficznego i wielkości systemu. Dla systemów < 200 kWp: 80-110 EUR/MWh (Centralna Północ), 90-120 EUR/MWh (Południe i Wyspy)
- Opłata ARERA: waloryzacja energii zużytej na własne potrzeby ~8 EUR/MWh (refundacja składników taryfy)
- Czas trwania: 20 lat od daty rozpoczęcia eksploatacji
- Dozwolona moc: do 1 MW dla pojedynczej elektrowni (możliwość wielu elektrowni w tym samym CER)
- Wymóg geograficzny: punkty poboru muszą być połączone w ramach tej samej podstacji podstawowej
Włoski rynek usług pomocniczych (MSD/MGP)
Terna, włoski OSP, zarządza rynkami, na których VPP mogą oferować elastyczność. W 2025 r. dzięki stopniowemu wprowadzaniu UVAM (Mixed Virtual Units) i UVAC (Virtual Units Consumption Enabled), zagregowane rozproszone zasoby mogą również uczestniczyć w MSD:
| Rynek | Akronim | Typologia | Horyzont | Dostęp do VPP/UVAM |
|---|---|---|---|---|
| Dzień Przed Targiem | MGP | Energia na następny dzień | D-1 9 rano | Tak (przez BSP) |
| Rynek dnia bieżącego | MI | Energia śróddzienna | D-0 po 6 sesjach | Tak (przez BSP) |
| Rynek usług pomocniczych ex-ante | MSD ex ante | Rezerwa, równoważenie | D-1 do D-0 | Tak (z obsługą UVAM) |
| Rynek Bilansujący | MB | Równoważenie w czasie rzeczywistym | D-0 w czasie rzeczywistym | Tak (UVAM z opóźnieniem < 15 min) |
| Szybka rezerwa | FR | Ultraszybka rezerwa | Automatyczny | Tylko BESS z opóźnieniem < 1s |
| MACSE (mechanizm zakupu pojemności magazynowej) | MACZA | pojemność magazynowania | 15 lat | Tylko przechowywanie w skali siatki |
MACSE w 2025 r
Na pierwszej aukcji MACSE, która odbyła się we wrześniu 2025 r., przyznano 10 GWh mocy na obszarach Centrum, Południe i Wyspy z 15-letnimi umowami na opłaty za przejazd po średnich cenach około 13 000 EUR/MWh/rok. Mechanizm ten gwarantuje stabilne przychody dużym, sieciowym systemom BESS, ale nie np dostępne dla małych, zbiorczych VPP. W przypadku VPP o charakterze mieszkaniowym/komercyjnym trasa głównym pozostaje MSD poprzez UVAM.
PNRR i inwestycje w elastyczność
W ramach PNRR Transition 5.0 przeznaczono znaczne zasoby na cyfryzację energii i społeczności energetyczne. Oprócz działań na rzecz jednostek CER plan obejmuje inwestycje w zakresie: modernizacja sieci dystrybucyjnych (inteligentne liczniki 2G, automatyka stacyjna). stanowią infrastrukturę wspomagającą dla DERMS nowej generacji.
Studium przypadku: Włoski regionalny VPP (5000 FV + 500 BESS)
Analizujemy konkretny przypadek projektowania i wymiarowania regionalnego VPP we Włoszech, w oparciu o realistyczne parametry rynku włoskiego 2025.
Scenariusz referencyjny
Portfolio VPP „SunFlex Apulia”
- Fotowoltaika mieszkaniowa: 4500 systemów, średniej wielkości 5 kWp, łącznie 22,5 MWp
- FV C&I: 500 systemów, średnia wielkość 80 kWp, łącznie 40 MWp
- Mieszkaniowy BESS: 450 systemów, średniej wielkości 10 kWh/5 kW, łącznie 4,5 MWh/2,25 MW
- BESS C&I: 50 systemów, średniej wielkości 500 kWh/250 kW, łącznie 25 MWh/12,5 MW
- Całkowita pojemność: 62,5 MWp PV + 29,5 MWh/14,75 MW BESS
- Obszar geograficzny: Apulia (obszar południowy – wysokie natężenie promieniowania, wysoka penetracja fotowoltaiki)
Strumienie przychodów VPP
| Strumień przychodów | Rynek | Moc/Energia | Szacunkowe przychody | Notatki |
|---|---|---|---|---|
| Sprzedaż energii fotowoltaicznej | Dzień MGP przed nami | 62,5 MW (szczyt), ~1750 równ. godziny/rok | ~5,2 mln EUR/rok | Przy średniej cenie spot ~48 EUR/MWh |
| Szybka rezerwa BESS C&I | FR Terna | 12,5 MW (24/7) | ~2,7 mln EUR/rok | ~25 EUR/MW/godzinę x 8760 godzin |
| Równoważenie MSD | MSD/MB | Odpowiednik 5 MW (BESS + DR) | ~0,8 mln EUR/rok | Płatność zgodnie z ofertą, duża zmienność |
| Ograniczenie FV (dodatkowe) | MSD ex ante | Dostępne ograniczenie o 20 MW | ~0,4 mln EUR/rok | Wzmocnienie redukcji |
| Motywacja CER (5 jednostek CER o mocy 1 MW) | GSE-CER | 5 MWp w konfiguracji CER | ~0,6 mln EUR/rok | Taryfa motywacyjna Południowa: ~120 EUR/MWh |
| CAŁKOWITY | ~9,7 mln EUR/rok | Brutto, przed kosztami platformy i sieci |
Stos technologii platformy
# docker-compose.yml - Stack DERMS per VPP regionale
# Configurazione di sviluppo/staging (produzione su Kubernetes)
version: "3.9"
services:
# ========================
# INGESTION LAYER
# ========================
# Broker MQTT per telemetria edge
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT non sicuro (solo LAN interna)
- "8883:8883" # MQTT over TLS (produzione)
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- mosquitto-data:/mosquitto/data
# Apache Kafka (broker eventi principale)
kafka:
image: confluentinc/cp-kafka:7.6.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 1 # 3 in produzione
KAFKA_LOG_RETENTION_HOURS: 168 # 7 giorni
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# ========================
# PROCESSING LAYER
# ========================
# DERMS Aggregation Service (FastAPI)
aggregation-service:
build: ./services/aggregation
ports:
- "8080:8080"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
INFLUXDB_URL: http://influxdb:8086
INFLUXDB_TOKEN: {{INFLUXDB_TOKEN}}
INFLUXDB_ORG: sunflex-puglia
INFLUXDB_BUCKET: der-telemetry
depends_on:
- kafka
- redis
- influxdb
# Dispatch Optimizer Service
dispatch-optimizer:
build: ./services/dispatch
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
REDIS_URL: redis://redis:6379/0
SOLVER: COIN_CMD # o CPLEX in produzione per portfolio grandi
depends_on:
- kafka
- redis
# Forecasting Service (ML - produzione FV + carico)
forecasting-service:
build: ./services/forecasting
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
INFLUXDB_URL: http://influxdb:8086
MODEL_REGISTRY_URL: http://mlflow:5000
WEATHER_API_KEY: {{OPENMETEO_API_KEY}}
depends_on:
- kafka
- influxdb
- mlflow
# OpenADR VTN (Virtual Top Node - invia eventi DR ai siti)
openadr-vtn:
build: ./services/openadr-vtn
ports:
- "8081:8081"
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
POSTGRES_URL: postgresql://postgres:5432/derms
depends_on:
- kafka
- postgres
# ========================
# STORAGE LAYER
# ========================
# Time-Series Database per telemetria
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
volumes:
- influxdb-data:/var/lib/influxdb2
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_ORG: sunflex-puglia
DOCKER_INFLUXDB_INIT_BUCKET: der-telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 30d
# Cache per stato corrente DER (Device Shadow)
redis:
image: redis:7.2-alpine
command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
# Database relazionale per asset registry, events, settlement
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: derms
POSTGRES_USER: derms_user
POSTGRES_PASSWORD: {{POSTGRES_PASSWORD}}
volumes:
- postgres-data:/var/lib/postgresql/data
# ========================
# OBSERVABILITY
# ========================
# MLflow per tracking modelli forecasting
mlflow:
image: ghcr.io/mlflow/mlflow:v2.11.0
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0 --backend-store-uri postgresql://mlflow:5432/mlflow
# Grafana per dashboard operativo
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
volumes:
influxdb-data:
postgres-data:
redis-data:
grafana-data:
mosquitto-data:
Operacyjne KPI i SLA VPP
| KPI | Cel | Mierzyć | Wpływ, jeśli nie jest przestrzegany |
|---|---|---|---|
| Opóźnienie wysyłki (95. percentyl) | < 500 ms | Czas od polecenia do wartości zadanej na urządzeniu | Nie kwalifikuje się do Fast Reserve Terna |
| Świeżość telemetrii | < 60 sekund | Maksymalny wiek danych telemetrycznych do wysyłki | Optymalizacja na podstawie nieaktualnych danych |
| Dostępność DER (stawka online) | > 95% | Procent DER osiągalny w dowolnym momencie | Redukcja mocy VPP, kary rynkowe |
| Dokładność wysyłki | > 90% oceny wykonalności | Średnie odchylenie od celu wysyłki | Kary na rynku MSD/MB |
| Prognoza MAPE (FV 1h do przodu) | < 8% | Średni bezwzględny błąd procentowy | Straty niezbilansowane na rynku |
| Czas pracy platformy | > 99,5% | Dostępność chmury DERMS | Utrata obowiązków związanych z elastycznością |
| Cyberbezpieczeństwo – MTTR | < 4 godziny | Średni czas reakcji na incydent bezpieczeństwa | Wymóg NIS2 (obowiązuje od października 2024 r.) |
Najlepsze praktyki i anty-wzorce w DERMS
Najlepsze praktyki
1. Wzór cienia urządzenia
Zawsze przechowuj zaktualizowany „cień” dla każdego DER w pamięci podręcznej (Redis): zawiera najnowszy stan znane urządzenie (SoC, aktualna moc, status) ze znacznikiem czasu. Wysyłka nie czeka na Telemetria w czasie rzeczywistym: wykorzystuje cień, który jest aktualizowany asynchronicznie. Zmniejsza to opóźnienie wysyłania z sekund do milisekund.
2. Pełna wdzięku degradacja poziomów
Zdefiniuj jasne poziomy działania: NORMALNY (chmura + brzeg), Zdegradowany (tylko brzeg, optymalizacja uproszczone lokalnie), MINIMALNE (tylko zabezpieczenia – bez wysyłki ekonomicznej). System musi zawsze wiedzieć, na którym się znajduje poziomie, i przekazywać to rynkom za pomocą powiadomień zaktualizowana dostępność.
3. Idempotencja w poleceniach wysłania
Każde polecenie wysłania musi mieć unikalny identyfikator. Adaptery urządzeń muszą zostać zaimplementowane idempotencja: dwukrotne otrzymanie tego samego polecenia (w celu ponownej próby) nie może powodować dwóch wysyłek. Użyj dziennika ostatnich poleceń z TTL do deduplikacji.
4. Oddzielenie harmonogramu od czasu rzeczywistego
Scheduling (planowanie ofert rynkowych z 24-godzinnym wyprzedzeniem) wykorzystuje modele optymalizacyjne złożony i powolny (MILP z minutowym czasem rozwiązania). Wysyłka w czasie rzeczywistym wykorzystuje uproszczone LP które rozwiązują się w milisekundach. Nigdy nie mieszaj tych dwóch ścieżek w tym samym procesie.
Anty-wzorce, których należy unikać
Antywzorzec 1: Kaskadowe odpytywanie synchroniczne
Najczęstszy wzorzec w DERMS pierwszej generacji: serwer centralny odpytuje każdy DER w kolejności. Przy 1000 DER i 10 sekundach limitu czasu na urządzenie cykl odpytywania jest zakończony trwa 10 000 sekund (prawie 3 godziny!). Rozwiązanie: równoległe odpytywanie z pulą połączeń + sterowane zdarzeniami dla urządzeń obsługujących technologię push.
Anty-wzorzec 2: Nieograniczona optymalizacja SoC
Agresywna wysyłka bez przestrzegania ograniczeń SoC akumulatorów prowadzi do cykli ładowania/rozładowania głębokie, które szybko degradują pakiety ogniw (30-50% skrócenie okresu użytkowania w ciągu kilku miesięcy). Każdy optymalizator musi zawsze uwzględniać ograniczenia SoC jako ograniczenia twarde, a nie miękkie.
Anty-wzorzec 3: Relacyjna baza danych dla telemetrii
Używanie PostgreSQL lub MySQL do przechowywania milionów punktów telemetrycznych na sekundę powoduje szybkie problemy z wydajnością i niezrównoważone koszty przechowywania. Bazy danych szeregów czasowych (InfluxDB, TimescaleDB, QuestDB) kompresują dane 10-50x w porównaniu do relacyjnych baz danych i obsługuje zoptymalizowane zapytania tymczasowe (funkcje okna, automatyczne próbkowanie w dół).
Antywzorzec 4: Ignorowanie topologii sieci
Agregacja DER bez uwzględnienia topologii fizycznej sieci dystrybucyjnej może prowadzić do: sytuacje, w których wysyłka VPP powoduje lokalne zatory, naruszenia napięcia lub przeciążenia transformatorów. Kolejnym niezbędnym krokiem jest integracja z systemem ADMS OSD dla dojrzałych DERMS. Europejski projekt ATTEST (2021-2024) określił standardy tej integracji.
Bezpieczeństwo i zgodność dla DERMS
DERMS kontroluje infrastrukturę krytyczną: udany cyberatak może spowodować przerwy w dostawie prądu zlokalizować lub zdestabilizować sieć. Dyrektywa NIS2 (wdrożona we Włoszech dekretem legislacyjnym 138/2024, obowiązuje od października 2024 r.) klasyfikuje DERMS jako „operatorów usług podstawowych” podlegających rygorystyczne obowiązki w zakresie cyberbezpieczeństwa.
Kluczowe wymagania bezpieczeństwa dla DERMS
- Uwierzytelnianie urządzenia: Każdy DER uwierzytelnia się za pomocą certyfikatów X.509 (PKI) — bez współdzielonych poświadczeń statycznych
- Szyfrowanie w transporcie: TLS 1.3 obowiązkowy dla całej komunikacji – łącznie ze starszymi protokołami (MQTT przez TLS, HTTPS dla OpenADR/IEEE 2030.5)
- Sieć zerowego zaufania: brak urządzenia i domyślnie „zaufane” – każde żądanie jest uwierzytelniane i autoryzowane
- Granulowany RBAC: Operatorzy, agregatory i właściciele DER mają ściśle zróżnicowane uprawnienia
- Niezmienny dziennik audytu: każde polecenie wysyłki logowane jest podpisem cyfrowym (niepodlegającym rozliczeniu)
- Integracja SIEM-a: wykrywanie anomalii w czasie rzeczywistym we wzorcach i poleceniach telemetrycznych
- MTTR < 4 godziny: Wymóg NIS2 dotyczący reagowania na incydenty
- Segmentacja sieci: fizyczna/logiczna separacja pomiędzy OT (protokoły terenowe) i IT (DERMS w chmurze)
Wnioski i dalsze kroki
Architektura nowoczesnego DERMS jest jednym z najbardziej złożonych wyzwań inżynierii oprogramowania w branży sektor energetyczny: wymaga integracji heterogenicznych protokołów opracowanych w różnych dekadach (Modbus od 1979, OpenADR od 2009, ISO 15118 od 2022), skala od tysięcy do milionów urządzenia rozproszone, utrzymujące opóźnienia poniżej sekundy i działające w środowisku regulacyjnym stale się rozwija.
Kluczowe punkty do zapamiętania:
- DERMS nie jest SCADA: działa w domenie klienta końcowego, przekraczając granicę licznika, ze wszystkimi wynikającymi z tego konsekwencjami prawnymi i zarządzaniem zgodami
- Architektura wielowarstwowa (Field/Edge/Platforma/Rynek) i obowiązkowy podział odpowiedzialności – nie wchodzi w grę
- Kafka + CQRS + Event Sourcing i de facto stos umożliwiający skalowanie powyżej 100 000 DER - architektury relacyjnych baz danych nie wytrzymują
- Optymalizacja wysyłki poprzez LP/MILP jest dobrze zdefiniowana matematycznie, ale zawsze musi uwzględniać ograniczenia fizyczne (SoC, szybkość rampy), takie jak twarde ograniczenia
- Kontekst włoski (CER, UVAM, MACSE, NIS2) szybko się rozwija: przewagę konkurencyjną buduje się wspólnie dzięki umiejętnościom regulacyjnym i technicznym
Kolejny artykuł z serii EnergyTech poświęcony jest System zarządzania baterią (BMS): algorytmy sterowania systemami pamięci masowej (BESS) na podstawie estymacji stanu naładowania (SoC). z filtrem Kalmana do ochrony termicznej i równoważenia ogniw. Niezbędna lektura dla każdemu, kto pracuje z systemami pamięci masowej zintegrowanymi z VPP.
Zasoby i spostrzeżenia
- Specyfikacje OpenADR 2.0b: openadr.org (do pobrania bezpłatnie po rejestracji)
- IEEE 2030.5-2023: IEEE Xplore (płatny), bezpłatny przegląd na smartgrid.ieee.org
- Zasady operacyjne CACER/CER: gse.it (Załącznik 1, lipiec 2025 r.)
- Dokumentacja Terna UVAM: terna.it, Sekcja wysyłkowa
- PuLP (solwer Pythona LP): coin-or.github.io/pulp
- Konfluentny klient Kafka Python: github.com/confluentinc/confluent-kafka-python
- Zarządzenie FERC 2222 (amerykański punkt odniesienia dla agregacji DER): ferc.gov
- Projekt UE ATTEST (koordynacja OSD-OSP): atest-project.eu
Powiązane artykuły
- Seria MLOps: w celu wdrożenia modeli prognozowania fotowoltaiki w produkcji zintegrowany z DERMS - zobacz serię MLOps na tym blogu
- Inżynieria AI / seria RAG: LLM o pomoc operacyjną dla operatorów VPP (zapytanie w języku naturalnym o dane rynkowe, inteligentne alerty)
- Seria PostgreSQL AI: pgvector do wyszukiwania podobieństw we wzorcach konsumpcji historyczne (przydatne przy CBL i prognozowaniu)







