Echilibrarea sarcinii de încărcare EV: algoritmi în timp real pentru încărcare inteligentă
Este ora 18.30 într-o seară de marți. Mii de șoferi italieni se întorc acasă după muncă, își parchează vehiculul electric și conectează cablul de încărcare. În câteva minute, întrebarea de putere la transformatorul de cartier trage în sus. Fără management inteligent, acest scenariu – care se repetă în fiecare zi din ce în ce mai intens pe măsură ce parcul crește EV - duce la supraîncărcări ale rețelei, întreruperi localizate și costuri de modernizare a infrastructurii de ordinul miliardelor de euro.
În 2025 vor circula mai departe 17 milioane de vehicule electrice în Europa, din care cca 230.000 în Italia, cu 94.230 de noi înregistrări numai în 2025 (+46% față de 2024). Proiecția până în 2030 vorbește despre peste 50 de milioane de vehicule electrice în Europa: fiecare vehicul necesită în medie între 7 și 22 kW de putere la încărcare. Înmulțit cu milioane de utilizatori care se reîncarcă în aceleași ore de seară, avem o problemă fără precedent de stabilitate a rețelei.
Soluția nu este să construiești mai multă rețea: este prea scumpă și lentă. Soluția este echilibrare inteligentă a sarcinii: algoritmi în timp real care distribuie puterea disponibile printre vehiculele de încărcare, respectând constrângerile rețelei, minimizând costurile cu energia și maximizarea satisfacției utilizatorilor. În acest articol explorăm întreaga stivă tehnologic: de la protocolul OCPP 2.0.1 la algoritmi de optimizare, de la învățare prin întărire la integrarea V2G, cu cod Python funcțional și contextul de reglementare italian.
Ce veți învăța în acest articol
- Curbe de rață și impact negestionat EV asupra rețelei de distribuție
- Taxonomia algoritmilor de echilibrare a sarcinii: Static, Dinamic, Predictiv
- Încărcare inteligentă OCPP 2.0.1: SetChargingProfile, ChargingSchedule, StackLevel
- Implementarea Python: Equal Share cu recalculare dinamică în timp real
- Algoritm bazat pe priorități cu coadă heap (SoC, termen limită, rată)
- Învățare prin consolidare (PPO) pentru optimizarea costurilor energetice
- Vehicle-to-Grid (V2G): bidirecțional, ISO 15118-20, reglare a frecvenței
- Integrare fotovoltaică: surplus solar și direcționare către EV
- Arhitectura microservicii: Charge Controller, Energy Manager, Forecaster
- Context italian: tarife la două ore, CER cu EV, reglementări ARERA
Seria EnergyTech - 10 articole
| # | Articol | Stat |
|---|---|---|
| 1 | Smart Grid și IoT: Arhitectură pentru rețeaua electrică a viitorului | Publicat |
| 2 | Arhitectura DERMS: agregarea a milioane de resurse distribuite | Publicat |
| 3 | Sistem de management al bateriei: algoritmi de control pentru BESS | Publicat |
| 4 | Digital Twin al rețelei electrice cu Python și Pandapower | Publicat |
| 5 | Prognoza energiei regenerabile: ML pentru PV și eolian | Publicat |
| 6 | Echilibrare încărcare EV: algoritmi în timp real (ești aici) | Actual |
| 7 | MQTT și InfluxDB pentru telemetrie energetică în timp real | În curând |
| 8 | IEC 61850: Comunicarea în stația electrică | În curând |
| 9 | Software de contabilizare a carbonului: Măsurarea și reducerea emisiilor | În curând |
| 10 | Blockchain pentru tranzacționarea energiei P2P în CER | În curând |
Problema de vârf: curba de rață și vehiculele electrice negestionate
Pentru a înțelege de ce echilibrarea încărcării vehiculelor electrice nu este o opțiune, ci o necesitate, trebuie să începem din fizica rețelei electrice și dintr-un concept de care managerii de rețea se tem în fiecare zi: cel curbe de rață.
Curba Rațelor și agravarea ei
Curba de rață descrie forma curbei cererii nete de energie electrică în timpul unei zile tipice într-un sistem cu penetrare fotovoltaică mare. Așa se numește deoarece curba seamănă cu profilul unei rațe: o burtă joasă în mijlocul zilei (când solar produce din abundență și cerere netă și scăzută) și o cocoașă mare seara (când soarele apune, producția fotovoltaică se prăbușește și cererea rezidențială crește vertiginos).
Fără vehicule electrice, rețeaua italiană se ocupă deja de această problemă în fiecare zi. Odată cu creșterea vehiculelor electrice lăsată negestionată, problema se amplifică drastic. Cercetarea MDPI (2025) arată că fără optimizare, cererea de vârf trece de la 22.000 MW de bază la 35.000 MW cu 5 milioane de vehicule electrice. Cu încărcarea inteligentă, aceeași flotă adaugă doar 2.000-3.000 MW distribuiți în timpul nopții.
| Scenariu | Ora de vârf | Sarcina suplimentara | Riscul de rețea |
|---|---|---|---|
| Linia de referință (fără EV) | 19:00-20:00 | 0 MW | Gestionabil |
| 500.000 de vehicule electrice negestionate | 18.30-19.30 | +3.500 MW | Tensiuni locale asupra transformatoarelor |
| 1,5 milioane de vehicule electrice negestionate | 18:00-20:00 | +10.500 MW | Supraîncărcare larg răspândită |
| 5 milioane de vehicule electrice negestionate | 18:00-21:00 | +35.000 MW | Pane de curent sistemice |
| 1,5 M EV cu încărcare inteligentă | Livrat între orele 22:00 și 6:00 | +2.100 MW diluat | Neglijabil |
O optimizare bazată pe inteligență artificială a profilului de încărcare poate reduce cererea de vârf a 16% cu 1,5 milioane de vehicule electrice, cu 21% cu 3 milioane și cu 34% cu 5 milioane (sursa: MDPI Electronics, 2025). Diferența dintre un scenariu gestionat și unul negestionat Nu sunt câteva puncte procentuale: este diferența dintre o rețea stabilă și un colaps.
Impact asupra transformatoarelor de distribuție
Problema nu este doar la nivelul sistemului de transport, ci mai ales la nivel de rețea distributie locala. Un transformator de cartier de 400 kVA servește de obicei 80-120 familii. Dacă 15 dintre acestea au un EV care începe să se încarce la 11 kW, toate împreună la 6:30 p.m., sarcina suplimentară este de 165 kW - aproape 41% din capacitatea nominală a transformatorului, care ar putea fi deja la 60-70% pentru consumul intern. Rezultat: supraîncălzire, reducere a duratei de viata utila, iar in cel mai rau caz deschiderea protectiilor.
Costul inacțiunii
Potrivit Terna, costul de consolidare a rețelei de distribuție italiană pentru a sprijini tranziția EV fără încărcare inteligentă este estimată la 10-15 miliarde de euro până în 2030. Cu încărcarea inteligentă pe scară largă, tranziția în sine necesită investiții infrastructura costă cu 60-70% mai mici, pur și simplu prin deplasarea sarcinii în orele în care rețeaua are capacitate disponibilă.
Taxonomia algoritmilor de echilibrare a sarcinii
Nu există un singur algoritm optim de echilibrare a sarcinii: alegerea depinde de dimensiune a instalației, disponibilitatea datelor istorice, cerințele de latență și complexitate pe care managerul este dispus să o mențină. Clasificăm abordările în trei familiile principale.
Algoritmi statici
Algoritmii statici definesc regulile de distribuție a puterii a priori, fără a se adapta dinamic la starea sistemului. Sunt simplu de implementat si depanare, ideal pentru instalatii mici, omogene.
- Round Robin: fiecare conector primește aceeași putere maximă pe rotație. Simplu, dar ineficient: un vehicul electric aproape încărcat folosește aceeași putere ca unul gol.
- Cotă egală: puterea totală disponibilă este împărțită în mod egal între toți conectorii activi. Eficient pentru instalații omogene, dar nu ia în considerare nevoi individuale.
- Primul venit, primul servit: primele EV-uri conectate primesc putere maximă, cele ulterioare împart reziduul. Pedepsiți-i pe cei care ajung târziu.
Algoritmi dinamici
Algoritmii dinamici adaptează distribuția în timp real în funcție de stare curentul sistemului: numărul de vehicule electrice conectate, nivelul de încărcare (SoC), puterea disponibilă, prioritățile utilizatorilor.
- Proporţional: puterea este distribuită proporțional cu energia solicitate de fiecare EV. Cine are cea mai slabă baterie primește cea mai mare putere.
- Bazat pe prioritate: fiecare EV are un scor de prioritate calculat pe multiple factori (SoC, termen limită, abonament). Puterea trece către priorități mai înalte.
- Logica fuzzy: reguli lingvistice precum „dacă SoC și termenul limită E scăzut și închideți ATUNCI prioritatea este mare." Gestionează bine incertitudinea datelor de intrare.
Algoritmi predictivi (bazați pe ML)
Algoritmii predictivi folosesc modele de învățare automată pentru a anticipa evenimentele viitoare (sosiri, plecări, variații ale prețului energiei) și optimizați programarea în avans, minimizând costurile totale pe termen lung.
- Control predictiv model (MPC): rezolvă o problemă de optimizare pe un orizont de timp viitor (de exemplu, 4 ore) la fiecare interval de control (de exemplu, 15 min), aplicând doar primul pas și reformulând la pasul următor.
- Învățare prin consolidare profundă (DRL): un agent învață politica optimă interacționând cu un simulator. PPO și SAC sunt cele mai utilizate în producție.
- Programare stocastică: încorporează în mod explicit incertitudinea în Sosiri și plecări de vehicule electrice, generând programe robuste pentru scenariile cele mai defavorabile.
| Algoritm | Complexitate | Latența | Optimalitate | Date solicitate | Caz de utilizare ideal |
|---|---|---|---|---|---|
| Cotă egală | Scăzut | <1 ms | Scăzut | Nimeni | Rezidential, <10 prize |
| Bazat pe prioritate | Medie | <10 ms | Mediu-Ridicat | SoC, termen limită pentru utilizator | Birouri, hoteluri, flote |
| MPC | Ridicat | 100 ms-1s | Ridicat | Prețurile energiei, previziuni | Hub > 50 de prize, C&I |
| Deep RL (PPO) | Foarte sus | <50 ms (inferență) | Foarte sus | Istoric 6+ luni | Hub-uri grozave, V2G încorporat |
OCPP 2.0.1 Smart Charging: Protocolul de încărcare inteligentă
OCPP (Open Charge Point Protocol) și limbajul universal între punctele de încărcare (CP) și sistemul central (CSMS - Charging Station Management System). În versiunea 2.0.1, Încărcarea inteligentă a fost profund îmbunătățită în comparație cu OCPP 1.6, introducând gestionarea mai granulară și mai fiabilă a profilurilor de taxare.
Mecanismul SetChargingProfile
CSMS trimite un mesaj la punctul de încărcare SetChargingProfileRequest conţinând
a ChargingProfile care definește modul în care stația trebuie să furnizeze putere în timp.
O ChargingProfile si compus din:
-
chargingProfilPurpose:
ChargePointMaxProfile(limitați toate stația),TxDefaultProfile(implicit pentru tranzacții noi),TxProfile(specific unei tranzacții în curs). - stackLevel: întreg care definește prioritatea. În cazul profilurilor suprapus, cel cu cel mai mare stackLevel câștigă.
- program de încărcare: lista de perioade (startPeriod în secunde, limită în A sau W) care definesc curba de încărcare în timp.
Secvență critică: descreștere înainte de creștere
Când trimiteți profiluri actualizate către mai multe posturi, este esențial să trimiteți mai întâi comenzile de reducere a puterii și apoi cele de creștere. Dacă crești mai întâi, riscați să depășiți limita rețelei pentru un interval scurt, declanșând protectii electrice sau generatoare de varfuri de cerere cu penalitati contractuale in consecinta. Această regulă nu este negociabilă în producție.
Implementarea Python: generarea profilului OCPP 2.0.1
"""
OCPP 2.0.1 Smart Charging Profile Generator
Genera ChargingProfile compatibili con la specifica OCPP 2.0.1.
"""
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
import json
from datetime import datetime, timezone
class ChargingRateUnit(str, Enum):
WATTS = "W"
AMPERES = "A"
class ChargingProfilePurpose(str, Enum):
CHARGE_POINT_MAX = "ChargePointMaxProfile"
TX_DEFAULT = "TxDefaultProfile"
TX = "TxProfile"
class ChargingProfileKind(str, Enum):
ABSOLUTE = "Absolute"
RECURRING = "Recurring"
RELATIVE = "Relative"
@dataclass
class ChargingSchedulePeriod:
"""Singolo intervallo nella schedule di ricarica."""
start_period: int # secondi dall'inizio della schedule
limit: float # limite in W o A
number_phases: Optional[int] = None
@dataclass
class ChargingSchedule:
"""Schedule completa di un profilo di ricarica."""
id: int
charging_rate_unit: ChargingRateUnit
charging_schedule_period: List[ChargingSchedulePeriod]
duration: Optional[int] = None
start_schedule: Optional[str] = None
min_charging_rate: Optional[float] = None
@dataclass
class ChargingProfile:
"""Profilo di ricarica OCPP 2.0.1 completo."""
id: int
stack_level: int
charging_profile_purpose: ChargingProfilePurpose
charging_profile_kind: ChargingProfileKind
charging_schedule: List[ChargingSchedule]
transaction_id: Optional[str] = None
valid_from: Optional[str] = None
valid_to: Optional[str] = None
def build_equal_share_profile(
profile_id: int,
power_limit_watts: float,
duration_seconds: int = 3600,
stack_level: int = 1
) -> ChargingProfile:
"""
Costruisce un profilo OCPP per distribuzione equa (potenza fissa).
Usato dall'algoritmo Equal Share per applicare il limite calcolato.
"""
schedule_period = ChargingSchedulePeriod(
start_period=0,
limit=power_limit_watts,
number_phases=3
)
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=[schedule_period],
duration=duration_seconds,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def build_time_of_use_profile(
profile_id: int,
schedule_periods: List[tuple],
stack_level: int = 2
) -> ChargingProfile:
"""
Costruisce un profilo Time-of-Use con potenza variabile nel tempo.
Esempio: alta potenza di notte (energia economica), bassa di giorno.
Args:
schedule_periods: lista di (start_period_sec, limit_watts)
"""
periods = [
ChargingSchedulePeriod(start_period=start, limit=limit)
for start, limit in schedule_periods
]
schedule = ChargingSchedule(
id=profile_id,
charging_rate_unit=ChargingRateUnit.WATTS,
charging_schedule_period=periods,
start_schedule=datetime.now(timezone.utc).isoformat()
)
return ChargingProfile(
id=profile_id,
stack_level=stack_level,
charging_profile_purpose=ChargingProfilePurpose.TX_DEFAULT,
charging_profile_kind=ChargingProfileKind.ABSOLUTE,
charging_schedule=[schedule]
)
def profile_to_ocpp_dict(profile: ChargingProfile) -> dict:
"""Serializza il profilo nel formato JSON OCPP 2.0.1."""
return {
"id": profile.id,
"stackLevel": profile.stack_level,
"chargingProfilePurpose": profile.charging_profile_purpose.value,
"chargingProfileKind": profile.charging_profile_kind.value,
"chargingSchedule": [
{
"id": sched.id,
"chargingRateUnit": sched.charging_rate_unit.value,
"chargingSchedulePeriod": [
{
"startPeriod": p.start_period,
"limit": p.limit,
}
for p in sched.charging_schedule_period
],
**({"duration": sched.duration} if sched.duration else {}),
**({"startSchedule": sched.start_schedule} if sched.start_schedule else {}),
}
for sched in profile.charging_schedule
],
}
if __name__ == "__main__":
# Profilo Equal Share: 3.3 kW per 2 ore
profile = build_equal_share_profile(
profile_id=101,
power_limit_watts=3300.0,
duration_seconds=7200
)
print(json.dumps(profile_to_ocpp_dict(profile), indent=2))
# Profilo Time-of-Use: 11 kW da adesso, 3.3 kW dopo 4 ore
tou = build_time_of_use_profile(
profile_id=102,
schedule_periods=[
(0, 11000), # da ora: 11 kW (fascia fuori punta)
(4 * 3600, 3300), # dopo 4 ore: 3.3 kW (fascia F1 inizia)
],
stack_level=2
)
print(json.dumps(profile_to_ocpp_dict(tou), indent=2))
Algoritm Equal Share cu recalculare dinamică
Equal Share este cel mai răspândit algoritm de echilibrare a sarcinii în instalațiile reale. Forța sa nu este în eleganța matematică, ci în robustețea operațională: este simplu pentru a explica utilizatorilor, ușor de depanat și suficient de eficient pentru majoritatea instalatiilor pana la 30-40 de prize. Adevărata complexitate constă în gestionați recalculările dinamice: de fiecare dată când un EV se conectează sau se deconectează sau când se modifică puterea disponibilă pentru un semnal DSO, sistemul trebuie să recalculeze și redistribuiți în milisecunde, respectând secvența critică reduceri-apoi-crește.
"""
Equal Share Load Balancer
Distribuzione equa con ricalcolo dinamico event-driven.
Thread-safe per deployment in produzione con OCPP WebSocket server.
"""
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Callable, Awaitable
from enum import Enum
import time
logger = logging.getLogger(__name__)
class ConnectorStatus(str, Enum):
AVAILABLE = "Available"
CHARGING = "Charging"
SUSPENDED_EV = "SuspendedEV"
FINISHING = "Finishing"
UNAVAILABLE = "Unavailable"
FAULTED = "Faulted"
@dataclass
class ConnectorState:
"""Stato di un singolo connettore nella stazione."""
connector_id: str
status: ConnectorStatus = ConnectorStatus.AVAILABLE
transaction_id: Optional[str] = None
allocated_power_w: float = 0.0
max_power_w: float = 22000.0 # max fisico del connettore (22 kW AC trifase)
min_power_w: float = 1380.0 # min per mantenere la carica (6A x 230V)
soc_percent: Optional[float] = None
connected_at: Optional[float] = None
@property
def is_charging(self) -> bool:
return self.status == ConnectorStatus.CHARGING
@property
def session_duration_min(self) -> float:
if self.connected_at is None:
return 0.0
return (time.time() - self.connected_at) / 60.0
@dataclass
class StationConfig:
"""Configurazione della stazione di ricarica."""
station_id: str
max_station_power_w: float
dynamic_limit_w: Optional[float] = None
rebalance_interval_s: float = 30.0
@property
def available_power_w(self) -> float:
if self.dynamic_limit_w is not None:
return min(self.max_station_power_w, self.dynamic_limit_w)
return self.max_station_power_w
ProfileSendCallback = Callable[[str, str, float], Awaitable[bool]]
class EqualShareBalancer:
"""
Load balancer Equal Share thread-safe e asincrono.
Gestisce la distribuzione equa della potenza disponibile
tra tutti i connettori attivamente in carica.
"""
def __init__(
self,
config: StationConfig,
profile_sender: ProfileSendCallback
):
self._config = config
self._connectors: Dict[str, ConnectorState] = {}
self._send_profile = profile_sender
self._lock = asyncio.Lock()
self._last_allocation: Dict[str, float] = {}
async def register_connector(
self,
connector_id: str,
max_power_w: float = 22000.0,
min_power_w: float = 1380.0
) -> None:
async with self._lock:
self._connectors[connector_id] = ConnectorState(
connector_id=connector_id,
max_power_w=max_power_w,
min_power_w=min_power_w
)
logger.info("Connettore %s registrato (max=%.0f W)", connector_id, max_power_w)
async def on_ev_connected(self, connector_id: str, transaction_id: str) -> None:
"""Chiamato quando un EV si connette. Triggera ricalcolo immediato."""
async with self._lock:
if connector_id not in self._connectors:
logger.warning("Connettore sconosciuto: %s", connector_id)
return
self._connectors[connector_id].status = ConnectorStatus.CHARGING
self._connectors[connector_id].transaction_id = transaction_id
self._connectors[connector_id].connected_at = time.time()
await self._rebalance()
async def on_ev_disconnected(self, connector_id: str) -> None:
"""Chiamato quando un EV si disconnette. Libera la potenza e redistribuisce."""
async with self._lock:
if connector_id not in self._connectors:
return
connector = self._connectors[connector_id]
connector.status = ConnectorStatus.AVAILABLE
connector.transaction_id = None
connector.allocated_power_w = 0.0
connector.connected_at = None
await self._rebalance()
async def update_dynamic_limit(self, limit_w: float) -> None:
"""Aggiorna il limite DSO in tempo reale. Triggera ricalcolo immediato."""
async with self._lock:
self._config.dynamic_limit_w = limit_w
logger.info("Limite DSO aggiornato: %.0f W (stazione %s)", limit_w, self._config.station_id)
await self._rebalance()
async def _rebalance(self) -> None:
"""
Calcola la nuova allocazione Equal Share e invia i profili OCPP.
REGOLA CRITICA: invia prima le riduzioni, poi gli aumenti.
"""
async with self._lock:
active = [c for c in self._connectors.values() if c.is_charging]
if not active:
return
available = self._config.available_power_w
n = len(active)
raw_share = available / n
new_allocations: Dict[str, float] = {}
# Prima passata: applica limiti min/max per connettore
constrained = []
free = []
for connector in active:
if raw_share <= connector.min_power_w:
new_allocations[connector.connector_id] = connector.min_power_w
constrained.append(connector)
elif raw_share >= connector.max_power_w:
new_allocations[connector.connector_id] = connector.max_power_w
constrained.append(connector)
else:
free.append(connector)
# Seconda passata: redistribuisce il residuo tra i connettori liberi
if free:
allocated_constrained = sum(
new_allocations[c.connector_id] for c in constrained
)
residual = available - allocated_constrained
share_free = residual / len(free)
for connector in free:
new_allocations[connector.connector_id] = max(
connector.min_power_w,
min(connector.max_power_w, share_free)
)
# Ordina: prima le riduzioni, poi gli aumenti (regola OCPP)
reductions = []
increases = []
for connector_id, new_power in new_allocations.items():
old_power = self._last_allocation.get(connector_id, float('inf'))
if new_power < old_power:
reductions.append((connector_id, new_power))
else:
increases.append((connector_id, new_power))
for connector_id, power in reductions:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Riduzione: %s -> %.0f W", connector_id, power)
for connector_id, power in increases:
success = await self._send_profile(self._config.station_id, connector_id, power)
if success:
self._last_allocation[connector_id] = power
logger.info("Aumento: %s -> %.0f W", connector_id, power)
logger.info(
"Ribilanciamento: %d EV attivi, %.0f W disponibili, stazione %s",
len(active), self._config.available_power_w, self._config.station_id
)
async def start_periodic_rebalance(self) -> None:
"""Avvia il ricalcolo periodico come safety net per deriva dello stato."""
async def _loop():
while True:
await asyncio.sleep(self._config.rebalance_interval_s)
await self._rebalance()
asyncio.create_task(_loop())
def get_status(self) -> dict:
return {
"station_id": self._config.station_id,
"available_power_w": self._config.available_power_w,
"connectors": [
{
"connector_id": c.connector_id,
"status": c.status.value,
"allocated_power_w": c.allocated_power_w,
"soc_percent": c.soc_percent,
"session_duration_min": round(c.session_duration_min, 1)
}
for c in self._connectors.values()
]
}
async def demo_equal_share():
async def mock_send(station_id, connector_id, power_w) -> bool:
print(f" OCPP -> {connector_id}: {power_w:.0f} W")
return True
config = StationConfig(station_id="STATION_001", max_station_power_w=44000.0)
balancer = EqualShareBalancer(config, mock_send)
for i in range(1, 5):
await balancer.register_connector(f"C0{i}", max_power_w=22000.0, min_power_w=1380.0)
print("\n[EV1 connesso]")
await balancer.on_ev_connected("C01", "TX001") # C01: 44000 W
print("\n[EV2 connesso]")
await balancer.on_ev_connected("C02", "TX002") # C01,C02: 22000 W ciascuno
print("\n[EV3 e EV4 connessi]")
await balancer.on_ev_connected("C03", "TX003")
await balancer.on_ev_connected("C04", "TX004") # tutti: 11000 W
print("\n[DSO riduce limite a 22 kW]")
await balancer.update_dynamic_limit(22000.0) # tutti: 5500 W
print("\n[EV2 si disconnette]")
await balancer.on_ev_disconnected("C02") # C01,C03,C04: 7333 W
if __name__ == "__main__":
asyncio.run(demo_equal_share())
Algoritm bazat pe priorități cu scor multifactor
Equal Share este corect, dar nu optim din punctul de vedere al utilizatorilor. Un EV cu baterie cei 5% al căror proprietar trebuie să plece în 30 de minute au o nevoie mult mai urgentă comparativ cu un EV de 60% care rămâne parcat timp de 8 ore. Echilibrare bazată pe priorități răspunde acestei nevoi prin calcularea unui scor de urgență multifactorial e distribuirea puterii proporțional cu prioritățile.
Formula de punctare prioritară
Scorul combină patru dimensiuni cu greutăți configurabile:
- Urgență SoC (35%): cat si descarca bateria. Funcție concavă pentru amplificarea cazurilor critice (de exemplu, SoC 5% = urgență foarte mare).
- Presiunea timpului (35%): cât de aproape este plecarea. Funcție exponențială care crește rapid pe măsură ce termenul limită se apropie.
- Nivelul de utilizator (15%): nivel de abonament (de bază/premium/prioritar). Creați un model de afaceri cu SLA-uri diferențiate.
- Eficiență energetică (15%): îi recompensează pe cei care pot absorbi efectiv puterea. Evitați alocarea energiei bateriilor aproape pline.
"""
Priority-Based Load Balancer.
Distribuisce potenza proporzionalmente ai punteggi di urgenza,
con algoritmo iterativo per rispettare i vincoli min/max.
"""
import math
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
logger = logging.getLogger(__name__)
@dataclass
class ChargingSession:
"""Sessione con tutti i parametri per il calcolo della priorità."""
connector_id: str
transaction_id: str
soc_percent: float
target_soc_percent: float
battery_capacity_kwh: float
departure_time: float # timestamp unix
max_power_w: float
min_power_w: float
user_tier: int = 1 # 1=base, 2=premium, 3=priority
connected_at: float = field(default_factory=time.time)
@property
def energy_needed_kwh(self) -> float:
delta = max(0.0, self.target_soc_percent - self.soc_percent) / 100.0
return self.battery_capacity_kwh * delta
@property
def time_remaining_h(self) -> float:
return max(0.1, (self.departure_time - time.time()) / 3600.0)
class PriorityScoreCalculator:
"""Calcola punteggio di urgenza normalizzato [0, 100]."""
W_SOC = 0.35
W_TIME = 0.35
W_TIER = 0.15
W_EFFICIENCY = 0.15
def calculate(self, session: ChargingSession) -> float:
# 1. Urgenza SoC: concava, amplifica i SoC bassissimi
soc_ratio = session.soc_percent / 100.0
soc_score = (1.0 - soc_ratio) ** 1.5 * 100
# 2. Pressione temporale: esponenziale, cresce rapido vicino alla deadline
time_score = 100.0 * math.exp(-0.3 * session.time_remaining_h)
# 3. Tier: bonus per abbonamenti premium
tier_score = {1: 33.0, 2: 66.0, 3: 100.0}.get(session.user_tier, 33.0)
# 4. Efficienza: premia chi ha batteria da riempire
eff = min(100.0, (session.energy_needed_kwh / max(0.1, session.battery_capacity_kwh)) * 100)
return round(
self.W_SOC * soc_score +
self.W_TIME * time_score +
self.W_TIER * tier_score +
self.W_EFFICIENCY * eff,
2
)
class PriorityBalancer:
"""
Distribuisce potenza proporzionalmente ai punteggi con vincoli min/max.
Algoritmo iterativo: separa i connettori con vincoli attivi e redistribuisce
il residuo tra quelli liberi, ripetendo fino a convergenza.
"""
def __init__(self, station_id: str, max_station_power_w: float, profile_sender):
self._station_id = station_id
self._max_power = max_station_power_w
self._dynamic_limit: Optional[float] = None
self._sessions: Dict[str, ChargingSession] = {}
self._send_profile = profile_sender
self._score_calc = PriorityScoreCalculator()
self._lock = asyncio.Lock()
@property
def available_power_w(self) -> float:
if self._dynamic_limit is not None:
return min(self._max_power, self._dynamic_limit)
return self._max_power
async def add_session(self, session: ChargingSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
await self._rebalance()
async def remove_session(self, connector_id: str) -> None:
async with self._lock:
self._sessions.pop(connector_id, None)
await self._rebalance()
async def update_soc(self, connector_id: str, soc_percent: float) -> None:
async with self._lock:
if connector_id in self._sessions:
self._sessions[connector_id].soc_percent = soc_percent
await self._rebalance()
def _compute_allocations(self) -> Dict[str, float]:
"""Algoritmo iterativo per allocazione con vincoli min/max."""
if not self._sessions:
return {}
sessions = list(self._sessions.values())
scores = {s.connector_id: self._score_calc.calculate(s) for s in sessions}
allocations: Dict[str, float] = {}
remaining_power = self.available_power_w
remaining = sessions[:]
for _ in range(10): # max 10 iterazioni per convergenza
if not remaining:
break
total_score = sum(scores[s.connector_id] for s in remaining)
if total_score == 0:
share = remaining_power / len(remaining)
for s in remaining:
allocations[s.connector_id] = max(s.min_power_w, min(s.max_power_w, share))
break
constrained, free_sessions = [], []
for s in remaining:
ratio = scores[s.connector_id] / total_score
proposed = remaining_power * ratio
if proposed <= s.min_power_w:
allocations[s.connector_id] = s.min_power_w
constrained.append(s)
elif proposed >= s.max_power_w:
allocations[s.connector_id] = s.max_power_w
constrained.append(s)
else:
free_sessions.append(s)
if not constrained:
# Nessun vincolo: allocazione finale proporzionale
for s in remaining:
ratio = scores[s.connector_id] / total_score
allocations[s.connector_id] = remaining_power * ratio
break
for s in constrained:
remaining_power -= allocations[s.connector_id]
remaining = free_sessions
return allocations
async def _rebalance(self) -> None:
async with self._lock:
allocs = self._compute_allocations()
# Stampa riepilogo per logging
for cid, power in sorted(allocs.items()):
s = self._sessions.get(cid)
score = self._score_calc.calculate(s) if s else 0
logger.info(
"Priority: %s score=%.1f -> %.0f W (SoC=%.0f%%, %.1fh)",
cid, score, power,
s.soc_percent if s else 0,
s.time_remaining_h if s else 0
)
await self._send_profile(self._station_id, cid, power)
# Demo
async def priority_demo():
async def mock_send(station_id, cid, power_w) -> bool:
print(f" SET {cid}: {power_w:.0f} W")
return True
balancer = PriorityBalancer("HUB_01", 44000.0, mock_send)
now = time.time()
sessions = [
# Urgente: 5% SoC, parte tra 45 min
ChargingSession("C1", "T1", 5.0, 80.0, 77.0, now + 45*60, 22000.0, 1380.0, user_tier=2),
# Normale: 60% SoC, parte tra 8 ore
ChargingSession("C2", "T2", 60.0, 80.0, 40.0, now + 8*3600, 11000.0, 1380.0, user_tier=1),
# Priority tier: 30% SoC, parte tra 2 ore
ChargingSession("C3", "T3", 30.0, 90.0, 100.0, now + 2*3600, 22000.0, 1380.0, user_tier=3),
]
print("\n--- Sessioni aggiunte (44 kW disponibili) ---")
for s in sessions:
await balancer.add_session(s)
print("\n--- SoC C1 sale al 40% (meno urgente) ---")
await balancer.update_soc("C1", 40.0)
if __name__ == "__main__":
asyncio.run(priority_demo())
Învățare de consolidare pentru a minimiza costurile energetice
Algoritmii anteriori reactioneaza la starea curenta. Învățarea prin consolidare depășește: un agent învață, prin milioane de simulări, politica optimă pe care o minimizează costul total al energiei luând în considerare prețurile variabile în timp, prognozele de sosire și plecarea vehiculelor electrice și constrângerile de rețea. În producție în 2025, principalii operatori de huburile de încărcare folosesc variante de Optimizarea proximală a politicii (PPO) pentru reduce costurile cu 15-25% comparativ cu algoritmii reactivi (sursa: ScienceDirect, 2025).
Formularea ca proces de decizie Markov
- Spațiu de stat: pentru fiecare conector (SoC, timpul rămas, puterea curentă) plus starea rețelei (puterea disponibilă, prețul energiei, ora din zi, prognoza PV).
- Spațiu de acțiune: vector continuu al nivelurilor de putere normalizate [0,1] pentru fiecare conector.
- Recompense: minimizează costul energiei, penalizează vehiculele electrice care nu ating vizează SoC până la termen limită, penalizează încălcările limitelor rețelei.
"""
Ambiente Gymnasium per EV Charging RL.
Compatibile con Stable-Baselines3 (PPO, SAC).
Ogni step = 15 minuti. Un episodio = 24 ore (96 step).
Installazione: pip install gymnasium stable-baselines3 numpy
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from dataclasses import dataclass
from typing import List, Optional, Tuple
@dataclass
class EVSession:
soc: float # [0, 1] corrente
target_soc: float # [0, 1] obiettivo
battery_kwh: float
time_remaining_h: float
max_power_kw: float
min_power_kw: float
class EVChargingEnv(gym.Env):
"""
Ambiente Gymnasium per scheduling EV charging.
Obiettivo: minimizzare costo energia rispettando deadline SoC.
"""
metadata = {"render_modes": ["human"]}
def __init__(
self,
n_connectors: int = 4,
max_station_power_kw: float = 44.0,
episode_steps: int = 96,
electricity_prices: Optional[np.ndarray] = None
):
super().__init__()
self.n = n_connectors
self.max_power = max_station_power_kw
self.episode_steps = episode_steps
self.prices = electricity_prices if electricity_prices is not None \
else self._italian_biorario_prices()
# Action: potenza normalizzata [0,1] per ogni connettore
self.action_space = spaces.Box(
low=0.0, high=1.0, shape=(n_connectors,), dtype=np.float32
)
# Observation: 5 feature per connettore + 3 globali
obs_dim = n_connectors * 5 + 3
self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
)
self.sessions: List[Optional[EVSession]] = [None] * n_connectors
self.step_count = 0
def _italian_biorario_prices(self) -> np.ndarray:
"""Prezzi biorari italiani simulati: F1 (08-19) = 0.28, F2/F3 = 0.18 EUR/kWh."""
prices = np.zeros(96)
for i in range(96):
hour = (i * 15) // 60
prices[i] = 0.28 if 8 <= hour < 19 else 0.18
# Variabilità mercato spot
return (prices + np.random.normal(0, 0.02, 96)).clip(0.10, 0.40)
def _get_obs(self) -> np.ndarray:
obs = []
for s in self.sessions:
if s is not None:
obs.extend([
s.soc,
s.target_soc,
min(1.0, s.time_remaining_h / 12.0),
1.0,
s.max_power_kw / self.max_power
])
else:
obs.extend([0.0, 0.0, 0.0, 0.0, 0.0])
step_idx = self.step_count % len(self.prices)
hour = (self.step_count * 15 // 60) % 24
obs.extend([
self.prices[step_idx] / 0.40,
np.sin(2 * np.pi * hour / 24),
np.cos(2 * np.pi * hour / 24)
])
return np.array(obs, dtype=np.float32)
def step(self, action: np.ndarray) -> Tuple:
dt_h = 15 / 60
price = self.prices[self.step_count % len(self.prices)]
# Converti azioni in potenze reali con vincoli
actual_powers = np.zeros(self.n)
for i, s in enumerate(self.sessions):
if s is not None:
power = action[i] * s.max_power_kw
actual_powers[i] = np.clip(power, s.min_power_kw, s.max_power_kw)
# Rispetta limite stazione
total = actual_powers.sum()
if total > self.max_power:
actual_powers *= self.max_power / total
# Calcola reward
energy_kwh = actual_powers.sum() * dt_h
reward = -energy_kwh * price # costo negativo (vogliamo minimizzare)
# Aggiorna SoC e penalita deadline
for i, s in enumerate(self.sessions):
if s is None:
continue
delta_soc = (actual_powers[i] * dt_h * 0.92) / s.battery_kwh
s.soc = min(1.0, s.soc + delta_soc)
s.time_remaining_h -= dt_h
if s.time_remaining_h <= 0:
gap = max(0, s.target_soc - s.soc)
reward -= gap * 50.0 # penalita pesante per deadline mancata
# Penalita violazione rete
if total > self.max_power * 1.01:
reward -= (total - self.max_power) * 5.0
# Bonus ricarica off-peak
hour = (self.step_count * 15 // 60) % 24
if not (8 <= hour < 19) and total > 0:
reward += total * 0.015
self.step_count += 1
obs = self._get_obs()
terminated = self.step_count >= self.episode_steps
return obs, float(reward), terminated, False, {"price": price, "total_kw": total}
def reset(self, seed=None, options=None) -> Tuple:
super().reset(seed=seed)
self.step_count = 0
self._spawn_random_sessions()
return self._get_obs(), {}
def _spawn_random_sessions(self) -> None:
"""Genera sessioni EV casuali realistiche per il training."""
for i in range(self.n):
if np.random.random() > 0.3:
self.sessions[i] = EVSession(
soc=float(np.random.uniform(0.05, 0.7)),
target_soc=float(np.random.uniform(0.7, 0.95)),
battery_kwh=float(np.random.choice([40.0, 60.0, 77.0, 100.0])),
time_remaining_h=float(np.random.uniform(1.0, 10.0)),
max_power_kw=float(np.random.choice([7.4, 11.0, 22.0])),
min_power_kw=1.38
)
else:
self.sessions[i] = None
def train_ppo_agent(total_timesteps: int = 500_000):
"""
Addestra un agente PPO sull'ambiente di EV charging.
Richiede: pip install stable-baselines3
"""
try:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
env = EVChargingEnv(n_connectors=4, max_station_power_kw=44.0)
check_env(env)
model = PPO(
"MlpPolicy", env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=1,
)
model.learn(total_timesteps=total_timesteps, progress_bar=True)
model.save("./models/ppo_ev_charging")
print("Agente PPO salvato in ./models/ppo_ev_charging")
return model
except ImportError:
print("Installa: pip install stable-baselines3")
return None
Vehicle-to-Grid (V2G): Vehiculul ca resursă de rețea
V2G reprezintă saltul calitativ al paradigmei EV: vehiculul nu mai este singur un consumator de energie dar devine un resursă bidirecțională. Când prețul energiei este mare sau rețeaua este sub stres, bateriile VE-urile pot trimite energie înapoi în rețea, câștigând bani pentru proprietar și stabilizarea sistemului electric.
Standard V2G în 2025
| Standard | Mături | știri V2G | Stare |
|---|---|---|---|
| ISO 15118-2 | Comunicare EV-EVSE AC/DC | Plug & Charge, încărcare inteligentă | În uz |
| ISO 15118-20 | V2G complet de a doua generație | BPT bidirecțional, integrare DER, programare dinamică | Adopție 2025-2026 |
| OCPP 2.0.1 | Comunicații CP-CSMS | Integrare ISO 15118, profiluri de încărcare V2G | În uz |
| Reg. UE AFIR. | Reglementarea infrastructurii | ISO 15118 obligatoriu pentru sisteme noi din ianuarie 2026 | În vigoare |
În Europa, Utrecht (Olanda) are cea mai mare implementare comercială V2G, cu peste 500 Renault cu două sensuri într-un car-sharing. Fiecare vehicul face bani 600-1.500 EUR/an furnizarea Rezervei de izolare a frecvenței (FCR) OTS olandez TenneT. V2G în Italia și încă în principal experimentale, dar noile modele Nissan Leaf e2+, Volkswagen ID.4 Pro și unele versiuni ale Tesla Model 3 Highland acceptă deja bidirecționalitate.
Răspuns de vârf și frecvență cu bateriile EV
"""
V2G Controller: peak shaving e frequency response.
Gestisce la bidirezionalita delle sessioni EV compatibili V2G.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
class ChargingDirection(str, Enum):
G2V = "G2V" # Grid-to-Vehicle (ricarica normale)
V2G = "V2G" # Vehicle-to-Grid (scarica)
IDLE = "IDLE"
@dataclass
class V2GSession:
"""Sessione V2G con capacità bidirezionale."""
connector_id: str
transaction_id: str
soc_percent: float
battery_capacity_kwh: float
max_charge_kw: float # potenza max di ricarica G2V
max_discharge_kw: float # potenza max di scarica V2G
min_soc_percent: float = 20.0 # SoC minimo garantito (non scarica sotto)
departure_soc_target: float = 80.0
@property
def available_discharge_kwh(self) -> float:
usable = max(0.0, self.soc_percent - self.min_soc_percent) / 100.0
return self.battery_capacity_kwh * usable
class V2GController:
"""Controller V2G per peak shaving e frequency response."""
FREQ_NOMINAL = 50.0
FREQ_DEADBAND = 0.05 # +/- 50 mHz: zona morta
FREQ_FULL_ACTIVATION = 0.5 # +/- 500 mHz: attivazione completa FCR
def __init__(
self,
station_id: str,
max_export_kw: float,
profile_sender
):
self._station_id = station_id
self._max_export = max_export_kw
self._send = profile_sender
self._sessions: Dict[str, V2GSession] = {}
self._lock = asyncio.Lock()
async def add_session(self, session: V2GSession) -> None:
async with self._lock:
self._sessions[session.connector_id] = session
async def peak_shaving(
self,
building_load_kw: float,
peak_threshold_kw: float
) -> dict:
"""
Quando il carico supera la soglia, usa le batterie EV per ridurre l'import.
Restituisce le azioni intraprese per ogni connettore V2G.
"""
if building_load_kw <= peak_threshold_kw:
return {"action": "none", "reason": "under_threshold"}
excess_kw = building_load_kw - peak_threshold_kw
logger.info(
"Peak shaving: %.1f kW > soglia %.1f kW (eccesso %.1f kW)",
building_load_kw, peak_threshold_kw, excess_kw
)
actions = {}
async with self._lock:
sessions = sorted(
self._sessions.values(),
key=lambda s: s.available_discharge_kwh,
reverse=True
)
remaining = excess_kw
for s in sessions:
if remaining <= 0:
break
if s.available_discharge_kwh < 1.0:
continue
discharge = min(s.max_discharge_kw, remaining)
actions[s.connector_id] = {
"direction": ChargingDirection.V2G.value,
"power_kw": discharge
}
# Potenza negativa = V2G discharge in OCPP
await self._send(self._station_id, s.connector_id, -discharge * 1000)
remaining -= discharge
logger.info("V2G: %s scarica %.1f kW (peak shaving)", s.connector_id, discharge)
return {"action": "peak_shaving", "actions": actions, "residual_excess_kw": remaining}
async def frequency_response(self, freq_hz: float) -> dict:
"""
FCR (Frequency Containment Reserve).
Risposta lineare alla deviazione di frequenza, entro 500ms.
Frequenza bassa (< 50 Hz) -> V2G discharge.
Frequenza alta (> 50 Hz) -> aumento ricarica G2V.
"""
deviation = freq_hz - self.FREQ_NOMINAL
abs_dev = abs(deviation)
if abs_dev < self.FREQ_DEADBAND:
return {"action": "none", "frequency_hz": freq_hz}
activation = min(1.0,
(abs_dev - self.FREQ_DEADBAND) / (self.FREQ_FULL_ACTIVATION - self.FREQ_DEADBAND)
)
actions = {}
async with self._lock:
sessions = list(self._sessions.values())
for s in sessions:
if deviation < 0 and s.available_discharge_kwh > 0.5:
# Frequenza bassa: scarica per supportare la rete
power_kw = s.max_discharge_kw * activation
actions[s.connector_id] = {"direction": "V2G", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, -power_kw * 1000)
elif deviation > 0 and s.soc_percent < 90:
# Frequenza alta: aumenta ricarica per assorbire eccesso
power_kw = s.max_charge_kw * activation
actions[s.connector_id] = {"direction": "G2V", "power_kw": power_kw}
await self._send(self._station_id, s.connector_id, power_kw * 1000)
return {
"frequency_hz": freq_hz,
"deviation_hz": deviation,
"activation_factor": activation,
"actions": actions
}
Integrare fotovoltaică: Surplus solar către vehiculele electrice
Integrarea între sistemul fotovoltaic și stația de încărcare pentru vehicule electrice este unul dintre cazurile de utilizare cu ROI mai rapid în managementul energiei. Când fotovoltaicul produce mai mult decât ce consumă clădirea, surplusul este direcționat către vehiculele electrice în loc să fie alimentat rețea (remunerată la ~0,07-0,10 EUR/kWh GSE) - permițând utilizarea energiei care altfel ar costa 0,28 EUR/kWh din retea. Economiile nete e 0,18-0,21 EUR/kWh pentru fiecare kWh autoconsumat prin EV.
| Strategie | Logică | Beneficia | Prescripţie |
|---|---|---|---|
| Numai Exces | Încărcați NUMAI EV cu surplus PV | Autoconsum maxim, cost de rețea zero | Putere variabilă, încărcare lentă |
| Solar-în primul rând | Prioritate solară, intactă din rețea | Încărcare stabilă cu maxim solar | O parte din energie din rețea |
| Maximizator verde | Optimizați la orizont de 4 ore cu prognoza meteo | Maximizați % energie regenerabilă | Necesită previziuni precise |
| Optimizator de costuri | Solar + prețuri spot + V2G combinate | Cost minim absolut | Complexitate algoritmică ridicată |
"""
Solar-EV Integration Controller.
Implementa strategie Excess-Only e Solar-First
con isteresi per evitare oscillazioni frequenti.
"""
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class PowerState:
"""Stato istantaneo del sistema energetico."""
pv_production_kw: float
building_load_kw: float # consumo edificio escluso EV
electricity_price_eur: float = 0.25
feed_in_tariff_eur: float = 0.07
@property
def net_surplus_kw(self) -> float:
return max(0.0, self.pv_production_kw - self.building_load_kw)
@property
def savings_per_kwh(self) -> float:
"""Risparmio per kWh autoconsumato invece di cedere+comprare."""
return self.electricity_price_eur - self.feed_in_tariff_eur
class SolarEVController:
"""
Controlla la ricarica EV in funzione del surplus fotovoltaico.
Supporta Excess-Only e Solar-First con isteresi configurabile.
"""
HYSTERESIS_KW = 0.3 # evita comandi continui per piccole oscillazioni
def __init__(
self,
balancer,
strategy: str = "solar_first",
min_ev_power_kw: float = 1.38,
max_grid_supplement_kw: float = 5.0
):
self._balancer = balancer
self._strategy = strategy
self._min_ev_power = min_ev_power_kw
self._max_grid_supp = max_grid_supplement_kw
self._last_limit_kw = 0.0
async def update(self, power_state: PowerState, n_active_evs: int) -> dict:
"""
Ricalcola e applica il limite di potenza EV.
Chiamato ogni 5-15 secondi dal loop di controllo.
"""
if n_active_evs == 0:
return {"action": "no_ev", "allocated_kw": 0}
if self._strategy == "excess_only":
target_kw = self._excess_only(power_state, n_active_evs)
else:
target_kw = self._solar_first(power_state, n_active_evs)
# Applica isteresi
if abs(target_kw - self._last_limit_kw) > self.HYSTERESIS_KW:
await self._balancer.update_dynamic_limit(target_kw * 1000)
self._last_limit_kw = target_kw
logger.info("Solar routing [%s]: %.1f kW -> EV", self._strategy, target_kw)
hourly_savings = target_kw * power_state.savings_per_kwh
return {
"strategy": self._strategy,
"pv_kw": power_state.pv_production_kw,
"surplus_kw": power_state.net_surplus_kw,
"allocated_ev_kw": target_kw,
"savings_eur_h": round(hourly_savings, 3)
}
def _excess_only(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
return surplus if surplus >= min_total else 0.0
def _solar_first(self, ps: PowerState, n_evs: int) -> float:
surplus = ps.net_surplus_kw
min_total = self._min_ev_power * n_evs
if surplus < min_total:
grid_needed = min_total - surplus
return min_total if grid_needed <= self._max_grid_supp else surplus
return min(surplus + self._max_grid_supp, surplus + self._max_grid_supp)
async def solar_routing_demo():
"""Simula una giornata con produzione FV tipica di aprile in Italia."""
hourly_pv = [0, 0, 0, 0, 0, 0.5, 2, 5, 9, 14, 18, 21,
22, 20, 16, 10, 6, 2, 0.5, 0, 0, 0, 0, 0]
print("Ora | PV kW | Bldg kW | Surplus | EV kW | Risparmio €/h")
print("-" * 62)
for h in range(24):
pv = hourly_pv[h]
bldg = 5.0 + (2.0 if 8 <= h < 18 else 0)
surplus = max(0, pv - bldg)
price = 0.28 if 8 <= h < 19 else 0.18
feed_in = 0.07
ev_alloc = min(surplus, 11.0)
savings = ev_alloc * (price - feed_in)
print(
f"{h:02d}:00 | {pv:5.1f} | {bldg:7.1f} | "
f"{surplus:7.1f} | {ev_alloc:5.1f} | {savings:.3f}"
)
if __name__ == "__main__":
asyncio.run(solar_routing_demo())
Arhitectura microserviciilor pentru controler de încărcare
Un sistem de echilibrare a încărcăturii EV de producție și o arhitectură distribuită cu componente specializate care comunică prin evenimente. Iată principalele servicii:
| Serviciu | Responsabilitate | Stive | ALS |
|---|---|---|---|
| Gateway OCPP | Server WebSocket, traducere OCPP 1.6/2.0.1 | Python ocpp, asincron | 99,9%, <100 ms |
| Controler de încărcare | Algoritm de echilibrare a sarcinii, SetChargingProfile | FastAPI, Redis | 99,9%, <500ms |
| Manager de energie | Măsurați PV/BESS/rețea, calculați disponibilitatea | Modbus TCP, MQTT, Python | 99,5%, sondaj 1s |
| Prognozatorii | Prognoza sosire/plecare, preturi, FV | LightGBM/LSTM, FastAPI | 99%, actualizare 15 min |
| Gateway DSO | Semnale OpenADR, limite dinamice | Python OpenADR 2.0b | 99%, <2s |
| Facturare | Contorizare, facturare, sesiuni | FastAPI, PostgreSQL | 99,5% |
Charge Controller API cu FastAPI
"""
Charge Controller Service - FastAPI REST API.
Endpoints per load balancing, monitoring e configurazione.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime, timezone
import logging
logger = logging.getLogger(__name__)
app = FastAPI(
title="EV Charge Controller API",
version="2.1.0",
description="Load balancing real-time per stazioni di ricarica EV"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST", "PUT"],
allow_headers=["*"]
)
# -------------------------------------------------------
# Modelli Pydantic
# -------------------------------------------------------
class ConnectorInfo(BaseModel):
connector_id: str
status: str
allocated_power_w: float
soc_percent: Optional[float]
session_duration_min: float
class StationStatus(BaseModel):
station_id: str
available_power_w: float
algorithm: str
connectors: List[ConnectorInfo]
last_updated: str
class EVConnectedEvent(BaseModel):
connector_id: str
transaction_id: str
initial_soc: Optional[float] = None
battery_capacity_kwh: Optional[float] = None
departure_timestamp: Optional[int] = None
user_tier: int = Field(default=1, ge=1, le=3)
class PowerLimitUpdate(BaseModel):
limit_watts: float = Field(gt=0, le=200_000)
source: str = "manual" # "manual" | "dso" | "solar" | "openadr"
valid_until_ts: Optional[int] = None
class AlgorithmSwitch(BaseModel):
algorithm: str = Field(pattern="^(equal_share|priority|rl_ppo)$")
params: Dict = {}
# In-memory registry (in produzione: Redis)
_station_registry: Dict = {}
_algorithm_map: Dict[str, str] = {}
# -------------------------------------------------------
# Endpoints
# -------------------------------------------------------
@app.get("/health")
async def health():
return {
"status": "healthy",
"ts": datetime.now(timezone.utc).isoformat(),
"version": "2.1.0"
}
@app.get("/stations/{station_id}/status", response_model=StationStatus)
async def get_status(station_id: str):
if station_id not in _station_registry:
raise HTTPException(404, f"Stazione {station_id} non trovata")
balancer = _station_registry[station_id]
raw = balancer.get_status()
return StationStatus(
station_id=station_id,
available_power_w=raw["available_power_w"],
algorithm=_algorithm_map.get(station_id, "equal_share"),
connectors=[ConnectorInfo(**c) for c in raw["connectors"]],
last_updated=datetime.now(timezone.utc).isoformat()
)
@app.post("/stations/{station_id}/events/connected")
async def ev_connected(station_id: str, event: EVConnectedEvent, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(
_station_registry[station_id].on_ev_connected,
event.connector_id,
event.transaction_id
)
return {"status": "accepted", "rebalancing": True}
@app.delete("/stations/{station_id}/connectors/{connector_id}/session")
async def ev_disconnected(station_id: str, connector_id: str, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].on_ev_disconnected, connector_id)
return {"status": "accepted"}
@app.put("/stations/{station_id}/power-limit")
async def set_power_limit(station_id: str, limit: PowerLimitUpdate, bg: BackgroundTasks):
if station_id not in _station_registry:
raise HTTPException(404, "Stazione non trovata")
bg.add_task(_station_registry[station_id].update_dynamic_limit, limit.limit_watts)
logger.info("Limite potenza: %s -> %.0f W (source=%s)", station_id, limit.limit_watts, limit.source)
return {"status": "accepted", "new_limit_w": limit.limit_watts, "source": limit.source}
@app.put("/stations/{station_id}/algorithm")
async def set_algorithm(station_id: str, config: AlgorithmSwitch):
_algorithm_map[station_id] = config.algorithm
return {"status": "ok", "algorithm": config.algorithm}
@app.get("/stations/{station_id}/metrics")
async def get_metrics(station_id: str):
"""KPI della stazione (in produzione: query InfluxDB/TimescaleDB)."""
return {
"station_id": station_id,
"period": "last_24h",
"charging_satisfaction_rate": 0.94,
"peak_reduction_percent": 31.2,
"energy_cost_savings_eur": 47.80,
"solar_share_percent": 42.3,
"grid_stress_index": 0.23,
"total_energy_kwh": 201.6,
"avg_power_kw": 8.4,
"rebalance_latency_ms_p95": 180
}
Metrici și KPI pentru evaluarea echilibrării sarcinii
Pentru a evalua eficacitatea unui sistem de echilibrare a sarcinii, sunt necesari KPI cuantificabili care măsoară atât satisfacția utilizatorilor, cât și optimizarea energiei.
| KPI-uri | Definiţie | Ţintă | Formula |
|---|---|---|---|
| Rata de satisfacție la încărcare (CSR) | % sesiuni care ating obiectivele SoC până la termen limită | >90% | ok_sessions / total_sessions |
| Reducere maximă % | Reducere maximă față de lipsa managementului | >25% | (peak_unmanaged - peak_managed) / peak_unmanaged |
| Economii de energie % | Economii la costul kWh față de încărcare imediată | >15% | (cost_de_bază - cost_gestionat) / cost_de_bază |
| % autoconsum solar | % energie EV din PV local | >40% | solar_energy_ev / total_energy_ev |
| Indicele de stres al rețelei (GSI) | Presiunea medie pe transformator [0-1] | <0,3 | avg(P_actual / P_nominal_trafo) |
| Reechilibrați latența P95 | S-a aplicat timpul de la eveniment până la profilul OCPP | <2s | t_ocpp_conf - t_event_received (p95) |
Context italian: ARERA, CER cu EV și Stimulente 2025
Implementarea sistemelor de echilibrare a sarcinii vehiculelor electrice în Italia prezintă provocări de reglementare și piață care influențează direct alegerile arhitecturale.
Tarife la două ore ARERA și Smart Charging
Sistemul tarifar italian creează un stimulent natural pentru taxarea în afara orelor de vârf:
- Banda F1 (Luni-Vineri 8:00-19:00): aproximativ 0,26-0,31 EUR/kWh. Banda de evitat pentru încărcarea EV.
- Banda F2 (Luni-Vineri 7-8, 19-23; Sâmbătă 7-23): aproximativ 0,20-0,24 EUR/kWh.
- Banda F3 (noapte 23:00-7:00; duminica și sărbătorile toată ziua): aproximativ 0,16-0,19 EUR/kWh. Ideal pentru încărcare peste noapte.
Un algoritm de timp de utilizare care schimbă 60% din încărcare de la F1 la F3 reduce costurile anuale de taxare pentru un administrator de flotă de 18-25%.
Comunități de energie regenerabilă (CER) cu EV
Decretul Legislativ 199/2021 și Decretul Ministerial MASE din 7 decembrie 2023 au deschis oportunități concrete pentru integrarea EV în CER-urile italiene:
- VE-urile care se încarcă cu surplusul PV comunitar generează stimulente GSE pe cota de energie împărtășită (rambursare tarif + stimulent până la 11 cenți/kWh).
- Cu V2G, bateriile EV acționează ca stocare virtuală CER, înlocuind energie solară de la amiază până seara (tur de vârf).
- GSE măsoară energia partajată prin MACSE, inclusiv fluxuri bidirecționale de vehicule electrice V2G în contabilitatea comunității.
Infrastructura italiană de vehicule electrice până în 2025
Italia are peste 58.000 de puncte publice de încărcare (Motus-E, sfârșitul anului 2024), dintre care 22% DC rapid (față de 14% în 2023). În 2025, au fost înregistrate 94.230 BEV (+46%), cu o cotă de piață de 6,2%. Potrivit Terna, 7% din populația fără încărcare inteligentă este răspândită Alimentatorul de distribuție va fi supraîncărcat până în 2027 în zonele cele mai dense ale Nordul Italiei. PNRR a alocat peste 740 de milioane de euro pentru infrastructură completare, cu un obiectiv de 21.000 de noi puncte publice până în 2026.
Acordați atenție stimulentelor
Creditele fiscale pentru încărcarea inteligentă în conformitate cu Tranziția 5.0 sunt supuse inspectat periodic. Verificați întotdeauna condițiile actualizate pe site-ul GSE și CSEA înainte de a planifica investiții bazate pe stimulente. Tarifele și condițiile de acces pot varia.
Concluzii și Foaia de parcurs de implementare
Echilibrarea sarcinii EV nu este o problemă de eleganță algoritmică: este o necesitate infrastructură critică pentru tranziția energetică. Datele sunt fără echivoc: cu 5 milioane de vehicule electrice negestionate, vârful serii crește cu 35.000 MW; cu inteligent încărcarea parcului de vehicule electrice în sine devine o resursă flexibilă care reduce vârful cu 34%.
Pentru o echipă de dezvoltare care implementează aceste sisteme în producție, foaia de parcurs optim pentru fazele ulterioare și:
- Faza 1 - Cotă egală: simplu, robust, suficient pentru instalatii până la 30-40 de trucuri. Prioritate maximă: implementați corect recalcularea condus de evenimente cu secvența de reducere-apoi-creștere. Timp estimat: 2-3 săptămâni.
-
Faza 2 - Integrare PV: ROI imediat fără ML. Este nevoie doar de
măsurarea surplusului solar prin Modbus/MQTT și integrarea cu echilibrul prin
update_dynamic_limit(). Câștig rapid cu rambursare în 6-18 luni. - Faza 3 - Bazat pe prioritate: adăugați când aveți aplicația mobilă de colectat SoC și termenul limită de începere. Câștigul în CSR (Charging Satisfaction Rate) este măsurabil și diferențiază serviciul de concurență.
- Faza 4 - Infrastructură V2G: proiectarea arhitecturii bidirectionale acum, chiar dacă nu aveți încă EV-uri V2G. OCPP 2.0.1 cu ISO 15118-20 devine obligatoriu pentru centrale noi din 2026 (EU AFIR). Modernizarea costă de 5-10 ori mai mult.
- Faza 5 - RL în producție: investiți în deep RL numai după 6+ luni de date de calitate și cu o echipă MLOps dedicată. Câștigul de cost este real (15-25%) dar complexitatea operațională este mare.
Articole similare din seria EnergyTech
- Articolul 4: Network Digital Twin - simulați impactul EV-urilor asupra rețelei înainte de implementare
- Articolul 5: Prognoza energiei regenerabile - prognoza producției fotovoltaice pentru rutarea solară
- Articolul 7: MQTT și InfluxDB - telemetria în timp real a parametrilor de încărcare
- Articolul 2: Arhitectura DERMS - integrarea vehiculelor electrice ca DER în sistemul de management distribuit
Perspective între serii
- Seria MLOps (art. 306-315): implementarea modelului PPO cu MLflow, monitorizare drift și recalificare
- Seria AI Engineering (art. 316-325): RAG privind documentația OCPP și ISO 15118 cu întreprinderea LLM
- Data & AI Business Series (art. 267-280): ROI, valori de afaceri și foaie de parcurs pentru investiții în încărcare inteligentă
- Seria PostgreSQL AI (art. 356-361): serii temporale de date de reîncărcare cu TimescaleDB pe PostgreSQL







