OCPP 2.x: Construirea sistemelor de încărcare EV pentru întreprinderi
Piața globală a infrastructurii de încărcare a vehiculelor electrice a ajuns din urmă 40,22 miliarde de dolari în 2025 și va crește cu un CAGR de 25% până în 2033, conform Grand View Research. În Europa, AFIR (Combustibili Alternativi Regulamentul privind infrastructura) impune termene obligatorii: la fiecare 60 km pe rețeaua TEN-T trebuie să existe o stație de cel puțin 150 kW până la sfârșitul anului 2025. În Italia, cu dincolo 73.000 de puncte de încărcare publice până la 31 decembrie 2025 și un PNRR care a alocat peste 700 de milioane de euro pentru High Power Charging, sector și în explozie completă.
În centrul acestei infrastructuri se aflăProtocolul punctului de încărcare deschis (OCPP), standardul deschis dezvoltat de Open Charge Alliance (OCA) care definește cum stațiile de încărcare (Charging Stations) comunică cu sistemele de management centralizate (CSMS - Charging Station Management Systems). Adoptat de peste 250 de organizații în peste 40 de țări, OCPP a devenit standardul de facto în industrie, asigurând interoperabilitate între hardware de la diferiți producători și software de management diferit.
În acest articol tehnic avansat explorăm OCPP 2.0.1 și 2.1 in adâncime: arhitectura WebSocket, structura mesajului, Modelul dispozitivului, profilurile securitate, încărcare inteligentă și cum să construiți un backend CSMS scalabil, pregătit pentru producție 10 până la 100.000 de stații, cu exemple complete de cod în TypeScript și Python.
Ce vei învăța
- Evoluția protocolului OCPP de la 1.2 la 2.1 și diferențele arhitecturale fundamentale
- Structura mesajului JSON (CALL, CALLRESULT, CALLERROR) și transportul WebSocket
- Cele 16 blocuri funcționale ale OCPP 2.0.1 și modelul ierarhic al dispozitivului
- Cele 3 profiluri de securitate: Basic Auth, TLS, mTLS cu certificate X.509
- Încărcare inteligentă avansată: SetChargingProfile, gestionarea încărcăturii, barbierit maxim, integrare solară
- Implementarea backend CSMS în Python cu asyncio și PostgreSQL
- ISO 15118 și Plug & Charge: PKI, eMAID, V2G bidirecțional
- Arhitectură scalabilă: de la 10 la 100.000 de puncte de încărcare cu clustering WebSocket și Kafka
- Monitorizare, tablou de bord Grafana și metrici operaționale cheie
- Reglementări AFIR, PNIRE și stimulente italiene pentru infrastructura de încărcare
Seria EnergyTech: 10 articole despre energia digitală
Acest articol este primul dintr-o serie dedicatăEnergyTech: protocoale, arhitecturi și software care revoluționează gestionarea energiei electrice, de la încărcarea EV la rețele inteligente, de la sistemele BESS la optimizarea energiei cu AI.
| # | Articol | Tehnologii | Nivel |
|---|---|---|---|
| 1 | Protocol OCPP 2.x: construirea sistemelor de încărcare pentru vehicule electrice (sunteți aici) | OCPP, WebSocket, Python, ISO 15118 | Avansat |
| 2 | Rețea inteligentă și OpenADR: răspuns la cerere și flexibilitate energetică | OpenADR, IEEE 2030.5, REST, MQTT | Avansat |
| 3 | BESS (Battery Energy Storage): Algoritmi de optimizare și BMS | Python, optimizare LP, bus CAN, Modbus | Avansat |
| 4 | Digital Twin pentru rețele electrice cu Kafka și Machine Learning | Kafka, InfluxDB, Grafana, ML, Python | Avansat |
| 5 | SCADA și ICS pentru infrastructuri critice: securitate și protocoale | Modbus, DNP3, IEC 61850, OPC UA | Avansat |
| 6 | Optimizarea energiei cu AI: prognoza consumului și prognoza cererii | TensorFlow, LSTM, Prophet, FastAPI | Avansat |
| 7 | Centrală electrică virtuală: agregați DER cu Python și API-ul REST | DER, DERMS, REST, Python, PostgreSQL | Intermediar |
| 8 | Piețe de energie și tranzacționare algoritmică: EPEX SPOT și API | Python, tranzacționare API, serii cronologice | Avansat |
| 9 | Software de contabilizare a carbonului: Scopul 1, 2, 3 și Raportarea GHG | Python, GHG Protocol, API, raportare | Intermediar |
| 10 | Microrețele și insula energetică: arhitecturi rezistente | Microrețele, EMS, edge computing, IoT | Avansat |
Evoluția protocolului: de la OCPP 1.2 la 2.1
Înțelegerea evoluției OCPP este fundamentală pentru a aprecia alegerile arhitecturale versiunea 2.0.1 și planificați migrarea de la sistemele vechi. S-a născut protocolul în 2010 pentru a rezolva problema deinteroperabilitate: fiecare producător dintre stații aveau propriul protocol proprietar, făcând imposibilă gestionarea multi-furnizor.
| Versiune | An | Transport | Caracteristici cheie | Desfăşurare |
|---|---|---|---|---|
| OCPP 1.2 | 2010 | SOAP/XML | Prima versiune publică, operațiuni de bază: pornire, autorizare, pornire/oprire | Dezafectat |
| OCPP 1.5 | 2012 | SOAP/XML | Rezervare, bază de încărcare inteligentă, transfer de date, resetare | Moştenire |
| OCPP 1.6 | 2015 | SOAP + JSON/WS | WebSocket, profiluri de încărcare, mesaje de declanșare, listă de autentificare locală | Foarte răspândit |
| OCPP 2.0 | 2018 | JSON/WS | Model de dispozitiv, blocuri funcționale, bază ISO 15118 (înlocuit cu 2.0.1) | Rar |
| OCPP 2.0.1 | 2020 | Numai JSON/WS | 16 blocuri funcționale, model de dispozitiv, 3 profiluri de securitate, încărcare inteligentă avansată | Standard actual |
| OCPP 2.1 | 2025 | Numai JSON/WS | Compatibil cu versiunea inversă 2.0.1, V2G ISO 15118-20, încărcare nativă, schimbare baterie | In curs de dezvoltare |
Diferențele fundamentale dintre OCPP 1.6 și 2.0.1
OCPP 2.0.1 nu este o simplă actualizare incrementală: este un rescriere arhitectural complet care modifică terminologia, structura mesajului și model conceptual. Această incompatibilitate „prin proiect” era necesară pentru depășirea limitelor structurale ale OCPP 1.6.
| astept | OCPP 1.6 | OCPP 2.0.1 |
|---|---|---|
| Terminologia serverului | Sistemul central | CSMS (Sistem de management al stației de încărcare) |
| Terminologia clientului | Punct de încărcare | Stație de încărcare |
| Unitate de încărcare | Conector | EVSE (Echipament de alimentare pentru vehicule electrice) |
| Tranzacții | StartTransaction / StopTransaction | Unified TransactionEvent (Început/Actualizat/Încheiat) |
| Configurare | Taste fixe (ChangeConfiguration) | Model de dispozitiv ierarhic (GetVariables/SetVariables) |
| Siguranţă | Opțional, nestandardizat | 3 profiluri de securitate integrate și obligatorii |
| Încărcare inteligentă | Baza (profile conector) | Avansat: pentru EVSE, prioritate stivă, programe compuse |
| ISO 15118 | Nu este acceptat | Bloc nativ M (Plug & Charge) |
| Organizare specifică | Lista plată de operațiuni | 16 blocuri funcționale cu cazuri de utilizare, cerințe, diagrame |
OCPP 2.1: Ce este nou în 2025
Lansat în ianuarie 2025 de Open Charge Alliance, OCPP 2.1 rămâne complet compatibil cu versiunea 2.0.1 și adaugă funcții critice pentru viitor:
- V2G avansat (Vehicle-to-Grid).: Suport complet pentru ISO 15118-20 cu transfer de putere bidirecțional, permițând EV-urile ca centrale electrice virtuale
- Integrarea DER: Instrumente avansate pentru optimizarea energiei distribuite cu resurse precum panouri fotovoltaice și sisteme de stocare
- Prețuri native: structuri de date standardizate pentru a comunica tarifele în timp real (kWh, timp, taxe de parcare) fără extensii specifice furnizorului
- Schimb baterie: suport pentru stațiile de schimb de baterii pentru vehicule cu două și trei roți
- Reluați tranzacția: posibilitatea de a relua o tranzacție după o repornire forțată fără pierderi de date
- Cost local: calculul costurilor direct pe stație pentru cazurile offline
Arhitectura de comunicare WebSocket
OCPP 2.0.1 folosește exclusiv JSON prin WebSocket ca protocol transport, abandonând complet SOAP/XML. Această alegere arhitecturală oferă Comunicare persistentă în două sensuri, latență scăzută, sarcină utilă ușoară și compatibilitate nativ cu infrastructuri web moderne.
Topologie client-server
În modelul OCPP, Stație de încărcare se comporta ca client WebSockets iar cel CSMS ca Server WebSocket. Stația de încărcare inițiază conexiunea și o menține activă cu un mecanism a bătăilor inimii. CSMS poate trimite comenzi către stația de pe aceeași conexiune Deschideți WebSocket, nu este nevoie de o conexiune inversă (fără sondaj, fără împingere separat).
Charging Station CSMS
| |
|--- WebSocket CONNECT ---------------->|
| wss://csms.example.com/ocpp/CS001 |
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| Authorization: Basic base64(...) |
| |
|<-- HTTP 101 Switching Protocols -------|
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| |
|--- BootNotification.req ------------->|
|<-- BootNotification.conf --------------|
| (interval: 300, status: Accepted) |
| |
|--- StatusNotification.req[EVSE1] ---->|
|<-- StatusNotification.conf ------------|
| |
|--- Heartbeat.req (ogni 300s) -------->|
|<-- Heartbeat.conf --------------------|
| |
| <-- utente avvicina RFID --- |
|--- Authorize.req -------------------->|
|<-- Authorize.conf (Accepted) ----------|
| |
|--- TransactionEvent(Started) -------->|
|<-- TransactionEvent.conf -------------|
| |
|<-- SetChargingProfile.req ------------| (CSMS gestisce load)
|--- SetChargingProfile.conf ---------->|
| |
|--- MeterValues (ogni 60s) ----------->|
|<-- MeterValues.conf ------------------|
| |
|--- TransactionEvent(Ended) ---------->|
|<-- TransactionEvent.conf -------------|
URL de conexiune și subprotocol
Stația de încărcare se conectează la CSMS cu o adresă URL care include propria sa
identificator unic ca ultimul segment al drumului.
Subprotocolul WebSocket ocpp2.0.1 se negociază în timpul strângerii de mână
HTTP pentru a asigura compatibilitatea versiunii de protocol.
# Formato URL
wss://csms.example.com/ocpp/{chargingStationId}
# Esempi reali
wss://csms.example.com/ocpp/IT-MIL-STATION-001
wss://csms.example.com/ocpp/EVSE-PARK-NORD-042
wss://csms.example.com/ocpp/CPO-AUTOGRILL-A7-01
# Headers WebSocket obbligatori
Sec-WebSocket-Protocol: ocpp2.0.1
Authorization: Basic {base64(stationId:password)} # Security Profile 1-2
# Con Security Profile 3 (mTLS): nessun header Authorization,
# l'autenticazione avviene tramite certificato client TLS
Structura mesajului OCPP 2.0.1
OCPP 2.0.1 definește trei tipuri de mesaje JSON, toate transportate ca cadre Textul WebSocket. Fiecare mesaj este un matrice JSON cu un format exacte pe baza tipului de mesaj. Această structură simplă facilitează analiza și depanare față de suprasarcina SOAP/XML.
CALL (Solicitare) - MessageTypeId 2
Mesajul APEL reprezintă o cerere transmisă de una dintre părți (Stație de încărcare sau CSMS) la altul. Conține un ID unic, numele acțiunii și sarcina utilă a cererii.
// Formato: [MessageTypeId, MessageId, Action, Payload]
// Esempio: BootNotification dalla Charging Station
[2, "19223201", "BootNotification", {
"chargingStation": {
"model": "SuperCharger-500",
"vendorName": "EVPower Inc.",
"serialNumber": "SN-2025-00142",
"firmwareVersion": "3.2.1",
"modem": {
"iccid": "8939100000000000001",
"imsi": "310260000000001"
}
},
"reason": "PowerUp"
}]
// Esempio: TransactionEvent dalla Charging Station
[2, "tx-evt-001", "TransactionEvent", {
"eventType": "Started",
"timestamp": "2026-03-09T10:30:00Z",
"triggerReason": "CablePluggedIn",
"seqNo": 0,
"transactionInfo": {
"transactionId": "TXN-2026-0309-001",
"chargingState": "EVConnected"
},
"evse": { "id": 1, "connectorId": 1 },
"idToken": {
"idToken": "RFID-04A2B3C4D5",
"type": "ISO14443"
}
}]
CALLRESULT (Răspuns) - MessageTypeId 3
Mesajul REZULTAT APEL și răspunsul pozitiv la un APEL. The MessageId trebuie să se potrivească exact cu cel al apelului original pentru a permite corelația cerere-răspuns.
// Formato: [MessageTypeId, MessageId, Payload]
// Risposta a BootNotification
[3, "19223201", {
"currentTime": "2026-03-09T10:00:00Z",
"interval": 300,
"status": "Accepted"
}]
// Risposta a TransactionEvent (Started)
[3, "tx-evt-001", {
"totalCost": 0,
"chargingPriority": 0,
"idTokenInfo": {
"status": "Accepted",
"groupIdToken": {
"idToken": "GROUP-FLEET-01",
"type": "Central"
}
}
}]
CALLERROR (Eroare) - MessageTypeId 4
Mesajul CALLEROARE este trimis atunci când destinatarul nu poate procesa un APEL. Include un cod de eroare standardizat, descriere detalii lizibile și structurate.
// Formato: [MessageTypeId, MessageId, ErrorCode, ErrorDescription, ErrorDetails]
[4, "19223201", "FormatViolation",
"Il campo 'vendorName' supera la lunghezza massima di 50 caratteri",
{
"field": "chargingStation.vendorName",
"maxLength": 50,
"actualLength": 67
}
]
// Codici di errore OCPP 2.0.1 standardizzati:
// FormatViolation - messaggio JSON malformato
// GenericError - errore generico non classificabile
// InternalError - errore interno del ricevente
// MessageTypeNotSupported - tipo di messaggio non supportato
// NotImplemented - azione riconosciuta ma non implementata
// NotSupported - azione non supportata dall'implementazione
// OccurrenceConstraintViolation - violazione cardinalita elementi
// PropertyConstraintViolation - vincolo su una proprietà violato
// ProtocolError - violazione del protocollo OCPP
// RpcFrameworkError - errore nel framework RPC di base
// SecurityError - errore di sicurezza o autenticazione
// TypeConstraintViolation - tipo di dato non corretto
Corelația cerere-răspuns: reguli critice
Fiecare APEL trebuie să aibă un ID unic de mesaj (maximum 36 de caractere alfanumeric) care nu a fost utilizat anterior pe aceeași conexiune de către același expeditor. CALLRESULT sau CALLERROR trebuie să le folosească același MessageId. Expeditorul trebuie să mențină un timeout (recomandat: 30 de secunde) după care cererea este considerată eșuată. Numai un APEL poate fi în așteptare la un moment dat fiecare sens de comunicare: stația nu poate trimite un al doilea APEL până când nu a primit niciun răspuns la primul.
Cele 16 blocuri funcționale ale OCPP 2.0.1
OCPP 2.0.1 organizează toate funcționalitățile în 16 blocuri funcționale (A la P), fiecare conținând cazuri de utilizare specifice cu cerințe detaliate, condiţii preliminare şi diagrame de succesiune. Această organizare modulară permite implementatorii să declare blocurile pe care le suportă și testerii să verifice conformitatea bloc cu bloc.
| Bloc | Nume | Mesaje de top | Obligatoriu |
|---|---|---|---|
| A | Securitate | SecurityEventNotification, SignCertificate, CertificateSigned | Si |
| B | Aprovizionare | BootNotification, SetVariables, GetVariables, NotifyReport | Si |
| C | Autorizare | Authorize, ClearCache, GetLocalListVersion | Si |
| D | Lista de autorizații locale | SendLocalList, GetLocalListVersion | No |
| E | Tranzacţie | TransactionEvent, GetTransactionStatus, MeterValues | Si |
| F | Telecomanda | RequestStartTransaction, RequestStopTransaction, UnlockConnector | No |
| G | Disponibilitate | StatusNotification, ChangeAvailability, Heartbeat | Si |
| H | Rezervare | Rezervați acum, anulați rezervarea | No |
| I | Tarif și cost | CostUpdated, ShowMessage | No |
| J | Contorizare | MeterValues (măsurători de energie/putere/curent) | Si |
| K | Încărcare inteligentă | SetChargingProfile, ClearChargingProfile, GetChargingProfiles, ReportChargingProfiles | No |
| L | Managementul firmware-ului | UpdateFirmware, FirmwareStatusNotification | No |
| M | Gestiunea certificatului ISO 15118 | Get15118EVCertificate, DeleteCertificate, CertificateSigned | No |
| N | Diagnosticare | GetLog, LogStatusNotification, SetMonitoringBase, SetVariableMonitoring | No |
| O | Afișează mesajul | SetDisplayMessage, GetDisplayMessages, ClearDisplayMessage | No |
| P | Transfer de date | DataTransfer (extensii specifice furnizorului) | No |
Model de dispozitiv: inima OCPP 2.0.1
Il Modelul dispozitivului și inovația arhitecturală majoră a OCPP 2.0.1. Înlocuiește sistemul de chei de configurare hard al OCPP 1.6 (ChangeConfiguration cu chei) cu a model ierarhic flexibil bazat pe Componente și variabile. Fiecare stație își descrie complet structura și configurație într-un mod independent de furnizor.
ChargingStation (radice)
|
+-- Controller (computer della stazione)
| +-- Variables: Vendor, Model, FirmwareVersion, SerialNumber
|
+-- EVSE[1] (punto di ricarica 1)
| +-- Variables: AvailabilityState, Power, SupplyPhases
| +-- Connector[1] (connettore CCS2 / DC)
| | +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
| +-- Connector[2] (connettore CHAdeMO)
| +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
|
+-- EVSE[2] (punto di ricarica 2)
| +-- Connector[1] (connettore Type2 AC / 22kW)
| +-- Variables: ConnectorType, Phases, MaxCurrent
|
+-- PowerMeter (contatore principale)
| +-- Variables: Energy.Active.Import.Register, Power.Active.Import
|
+-- NetworkInterface (ETH0/LTE)
| +-- Variables: Type, SSID, SignalStrength, ActiveNetworkProfile
|
+-- SecurityCtrlr
| +-- Variables: SecurityProfile, CertificateEntries
|
+-- SmartChargingCtrlr
+-- Variables: ChargingProfileMaxStackLevel, ChargeProfileKindsSupported
// CALL dal CSMS: legge stato EVSE e tipo connettore
[2, "get-var-001", "GetVariables", {
"getVariableData": [
{
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeType": "Actual"
},
{
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeType": "Actual"
},
{
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeType": "Actual"
}
]
}]
// CALLRESULT dalla Charging Station
[3, "get-var-001", {
"getVariableResult": [
{
"attributeStatus": "Accepted",
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeValue": "Available"
},
{
"attributeStatus": "Accepted",
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeValue": "cCCS2"
},
{
"attributeStatus": "Accepted",
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeValue": "5"
}
]
}]
Profiluri de securitate OCPP 2.0.1
OCPP 2.0.1 introduce trei Profiluri de securitate progresive care definesc nivelul de protecție al comunicației dintre stația de încărcare și CSMS. Profilul trebuie ales în timpul implementării și configurează automat mecanismul autentificare și criptare.
| Caracteristică | Profil 1 | Profilul 2 | Profilul 3 |
|---|---|---|---|
| URL WebSocket | ws:// (fără TLS) | wss:// (TLS) | wss:// (TLS) |
| Criptare | Nici unul | TLS 1.2+ | TLS 1.2+ |
| Stația de autentificare | Parolă (autentificare de bază) | Parolă (autentificare de bază) | certificat client X.509 |
| Autentificare CSMS | Nici unul | Certificat de server TLS | Certificat de server TLS |
| Protectie MitM | No | Parțial (numai cu autorizare CSMS) | Complet (TLS reciproc) |
| Managementul certificatelor | Nu este necesar | Numai root CA pe dispozitiv | PKI complet: CA + certificat client |
| Utilizare recomandată | Doar medii de testare | Producție standard | Producție critică, P&C ISO 15118 |
Managementul certificatelor în producție (Profil de securitate 3)
Cu Security Profile 3, managementul ciclului de viață al certificatelor devine operare critică. OCPP 2.0.1 include mesaje dedicate: SemneazăCertificat (stația necesită semnarea unui CSR), Certificat semnat (CSMS instalează certificatul semnat), Șterge certificatul (elimină un certificat învechit), GetInstalledCertificateIds (lista certificatelor instalate). Și unul este esențial PKI robust cu reînnoire automată cel puțin 30 de zile înainte de expirare, monitorizarea continuă a valabilității și Mecanism de revocare CRL/OCSP.
Gestionarea inteligentă a încărcării și a încărcăturii
Lo Încărcare inteligentă (blocul K) și cea mai critică funcționalitate pentru operatori cu instalaţii mari. Permite CSMS să controleze dinamic puterea furnizată de fiecare EVSE în funcție de constrângerile rețelei, tarifele la energie, prioritatea utilizatorului și capacitatea transformatorului.
Ierarhia profilurilor de taxare
OCPP 2.0.1 definește patru tipuri de profiluri de încărcare cu niveluri de stivă (priorități):
| Tip de profil | Domeniul de aplicare | Aplicație | Suprascriere |
|---|---|---|---|
| ChargingStationMaxProfile | Limita maximă absolută a întregii stații | Protectie transformator, contract de furnizare | Nu poate fi anulat |
| ChargingStationExternalConstraints | Limite de la sisteme externe (DSO, agregatoare) | Răspunsul la cerere, echilibrarea rețelei | Doar de la un profil superior |
| TxDefaultProfile | Profil implicit pentru tranzacții | Politici tarifare, programare de bază, solar | Din specificul TxProfile |
| TxProfile | Profil specific pentru o tranzacție | Priorități ale utilizatorilor, preferințe individuale | Secitate maximă |
SetChargingProfile: Peak Shaving și integrare solară
// Strategia: integra produzione solare + peak shaving ore serali
// Scenario: sito con 50kW fotovoltaico, trasformatore 100A, picco 19-21h
// 1. Limite massimo stazione (rispetta contratto di fornitura)
[2, "smart-max-001", "SetChargingProfile", {
"evseId": 0,
"chargingProfile": {
"id": 1,
"stackLevel": 1,
"chargingProfilePurpose": "ChargingStationMaxProfile",
"chargingProfileKind": "Absolute",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 100.0 }
]
}]
}
}]
// 2. Profilo solare + peak shaving per un singolo EVSE
[2, "smart-solar-001", "SetChargingProfile", {
"evseId": 1,
"chargingProfile": {
"id": 100,
"stackLevel": 0,
"chargingProfilePurpose": "TxDefaultProfile",
"chargingProfileKind": "Absolute",
"validFrom": "2026-03-09T00:00:00Z",
"validTo": "2026-03-10T00:00:00Z",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"startSchedule": "2026-03-09T06:00:00Z",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 7200, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 14400, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 43200, "limit": 16.0, "numberPhases": 3 },
{ "startPeriod": 46800, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 54000, "limit": 24.0, "numberPhases": 3 }
]
}]
}
}]
// Orario potenza: 06-08h: 8A (offpeak, bassa produzione solare)
// 08-12h: 32A (piena produzione FV, massima potenza)
// 12-18h: 32A (picco solare, alta produzione)
// 18-19h: 16A (calo solare, riduzione)
// 19-21h: 8A (picco domanda residenziale, min potenza)
// 21-24h: 24A (fine picco, potenza media)
Algoritm de echilibrare dinamică a sarcinii
Un algoritm de gestionare a încărcării distribuie puterea disponibilă între sesiuni activ în timp real, respectând limita și prioritățile transformatorului. Cea mai comună abordare este Cotă echitabilă ponderată cu constrângeri min/max.
interface ChargingSession {
readonly stationId: string;
readonly evseId: number;
readonly transactionId: string;
readonly priority: number; // 0-9 (9 = massima)
readonly minChargingRate: number; // A minimi per caricare
readonly maxChargingRate: number; // A massimi del connettore
readonly currentChargingRate: number;
readonly energyDelivered: number; // Wh totali erogati
readonly targetEnergy?: number; // Wh target (se specificato dall'utente)
readonly isEV3Phase: boolean; // Veicolo trifase
}
interface LoadBalancerConfig {
readonly maxSitePowerAmps: number; // A max del trasformatore
readonly reservedBuildingAmps: number; // A riservati per l'edificio
readonly minSessionAmps: number; // A minimi per sessione (tipico: 6A)
readonly rebalanceIntervalSec: number; // Secondi tra ricalcoli (tipico: 30s)
}
interface Allocation {
readonly stationId: string;
readonly evseId: number;
readonly allocatedAmps: number;
readonly phases: number;
}
function calculateChargingAllocations(
sessions: ReadonlyArray<ChargingSession>,
config: LoadBalancerConfig
): ReadonlyArray<Allocation> {
if (sessions.length === 0) return [];
const availableAmps = config.maxSitePowerAmps
- config.reservedBuildingAmps;
// Step 1: ordina per priorità (desc), poi energia erogata (asc = meno carico prima)
const sorted = [...sessions].sort((a, b) => {
if (b.priority !== a.priority) return b.priority - a.priority;
return a.energyDelivered - b.energyDelivered;
});
// Step 2: garantisci potenza minima a tutti
const minRequired = sorted.length * config.minSessionAmps;
if (minRequired > availableAmps) {
// Caso critico: potenza insufficiente, sospendi sessioni a bassa priorità
return sorted
.slice(0, Math.floor(availableAmps / config.minSessionAmps))
.map((s) => ({
stationId: s.stationId,
evseId: s.evseId,
allocatedAmps: config.minSessionAmps,
phases: s.isEV3Phase ? 3 : 1,
}));
}
// Step 3: distribuzione proporzionale ai pesi di priorità
const totalWeight = sorted.reduce(
(sum, s) => sum + (1 + s.priority), 0
);
const remainingAmps = availableAmps - minRequired;
const allocations = sorted.map((session) => {
const weight = (1 + session.priority) / totalWeight;
const bonus = remainingAmps * weight;
const raw = config.minSessionAmps + bonus;
// Applica vincoli min/max del connettore
const allocatedAmps = Math.max(
config.minSessionAmps,
Math.min(session.maxChargingRate, Math.round(raw * 10) / 10)
);
return {
stationId: session.stationId,
evseId: session.evseId,
allocatedAmps,
phases: session.isEV3Phase ? 3 : 1,
};
});
// Step 4: verifica finale che il totale non superi il limite
const total = allocations.reduce((s, a) => s + a.allocatedAmps, 0);
if (total <= availableAmps) return allocations;
// Riscaling proporzionale
const scale = availableAmps / total;
return allocations.map((a) => ({
...a,
allocatedAmps: Math.max(
config.minSessionAmps,
Math.round(a.allocatedAmps * scale * 10) / 10
),
}));
}
Implementarea CSMS Backend cu Python și PostgreSQL
Biblioteca Python ocpp de MobilityHouse (sursă deschisă, peste 2000 de stele pe GitHub)
și cea mai populară implementare de referință pentru CSMS. Combinăm biblioteca cu
asyncio, websockets e asyncpg pentru PostgreSQL pentru
construiți un backend pregătit pentru producție.
Schema PostgreSQL pentru CSMS
-- Registro stazioni di ricarica
CREATE TABLE charging_stations (
station_id TEXT PRIMARY KEY,
vendor_name TEXT NOT NULL,
model TEXT NOT NULL,
serial_number TEXT,
firmware_version TEXT,
security_profile SMALLINT NOT NULL DEFAULT 1,
last_boot_reason TEXT,
last_seen_at TIMESTAMPTZ,
is_online BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stato EVSE e connettori
CREATE TABLE evse_status (
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT NOT NULL,
connector_type TEXT, -- cCCS2, cCHAdeMO, cType2, sType3
status TEXT NOT NULL DEFAULT 'Unknown',
error_code TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (station_id, evse_id, connector_id)
);
-- Transazioni di ricarica
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT,
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'Started',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
meter_start_wh NUMERIC(12, 3),
meter_end_wh NUMERIC(12, 3),
energy_wh NUMERIC(12, 3) GENERATED ALWAYS AS (meter_end_wh - meter_start_wh) STORED,
stop_reason TEXT,
total_cost_cents INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Meter values (time series - usare TimescaleDB in produzione)
CREATE TABLE meter_values (
id BIGSERIAL PRIMARY KEY,
transaction_id TEXT REFERENCES transactions(transaction_id),
station_id TEXT NOT NULL,
evse_id SMALLINT NOT NULL,
sampled_at TIMESTAMPTZ NOT NULL,
energy_wh NUMERIC(12, 3),
power_w NUMERIC(10, 2),
current_a NUMERIC(8, 3),
voltage_v NUMERIC(8, 2),
soc_pct SMALLINT -- State of Charge da ISO 15118
);
CREATE INDEX idx_meter_values_station_time
ON meter_values(station_id, sampled_at DESC);
CREATE INDEX idx_transactions_station_id
ON transactions(station_id, started_at DESC);
-- Token di autorizzazione (lista locale cache)
CREATE TABLE authorization_cache (
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
status TEXT NOT NULL, -- Accepted, Invalid, Blocked, Expired
group_id TEXT,
expiry_date TIMESTAMPTZ,
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id_token, id_token_type)
);
Backend complet Python CSMS
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional, Any
import asyncpg
import websockets
from ocpp.routing import on
from ocpp.v201 import ChargePoint as Cp
from ocpp.v201 import call, call_result
from ocpp.v201.enums import (
Action, RegistrationStatusType, AuthorizationStatusType,
ConnectorStatusType
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
log = logging.getLogger('csms')
# Pool globale connessioni PostgreSQL
_db_pool: Optional[asyncpg.Pool] = None
async def get_db() -> asyncpg.Pool:
global _db_pool
if _db_pool is None:
_db_pool = await asyncpg.create_pool(
dsn='postgresql://csms:password@localhost/csms_db',
min_size=5,
max_size=20,
)
return _db_pool
class ChargePointHandler(Cp):
"""
Handler OCPP 2.0.1 per una singola Charging Station.
Un'istanza per ogni connessione WebSocket attiva.
"""
@on(Action.boot_notification)
async def on_boot_notification(
self, charging_station: dict, reason: str, **kwargs
) -> call_result.BootNotification:
log.info(
f"Boot: {self.id} | "
f"{charging_station['vendor_name']} {charging_station['model']} "
f"| reason={reason}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO charging_stations
(station_id, vendor_name, model, serial_number,
firmware_version, last_boot_reason, last_seen_at, is_online)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), TRUE)
ON CONFLICT (station_id) DO UPDATE SET
vendor_name = EXCLUDED.vendor_name,
model = EXCLUDED.model,
firmware_version = EXCLUDED.firmware_version,
last_boot_reason = EXCLUDED.last_boot_reason,
last_seen_at = NOW(),
is_online = TRUE
""",
self.id,
charging_station['vendor_name'],
charging_station['model'],
charging_station.get('serial_number'),
charging_station.get('firmware_version'),
reason,
)
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=300,
status=RegistrationStatusType.accepted,
)
@on(Action.heartbeat)
async def on_heartbeat(self) -> call_result.Heartbeat:
db = await get_db()
await db.execute(
"UPDATE charging_stations SET last_seen_at = NOW() WHERE station_id = $1",
self.id
)
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
@on(Action.status_notification)
async def on_status_notification(
self, timestamp: str, connector_status: str,
evse_id: int, connector_id: int, **kwargs
) -> call_result.StatusNotification:
log.info(
f"Status: {self.id} EVSE[{evse_id}]"
f"Connector[{connector_id}] = {connector_status}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO evse_status
(station_id, evse_id, connector_id, status, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (station_id, evse_id, connector_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = NOW()
""",
self.id, evse_id, connector_id, connector_status
)
return call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(
self, id_token: dict, **kwargs
) -> call_result.Authorize:
token = id_token['id_token']
token_type = id_token['type']
log.info(f"Authorize: {self.id} token={token} type={token_type}")
db = await get_db()
# Controlla prima la cache locale
row = await db.fetchrow(
"""
SELECT status, group_id, expiry_date
FROM authorization_cache
WHERE id_token = $1 AND id_token_type = $2
AND (expiry_date IS NULL OR expiry_date > NOW())
""",
token, token_type
)
status = AuthorizationStatusType.invalid
if row and row['status'] == 'Accepted':
status = AuthorizationStatusType.accepted
return call_result.Authorize(
id_token_info={'status': status}
)
@on(Action.transaction_event)
async def on_transaction_event(
self,
event_type: str,
timestamp: str,
trigger_reason: str,
seq_no: int,
transaction_info: dict,
evse: Optional[dict] = None,
id_token: Optional[dict] = None,
meter_value: Optional[list] = None,
**kwargs
) -> call_result.TransactionEvent:
tx_id = transaction_info['transaction_id']
log.info(
f"TransactionEvent: {self.id} {event_type} "
f"tx={tx_id} trigger={trigger_reason}"
)
db = await get_db()
if event_type == 'Started':
await self._handle_tx_started(
db, tx_id, evse, id_token, timestamp, meter_value
)
elif event_type == 'Updated' and meter_value:
await self._handle_tx_updated(db, tx_id, evse, meter_value)
elif event_type == 'Ended':
await self._handle_tx_ended(
db, tx_id, timestamp, transaction_info, meter_value
)
return call_result.TransactionEvent(
total_cost=0,
charging_priority=0,
id_token_info={'status': AuthorizationStatusType.accepted},
)
async def _handle_tx_started(
self, db, tx_id, evse, id_token, timestamp, meter_value
):
evse_id = evse['id'] if evse else 0
connector_id = evse.get('connector_id') if evse else None
token = id_token['id_token'] if id_token else 'unknown'
token_type = id_token['type'] if id_token else 'Local'
meter_start = self._extract_energy(meter_value)
await db.execute(
"""
INSERT INTO transactions
(transaction_id, station_id, evse_id, connector_id,
id_token, id_token_type, state, started_at, meter_start_wh)
VALUES ($1, $2, $3, $4, $5, $6, 'Started', $7, $8)
ON CONFLICT (transaction_id) DO NOTHING
""",
tx_id, self.id, evse_id, connector_id,
token, token_type, timestamp, meter_start
)
async def _handle_tx_updated(self, db, tx_id, evse, meter_value):
evse_id = evse['id'] if evse else 0
energy = self._extract_energy(meter_value)
power = self._extract_power(meter_value)
if energy is not None:
await db.execute(
"""
INSERT INTO meter_values
(transaction_id, station_id, evse_id, sampled_at, energy_wh, power_w)
VALUES ($1, $2, $3, NOW(), $4, $5)
""",
tx_id, self.id, evse_id, energy, power
)
async def _handle_tx_ended(
self, db, tx_id, timestamp, transaction_info, meter_value
):
meter_end = self._extract_energy(meter_value)
stop_reason = transaction_info.get('stopped_reason')
await db.execute(
"""
UPDATE transactions SET
state = 'Ended',
ended_at = $1,
meter_end_wh = $2,
stop_reason = $3
WHERE transaction_id = $4
""",
timestamp, meter_end, stop_reason, tx_id
)
def _extract_energy(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Energy.Active.Import.Register':
return float(sv['value'])
return None
def _extract_power(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Power.Active.Import':
return float(sv['value'])
return None
# === Comandi CSMS -> Stazione ===
async def send_remote_start(
self, evse_id: int, id_token: str, limit_amps: float = 32.0
) -> str:
"""Avvia una sessione da remoto su EVSE specificato."""
request = call.RequestStartTransaction(
id_token={'id_token': id_token, 'type': 'Central'},
evse_id=evse_id,
charging_profile={
'id': 999,
'stack_level': 0,
'charging_profile_purpose': 'TxProfile',
'charging_profile_kind': 'Relative',
'charging_schedule': [{
'id': 1,
'charging_rate_unit': 'A',
'charging_schedule_period': [
{'start_period': 0, 'limit': limit_amps}
],
}],
},
)
response = await self.call(request)
log.info(f"RemoteStart {self.id} EVSE{evse_id}: {response.status}")
return response.status
async def send_charging_profile(
self, evse_id: int, profile: dict
) -> str:
"""Imposta un profilo di carica per smart charging."""
request = call.SetChargingProfile(
evse_id=evse_id,
charging_profile=profile
)
response = await self.call(request)
log.info(f"ChargingProfile {self.id} EVSE{evse_id}: {response.status}")
return response.status
# Registry globale delle connessioni attive
_connected_stations: dict[str, ChargePointHandler] = {}
async def on_connect(websocket, path: str):
"""Callback per nuove connessioni WebSocket OCPP."""
station_id = path.strip('/').split('/')[-1]
if not station_id:
await websocket.close(1008, 'Missing station ID')
return
log.info(f"Connessione da: {station_id} | path={path}")
cp = ChargePointHandler(station_id, websocket)
_connected_stations[station_id] = cp
try:
await cp.start()
except websockets.exceptions.ConnectionClosed as e:
log.info(f"Disconnesso: {station_id} code={e.code}")
except Exception as e:
log.error(f"Errore: {station_id} - {e}")
finally:
_connected_stations.pop(station_id, None)
db = await get_db()
await db.execute(
"UPDATE charging_stations SET is_online = FALSE WHERE station_id = $1",
station_id
)
async def main():
await get_db() # Inizializza pool DB
log.info("CSMS OCPP 2.0.1 avviato")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp2.0.1'],
# TLS: aggiungere ssl=ssl_context per Security Profile 2-3
ping_interval=60,
ping_timeout=30,
max_size=1_048_576, # 1MB max message size
)
log.info("In ascolto su ws://0.0.0.0:9000/ocpp/{stationId}")
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
ISO 15118 și Plug & Charge
ISO 15118 definește comunicarea la nivel înalt între vehicule electric (EV) și stație de încărcare (EVSE) prin Power Line Communication (PLC) pe cablul de încărcare DC (CCS). OCPP 2.0.1 integrează nativ ISO 15118 prin blocul funcțional M, permițând Conectare și încărcare: vehiculul se autentifică automat prin certificat digital X.509, fără RFID sau aplicație mobilă.
Arhitectură PKI V2G (Vehicle-to-Grid).
Sistemul de certificate pentru Plug & Charge se bazează pe o PKI (Public Key Infrastructură) ierarhie specifică pentru mobilitatea electrică:
V2G Root CA (Root of Trust - gestita da OEM o eMSP)
|
+-- V2G Intermediate CA
| |
| +-- EVSE Certificate (installato nella stazione)
| CN = EVSE-IT-MIL-001
|
+-- eMobility Service Provider CA (eMSP)
|
+-- Contract Certificate (installato nel veicolo)
CN = IT.CPO.000001234 (eMAID - e-Mobility Account Identifier)
SubjectAltName = eMAID:IT.CPO.000001234
Flux complet Plug & Charge
EV EVSE CSMS eMSP
| | | |
|-- Plug cavo DC --> | | |
| | | |
|<= ISO 15118-2 TLS =>| (PLC sul cavo SLAC) | |
| | | |
|-- ContractCert --->| | |
| (eMAID, X.509) | | |
| |--- Authorize req ----->| |
| | idToken.type=eMAID | |
| | |--- OCPI check --->|
| | |<-- Contract OK --|
| | | |
| |<-- Authorize.conf ----| |
| | status: Accepted | |
| | | |
|<= Charging Start ==>| | |
| | | |
| EV invia target | | |
|-- EnergyRequest --->| | |
| SoC: 45% | | |
| Target: 80% | | |
| Departure: 18:30 | | |
| |--- TransactionEvent -->| |
| | ISO15118Trigger | |
| | | |
| |<-- SetChargingProfile--| |
|<= Schedule via PLC =>| | |
Starea ISO 15118 în producție (2026)
- ISO 15118-2: Plug & Charge AC/DC - acceptat pe scară largă de încărcătoarele HPC DC (Ionity, Fastned, Tesla Supercharger V3)
- ISO 15118-20: Suport bidirecțional V2G - suport hardware gata, software-ul lansat în 2025-2026
- Cerința AFIR: Toate stațiile noi compatibile cu V2G trebuie să accepte ISO 15118 din 2026
- Conformitate AFIR 2027: Orice încărcător instalat după 01/01/2027 trebuie să fie pregătit pentru încărcare inteligentă
- V2G real în Italia: primii piloți cu Enel X Way și Nissan Leaf la standardul V2H (Vehicle-to-Home)
Arhitectură scalabilă: de la 10 la 100.000 de puncte de încărcare
O întreprindere CSMS trebuie să gestioneze de la câteva zeci la sute de mii de conexiuni WebSockets concurente. Arhitectura evoluează în etape, cu componente suplimentare care ele intră în joc la scări diferite.
Faza 1: Scară mică (10-500 de stații)
+------------------+ WebSocket/OCPP +------------------+
| Charging |------------------------| CSMS Monolitico |
| Stations (10-500)| wss://csms:9000/ocpp | Python/asyncio |
+------------------+ | Port 9000 |
+--------+---------+
|
+-------+-------+
| PostgreSQL |
| Redis (cache) |
+---------------+
Stack: Python asyncio + PostgreSQL + Redis
Deployment: 1 VM (4 vCPU, 8GB RAM), 1 DB managed
Costo: ~$200/mese
Faza 2: scară medie (500-10.000 de stații)
+------------+ +------------------+ +--------------+
| Load | | WS Gateway #1 | | Message |
| Balancer +---->| (asyncio CSMS) +---->| Broker |
| (HAProxy) | | Max 2000 conn | | (RabbitMQ) |
| | +------------------+ | |
| Sticky +---->| WS Gateway #2 +---->| |
| Sessions | | (asyncio CSMS) | +--------------+
| | +------------------+ |
+------------+ +------+------+
| Business |
| Services |
| (FastAPI) |
+------+------+
|
+----------+----------+
| PostgreSQL (HA) |
| TimescaleDB |
| Redis Cluster |
+---------------------+
Sticky sessions: basate su station_id nel path URL
Cross-node ops: Redis pub/sub per inviare comandi alle stazioni
Costo: ~$2.000/mese (K8s managed)
Faza 3: scară largă (10.000-100.000 de stații)
Global Load Balancer (Anycast)
|
+---------------+---------------+
| |
Region EU-WEST Region EU-SOUTH
+------------------+ +------------------+
| WS Gateway Pool | | WS Gateway Pool |
| (50 pods, 2000 | | (30 pods) |
| conn each = 100K)| +--------+---------+
+--------+---------+ |
| |
+-------------+----------------+
|
+-------+--------+
| Apache Kafka |
| (12 partitions)|
| per topic |
+-------+--------+
|
+-----------------+------------------+
| | |
+------+------+ +-------+------+ +--------+------+
| Transaction | | Smart | | Device |
| Service | | Charging Svc | | Mgmt Svc |
| (10 replicas)| | (5 replicas) | | (3 replicas) |
+------+------+ +-------+------+ +--------+------+
| | |
+--------+--------+------------------+
|
+--------+--------+
| PostgreSQL |
| Citus (sharding) |
| Shard key: |
| station_id hash |
+--------+---------+
|
+--------+--------+
| TimescaleDB |
| (meter values) |
+--------+--------+
Kafka Topics:
- ocpp.boot-notification (chiave: station_id)
- ocpp.transaction-events (chiave: transaction_id)
- ocpp.meter-values (chiave: station_id)
- ocpp.status-notifications (chiave: station_id)
- csms.commands (chiave: station_id)
Throughput target: 1M messaggi/ora, latenza P99 < 200ms
Costo: ~$30.000/mese (multi-region Kubernetes)
Managementul conexiunii între noduri cu Redis
Într-o implementare cu mai multe noduri, CSMS trebuie să știe pe ce gateway se află fiecare stație pentru a trimite comenzi (SetChargingProfile, RemoteStart etc.). Redis pub/sub rezolva problema:
import json
import asyncio
import redis.asyncio as aioredis
redis_client = aioredis.from_url(
'redis://redis-cluster:6379',
encoding='utf-8',
decode_responses=True
)
# Registra il nodo della connessione
async def register_connection(station_id: str, gateway_id: str):
await redis_client.setex(
f"csms:gateway:{station_id}",
value=gateway_id,
time=600 # TTL: 10 minuti, rinnovato a ogni heartbeat
)
# Pubblica un comando verso una stazione (qualunque nodo sia)
async def publish_command(station_id: str, action: str, payload: dict):
channel = f"csms:commands:{station_id}"
await redis_client.publish(channel, json.dumps({
'action': action,
'payload': payload
}))
# Su ogni nodo gateway: ascolta i comandi per le stazioni connesse
async def listen_for_commands(connected_stations: dict):
pubsub = redis_client.pubsub()
# Sottoscrivi ai canali delle stazioni connesse a questo nodo
async def subscribe_station(station_id: str):
await pubsub.subscribe(f"csms:commands:{station_id}")
async for message in pubsub.listen():
if message['type'] != 'message':
continue
station_id = message['channel'].split(':')[-1]
cp = connected_stations.get(station_id)
if not cp:
continue # Stazione non su questo nodo, ignora
cmd = json.loads(message['data'])
try:
if cmd['action'] == 'SetChargingProfile':
await cp.send_charging_profile(
cmd['payload']['evse_id'],
cmd['payload']['profile']
)
elif cmd['action'] == 'RemoteStart':
await cp.send_remote_start(
cmd['payload']['evse_id'],
cmd['payload']['id_token'],
)
except Exception as e:
log.error(f"Errore esecuzione comando {cmd['action']}: {e}")
Monitorizare, metrici și tablou de bord Grafana
Un CSMS în producție necesită un sistem cuprinzător de observabilitate. Valorile cheie de monitorizare se referă la sănătatea infrastructurii, calitatea servicii și performanță operațională.
Măsuri operaționale cheie
| Metric | Formula/Sursa | SLA țintă | Pragul de alertă |
|---|---|---|---|
| Disponibilitatea stației | Posturi online / Total posturi x 100 | >= 99% | < 95% |
| Latența mesajului OCPP P99 | CALL time -> CALLRESULT (percentila 95) | < 2s | > 5s |
| Rata de succes a tranzacției | TX finalizat / TX început x 100 | >= 98% | < 95% |
| Energie livrată (kWh/oră) | Sum MeterValues deocamdată | Linia de referință +10% | < inițial -20% |
| Rata de respingere a autenticării | Auth Invalid / Total Auth x 100 | < 2% | > 10% (posibil atac) |
| Reconexiuni WebSocket/oră | Contorați noile conexiuni pe stație | < 2/oră/stație | > 10/oră/stație |
| Conformitate cu încărcarea inteligentă | Puterea reală vs profilul setat | +/- 5% | Abatere > 15% |
| Zile de expirare certificate | Zile până la expirarea certificatelor TLS | > 30 de zile | < 30 de zile (alertă de reînnoire) |
Exportator Prometheus pentru CSMS
from prometheus_client import (
Counter, Gauge, Histogram, start_http_server
)
# Metriche Prometheus
OCPP_MESSAGES_TOTAL = Counter(
'ocpp_messages_total',
'Numero totale messaggi OCPP processati',
['action', 'direction', 'status'] # direction: inbound/outbound
)
OCPP_MESSAGE_DURATION = Histogram(
'ocpp_message_duration_seconds',
'Latenza elaborazione messaggi OCPP',
['action'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
STATIONS_CONNECTED = Gauge(
'csms_stations_connected_total',
'Numero stazioni connesse al CSMS'
)
ACTIVE_TRANSACTIONS = Gauge(
'csms_active_transactions_total',
'Numero transazioni di ricarica attive'
)
ENERGY_DELIVERED_WH = Counter(
'csms_energy_delivered_wh_total',
'Energia totale erogata in Wh',
['station_id']
)
AUTH_RESULTS = Counter(
'csms_authorization_results_total',
'Risultati delle autorizzazioni OCPP',
['status'] # Accepted, Invalid, Blocked, Expired
)
SMART_CHARGING_EVENTS = Counter(
'csms_smart_charging_events_total',
'Operazioni smart charging',
['action', 'result']
)
def start_metrics_server(port: int = 8001):
"""Avvia il server HTTP Prometheus su porta specificata."""
start_http_server(port)
log.info(f"Prometheus metrics su http://0.0.0.0:{port}/metrics")
# Decorator per misurare latenza handler
import time
import functools
def track_ocpp_handler(action: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.monotonic()
try:
result = await func(*args, **kwargs)
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='success'
).inc()
return result
except Exception as e:
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='error'
).inc()
raise
finally:
OCPP_MESSAGE_DURATION.labels(action=action).observe(
time.monotonic() - start
)
return wrapper
return decorator
// Pannelli principali per dashboard Grafana CSMS
// 1. Stazioni online (Gauge)
{
"title": "Stazioni Connesse",
"type": "gauge",
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 99 }
]
}
}
},
"targets": [{
"expr": "csms_stations_connected_total / csms_stations_registered_total * 100",
"legendFormat": "Availability %"
}]
}
// 2. Latenza messaggi OCPP P99 (Time Series)
{
"title": "OCPP Message Latency P99",
"type": "timeseries",
"targets": [{
"expr": "histogram_quantile(0.99, rate(ocpp_message_duration_seconds_bucket[5m]))",
"legendFormat": "P99 - {{action}}"
}]
}
// 3. Energia erogata (Stat panel)
{
"title": "Energia Totale Erogata oggi (kWh)",
"type": "stat",
"targets": [{
"expr": "increase(csms_energy_delivered_wh_total[24h]) / 1000",
"legendFormat": "kWh"
}]
}
// 4. Auth rejection rate - alert su attacchi
{
"title": "Rejection Rate Autorizzazioni (%)",
"type": "timeseries",
"targets": [{
"expr": "rate(csms_authorization_results_total{status='Invalid'}[5m]) / rate(csms_authorization_results_total[5m]) * 100"
}]
}
Reglementări italiene și europene: AFIR, PNIRE, PNRR
Infrastructura de încărcare a vehiculelor electrice din Italia și Europa este strict reglementată. Conformitatea cu reglementările nu este opțională: se referă atât la cerințele tehnice ale stații (metrologie legală, accesibilitate, plată) și standarde de comunicare.
AFIR (Regulamentul privind infrastructura pentru combustibili alternativi - UE 2023/1804)
AFIR a intrat în vigoare în aprilie 2024 și definește un calendar precis pentru obligații de încărcare a infrastructurii pe rețelele TEN-T și în zonele urbane:
| Expirare | Cerinţă | Aplicabilitate |
|---|---|---|
| 31 decembrie 2025 | Stație >= 150 kW la fiecare 60 km pe rețeaua centrală TEN-T | Întreaga UE |
| 31 decembrie 2027 | Stație >= 150 kW la fiecare 60 km TEN-T Rețea cuprinzătoare | Întreaga UE |
| 14 aprilie 2025 | Date statice și dinamice gratuite (locație, tip de conector, disponibilitate) | Stații publice |
| 01 ianuarie 2027 | Încărcare inteligentă gata pentru stații noi/renovate > 22 kW | Stații publice |
| Imediat | Plată ad-hoc fără abonament (card bancar fără contact) | Statii > 50 kW publice |
| 2026+ | ISO 15118 pentru stații compatibile V2G | Stații bidirecționale |
PNIRE și PNRR pentru Italia
În Italia, implementarea AFIR are loc prin două instrumente principale:
- PNIRE (Planul național de infrastructură de încărcare electrică): gestionat de MASE (Ministerul Mediului și Securității Energetice), definește obiective naționale: 13.755 de stații publice în 2025, focus pe rețeaua de drumuri și zonele urbane
- Misiunea PNRR 2, Investiții 4.3: peste 700 de milioane de euro alocate pentru Încărcare de mare putere (HPC >= 150 kW) pe autostrăzi și zone serviciul. Conform MEMORY, PNRR a alocat un total de 12,7 miliarde EUR, cu utilizare încă parțială
- Apel MASE 2024-2025: stimulente pentru operatorii privați care instalați infrastructuri în zone cu densitate scăzută de încărcare (Sudul Italiei, zonele rurale). Subvenție de până la 60% din costurile de instalare
Situația italiană: 73.000 de puncte de încărcare în 2025
Începând cu 31 decembrie 2025, contează Italia Peste 73.000 de puncte de încărcare publice (+18% față de 2024), cu o acoperire teritorială națională de 93%. Dintre acestea: aproximativ 12.000 sunt puncte de încărcare rapidă (> 22 kW) și 5.000 sunt HPC (>= 150 kW). Regiunea cu cea mai mare infrastructură este Lombardia (23%), urmată din Lazio (12%) și Toscana (9%). Sudul rămâne o zonă prioritară pentru i Finanțare PNRR, cu scopul de a reduce decalajul până în 2026.
Obligații tehnice pentru software CSMS
- OCPI (Open Charge Point Interface): protocol obligatoriu pentru roaming între CPO (Charge Point Operator) și eMSP (e-Mobility Service Provider). Se recomandă OCPI versiunea 2.2.1
- Metrologie legală: în Germania (Eichrecht) și progresiv în UE (MID - Measurement Instruments Directive), sistemele de măsurare trebuie să fie certificate, iar citirile trebuie să fie transparente și inalterabile de către utilizator
- GDPR: datele sesiunii de încărcare (RFID, locație, ore) sunt date personale. Necesită politică de confidențialitate, minimizarea datelor și dreptul de a fi uitat
- CDR (înregistrări cu detalii de taxare): trebuie să respecte formatul OCPI pentru facturarea interoperabilă și păstrat cel puțin 5 ani (obligații fiscale italiene)
Studiu de caz: Rețea italiană de încărcare cu peste 50 de stații
Să examinăm arhitectura unui sistem CSMS real pentru un operator italian care administrează 50 de stații de încărcare DC (22-150 kW) distribuite în parcări urbane și centre comerciale în 3 regiuni.
Cerințe operaționale
- 50 de stații, 150 de EVSE în total (în medie 3 EVSE per stație), 300 de conectori (CCS2 + Type2)
- Vârf zilnic: 400-600 sesiuni de încărcare (7-9 am, 12-2 pm, 5-9 pm)
- Încărcare inteligentă: respectarea limitei de 200 A per stație, integrare cu SEM (Site Energy Manager)
- Multi-locatari: 3 CPO-uri cu vizibilitate parțială a stației, roaming OCPI cu Enel
- Uptime SLA: 99,5% lunar per stație, 99,9% pentru backend-ul CSMS
Arhitectura selectată
+------------------+ +------------------+ +------------------+
| 50 Stazioni | | HAProxy | | CSMS Primary |
| OCPP 2.0.1 |---->| (WS sticky sess.)|---->| Python asyncio |
| Security Profile 2| | Port 443 (TLS) | | 2 replicas |
+------------------+ +------------------+ +--------+---------+
|
+------------------+ +--------+---------+
| CSMS Worker |<----| Redis Cluster |
| (FastAPI REST) | | (stato sessioni) |
| Dashboard, API | +------------------+
+------------------+
|
+-----------+----------+
| |
+-------+-------+ +---------+------+
| PostgreSQL 16 | | TimescaleDB |
| (transazioni, | | (meter values)|
| auth, device | | 2TB/anno est. |
| model, CDR) | +---------------+
+---------------+
Monitoring: Prometheus + Grafana Cloud
Alerting: PagerDuty (P1: stazione offline >5min, P2: CSMS latency >2s)
CDR/Billing: integrazione ERP via webhook PostgreSQL NOTIFY
Valori operaționale reale (lună tipică)
| Metric | Valoare | Note |
|---|---|---|
| Sesiuni/luna | ~14.000 | In medie 280 de sedinte/zi |
| Energia livrată | ~85.000 kWh/lună | Medie 6 kWh/sesiune |
| Disponibilitatea medie a stației | 99,3% | 0,7% timp de oprire = ~5h/lună per stație |
| Rata de acceptare a autenticării | 96,8% | 3,2% respins = expirat sau neînregistrat |
| Latența OCPP P95 | 180 ms | Include stație LTE dus-întors |
| Evenimente de încărcare inteligentă/zi | ~1.200 | 24 de reechilibrari/ora in medie |
| Eficacitate maximă a bărbieritului | 92% | 92% din timp puterea <= limită setată |
Securitate CSMS: OWASP și modelul de amenințare
CSMS este o infrastructură critică: un compromis poate provoca întreruperi încărcare, manipulare a facturării sau atacuri asupra rețelei electrice prin încărcare. Principalele amenințări sunt identificabile prin un model de amenințare specific.
Modelul de amenințare CSMS
| Ameninţare | Vector | Impact | Atenuare |
|---|---|---|---|
| Stație neautorizată | Conexiune cu acreditările furate | Injectare de date false, consum fraudulos | Profil de securitate 3 (mTLS), fixare certificată |
| Omul-la-mijloc | Interceptarea WS pe rețele nesigure | Interceptarea jetoanelor RFID, manipularea comenzilor | TLS 1.3 obligatoriu, transparență certificată |
| Replay Attack | Retransmiterea mesajelor OCPP capturate | Facturare dublă, permisiuni nevalide | MessageId unic, validare marca temporală, nonce |
| DDoS WebSocket | Flux de conexiuni sau mesaje | CSMS inaccesibil, infrastructură DoS | Limitarea ratei, limitarea conexiunii, WAF |
| Injecție SQL prin sarcina utilă OCPP | Sarcină utilă OCPP cu sarcină utilă SQL în câmpul idToken | Exfiltrarea DB, escaladarea privilegiilor | Declarații pregătite, ORM, validare de intrare |
| Clonarea RFID | Clonarea cardurilor RFID legitime | Sesiuni plătite de alți utilizatori | ISO 15118 P&C, lista albă RFID, detectarea anomaliilor |
| Firmware rău intenționat | Actualizare firmware cu malware | Controlul fizic al stației, manipularea rețelei | Firmware semnătură digitală, boot securizat, SBOM |
Întărirea CSMS: Lista de verificare a securității
import ssl
import re
from functools import wraps
# 1. Configurazione TLS sicura (Security Profile 2-3)
def create_tls_context(
certfile: str,
keyfile: str,
cafile: str,
require_client_cert: bool = False
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if require_client_cert: # Security Profile 3 (mTLS)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=cafile)
# Disabilita cipher suite deboli
ctx.set_ciphers(
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:!aNULL:!eNULL:!LOW:!EXPORT'
)
return ctx
# 2. Rate limiting per connessioni WebSocket
from collections import defaultdict
import time
_connection_attempts: dict[str, list[float]] = defaultdict(list)
MAX_CONN_PER_MINUTE = 10
def check_rate_limit(client_ip: str) -> bool:
"""Ritorna True se il client può connettersi, False se throttled."""
now = time.monotonic()
window = _connection_attempts[client_ip]
# Rimuovi tentativi più vecchi di 60 secondi
_connection_attempts[client_ip] = [
t for t in window if now - t < 60
]
if len(_connection_attempts[client_ip]) >= MAX_CONN_PER_MINUTE:
log.warning(f"Rate limit superato per IP: {client_ip}")
return False
_connection_attempts[client_ip].append(now)
return True
# 3. Validazione MessageId per prevenire replay attack
_seen_message_ids: set[str] = set()
_message_id_pattern = re.compile(r'^[a-zA-Z0-9\-_\.]{1,36}






