OCPP 2.x: Budování podnikových nabíjecích systémů EV
Globální trh s infrastrukturou nabíjení elektrických vozidel dohonil 40,22 miliardy dolarů v roce 2025 a do roku 2033 poroste s CAGR o 25 %, podle Grand View Research. V Evropě, AFIR (Alternative Fuels Nařízení o infrastruktuře) ukládá závazné lhůty: každých 60 km v síti TEN-T do konce roku 2025 musí být stanice o výkonu alespoň 150 kW. V Itálii s mimo 73 000 veřejných dobíjecích míst do 31. prosince 2025 a PNRR která vyčlenila více než 700 milionů eur na vysoce výkonné nabíjení, sektor a v plný výbuch.
Srdcem této infrastruktury jeOpen Charge Point Protocol (OCPP), otevřený standard vyvinutý organizací Open Charge Alliance (OCA), který definuje jak nabíjecí stanice (Charging Station) komunikují s centralizovanými řídicími systémy (CSMS - Charging Station Management Systems). Přijato více než 250 organizacemi v ve více než 40 zemích se OCPP stal de facto standardem v tomto odvětví, což zajišťuje interoperabilita mezi hardwarem od různých výrobců a různým softwarem pro správu.
V tomto pokročilém technickém článku prozkoumáme OCPP 2.0.1 a 2.1 v hloubka: architektura WebSocket, struktura zpráv, model zařízení, profily zabezpečení, chytré nabíjení a jak vytvořit škálovatelný backend CSMS připravený na produkci 10 až 100 000 stanic s kompletními příklady kódu v TypeScriptu a Pythonu.
Co se naučíte
- Vývoj protokolu OCPP z 1.2 na 2.1 a základní architektonické rozdíly
- Struktura zprávy JSON (CALL, CALLRESULT, CALLERROR) a přenos WebSocket
- 16 funkčních bloků OCPP 2.0.1 a hierarchický model zařízení
- 3 bezpečnostní profily: Basic Auth, TLS, mTLS s certifikáty X.509
- Pokročilé chytré nabíjení: SetChargingProfile, správa zátěže, špičkové holení, solární integrace
- CSMS backend implementace v Pythonu s asyncio a PostgreSQL
- ISO 15118 a Plug & Charge: PKI, eMAID, obousměrný V2G
- Škálovatelná architektura: od 10 do 100 000 nabíjecích bodů s clusteringem WebSocket a Kafka
- Monitorování, řídicí panel Grafana a klíčové provozní metriky
- Předpisy AFIR, PNIRE a italské pobídky pro infrastrukturu nabíjení
Řada EnergyTech: 10 článků o digitální energii
Tento článek je prvním ze série věnovanéEnergyTech: protokoly, architektury a software, které revolučně mění řízení elektřiny, od nabíjení elektromobilů po chytré sítě, od systémů BESS po energetickou optimalizaci pomocí AI.
| # | Položka | Technologie | Úroveň |
|---|---|---|---|
| 1 | OCPP 2.x Protocol: Building EV Charging Systems (jste zde) | OCPP, WebSocket, Python, ISO 15118 | Moderní |
| 2 | Smart Grid a OpenADR: Reakce na poptávku a energetická flexibilita | OpenADR, IEEE 2030.5, REST, MQTT | Moderní |
| 3 | BESS (Battery Energy Storage): Optimalizační algoritmy a BMS | Python, optimalizace LP, sběrnice CAN, Modbus | Moderní |
| 4 | Digitální dvojče pro elektrické sítě s Kafkou a strojovým učením | Kafka, InfluxDB, Grafana, ML, Python | Moderní |
| 5 | SCADA a ICS pro kritické infrastruktury: Bezpečnost a protokoly | Modbus, DNP3, IEC 61850, OPC UA | Moderní |
| 6 | Optimalizace energie s umělou inteligencí: Předpovídání spotřeby a předpovídání poptávky | TensorFlow, LSTM, Prophet, FastAPI | Moderní |
| 7 | Virtuální elektrárna: Agregátní DER s Pythonem a REST API | DER, DERMS, REST, Python, PostgreSQL | Střední |
| 8 | Energetické trhy a algoritmické obchodování: EPEX SPOT a API | Python, API obchodování, časové řady | Moderní |
| 9 | Carbon Accounting Software: Rozsah 1, 2, 3 a hlášení skleníkových plynů | Python, protokol GHG, API, reporting | Střední |
| 10 | Microgrids a Energy Island: Resilient Architectures | Microgrids, EMS, edge computing, IoT | Moderní |
Vývoj protokolu: Od OCPP 1.2 k 2.1
Pochopení vývoje OCPP je zásadní pro ocenění architektonických možností verze 2.0.1 a plánovat migrace ze starších systémů. Protokol byl na světě v roce 2010 k vyřešení problémuinteroperabilita: každý výrobce stanic mělo svůj vlastní proprietární protokol, což znemožňovalo správu multi-prodejce.
| Verze | Rok | Doprava | Klíčové vlastnosti | Nasazení |
|---|---|---|---|---|
| OCPP 1.2 | 2010 | SOAP/XML | První veřejná verze, základní operace: boot, autorizace, start/stop | Vyřazeno z provozu |
| OCPP 1.5 | 2012 | SOAP/XML | Rezervace, chytrá nabíjecí základna, přenos dat, reset | Dědictví |
| OCPP 1.6 | 2015 | SOAP + JSON/WS | WebSocket, načíst profily, spouštěcí zprávy, seznam místních autorizací | Velmi rozšířený |
| OCPP 2.0 | 2018 | JSON/WS | Model zařízení, funkční bloky, báze ISO 15118 (nahrazeno 2.0.1) | Vzácný |
| OCPP 2.0.1 | 2020 | Pouze JSON/WS | 16 funkčních bloků, Model zařízení, 3 bezpečnostní profily, pokročilé chytré nabíjení | Současný standard |
| OCPP 2.1 | 2025 | Pouze JSON/WS | Zpětně kompatibilní 2.0.1, V2G ISO 15118-20, nativní nabíjení, výměna baterie | Vznikající |
Základní rozdíly mezi OCPP 1.6 a 2.0.1
OCPP 2.0.1 není jednoduchá přírůstková aktualizace: je to a přepisování kompletní architektonický která mění terminologii, strukturu zpráv a koncepční model. Tato nekompatibilita "by design" byla nezbytná pro překonat strukturální limity OCPP 1.6.
| čekám | OCPP 1.6 | OCPP 2.0.1 |
|---|---|---|
| Serverová terminologie | Centrální systém | CSMS (systém správy nabíjecí stanice) |
| Terminologie klienta | Nabíjecí bod | Nabíjecí stanice |
| Nabíjecí jednotka | Konektor | EVSE (zásobovací zařízení elektrického vozidla) |
| Transakce | StartTransaction / StopTransaction | Unified TransactionEvent (zahájená/aktualizovaná/ukončená) |
| Konfigurace | Pevné klíče (ChangeConfiguration) | Hierarchický model zařízení (GetVariables/SetVariables) |
| Bezpečnost | Volitelné, ne standardizované | 3 integrované a povinné bezpečnostní profily |
| Chytré nabíjení | Základna (profily konektorů) | Pokročilé: pro EVSE, priorita zásobníku, složené plány |
| ISO 15118 | Není podporováno | Nativní blok M (Plug & Charge) |
| Specifická organizace | Plochý seznam operací | 16 funkčních bloků s případy použití, požadavky, schématy |
OCPP 2.1: Co je nového v roce 2025
OCPP 2.1, vydaný v lednu 2025 Open Charge Alliance, zůstává plný zpětně kompatibilní s 2.0.1 a přidává důležité funkce pro budoucnost:
- Pokročilé V2G (Vehicle-to-Grid).: Plná podpora pro normu ISO 15118-20 s obousměrným přenosem energie, umožňující z elektromobilů vytvořit virtuální elektrárny
- integrace DER: Pokročilé nástroje pro optimalizaci distribuované energie pomocí zdrojů, jako jsou fotovoltaické panely a akumulační systémy
- Nativní ceny: standardizované datové struktury pro komunikaci sazeb v reálném čase (kWh, čas, poplatky za parkování) bez rozšíření specifických pro dodavatele
- Výměna baterie: podpora stanic pro výměnu baterií pro dvoukolová a tříkolová vozidla
- Obnovit transakci: možnost obnovit transakci po vynuceném restartu bez ztráty dat
- Místní náklady: kalkulace nákladů přímo na stanici pro offline případy
Komunikační architektura WebSocket
Používá výhradně OCPP 2.0.1 JSON přes WebSocket jako protokol transport, zcela opouští SOAP/XML. Tato architektonická volba přináší Trvalá obousměrná komunikace, nízká latence, nízké užitečné zatížení a kompatibilita nativní s moderními webovými infrastrukturami.
Topologie klient-server
V modelu OCPP je Nabíjecí stanice působí jako klienta WebSockets e il CSMS jako server WebSocket. Nabíjecí stanice zahájí připojení a udržuje jej aktivní pomocí mechanismu srdečního tepu. CSMS může posílat příkazy stanici na stejném spojení Otevřete WebSocket, není potřeba zpětné připojení (žádné dotazování, žádné tlačení oddělené).
Charging Station CSMS
| |
|--- WebSocket CONNECT ---------------->|
| wss://csms.example.com/ocpp/CS001 |
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| Authorization: Basic base64(...) |
| |
|<-- HTTP 101 Switching Protocols -------|
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| |
|--- BootNotification.req ------------->|
|<-- BootNotification.conf --------------|
| (interval: 300, status: Accepted) |
| |
|--- StatusNotification.req[EVSE1] ---->|
|<-- StatusNotification.conf ------------|
| |
|--- Heartbeat.req (ogni 300s) -------->|
|<-- Heartbeat.conf --------------------|
| |
| <-- utente avvicina RFID --- |
|--- Authorize.req -------------------->|
|<-- Authorize.conf (Accepted) ----------|
| |
|--- TransactionEvent(Started) -------->|
|<-- TransactionEvent.conf -------------|
| |
|<-- SetChargingProfile.req ------------| (CSMS gestisce load)
|--- SetChargingProfile.conf ---------->|
| |
|--- MeterValues (ogni 60s) ----------->|
|<-- MeterValues.conf ------------------|
| |
|--- TransactionEvent(Ended) ---------->|
|<-- TransactionEvent.conf -------------|
Adresa URL připojení a podprotokol
Nabíjecí stanice se připojí k CSMS pomocí adresy URL, která obsahuje její vlastní
jedinečný identifikátor jako poslední úsek cesty.
Podprotokol WebSocket ocpp2.0.1 se vyjednává během podání ruky
HTTP pro zajištění kompatibility verzí protokolu.
# Formato URL
wss://csms.example.com/ocpp/{chargingStationId}
# Esempi reali
wss://csms.example.com/ocpp/IT-MIL-STATION-001
wss://csms.example.com/ocpp/EVSE-PARK-NORD-042
wss://csms.example.com/ocpp/CPO-AUTOGRILL-A7-01
# Headers WebSocket obbligatori
Sec-WebSocket-Protocol: ocpp2.0.1
Authorization: Basic {base64(stationId:password)} # Security Profile 1-2
# Con Security Profile 3 (mTLS): nessun header Authorization,
# l'autenticazione avviene tramite certificato client TLS
Struktura zpráv OCPP 2.0.1
OCPP 2.0.1 definuje tři typy zpráv JSON, všechny přenášené jako rámce Text WebSocket. Každá zpráva je a Pole JSON s formátem přesné na základě typu zprávy. Tato jednoduchá struktura usnadňuje analýzu a ladění versus režie SOAP/XML.
CALL (požadavek) - MessageTypeId 2
Zpráva VOLÁNÍ představuje žádost zaslanou jednou stranou (Nabíjecí stanice nebo CSMS) na jinou. Obsahuje jedinečné ID, název akce a užitečné zatížení požadavku.
// Formato: [MessageTypeId, MessageId, Action, Payload]
// Esempio: BootNotification dalla Charging Station
[2, "19223201", "BootNotification", {
"chargingStation": {
"model": "SuperCharger-500",
"vendorName": "EVPower Inc.",
"serialNumber": "SN-2025-00142",
"firmwareVersion": "3.2.1",
"modem": {
"iccid": "8939100000000000001",
"imsi": "310260000000001"
}
},
"reason": "PowerUp"
}]
// Esempio: TransactionEvent dalla Charging Station
[2, "tx-evt-001", "TransactionEvent", {
"eventType": "Started",
"timestamp": "2026-03-09T10:30:00Z",
"triggerReason": "CablePluggedIn",
"seqNo": 0,
"transactionInfo": {
"transactionId": "TXN-2026-0309-001",
"chargingState": "EVConnected"
},
"evse": { "id": 1, "connectorId": 1 },
"idToken": {
"idToken": "RFID-04A2B3C4D5",
"type": "ISO14443"
}
}]
CALLRESULT (odpověď) - MessageTypeId 3
Zpráva VÝSLEDEK VOLÁNÍ a kladná odpověď na CALL. The Aby to bylo možné, musí MessageId přesně odpovídat původnímu CALL korelace žádost-odpověď.
// Formato: [MessageTypeId, MessageId, Payload]
// Risposta a BootNotification
[3, "19223201", {
"currentTime": "2026-03-09T10:00:00Z",
"interval": 300,
"status": "Accepted"
}]
// Risposta a TransactionEvent (Started)
[3, "tx-evt-001", {
"totalCost": 0,
"chargingPriority": 0,
"idTokenInfo": {
"status": "Accepted",
"groupIdToken": {
"idToken": "GROUP-FLEET-01",
"type": "Central"
}
}
}]
CALLERROR (Chyba) - MessageTypeId 4
Zpráva VOLAJÍCÍ je odeslána, když příjemce ne může zpracovat CALL. Zahrnuje standardizovaný chybový kód, popis čitelné a strukturované detaily.
// Formato: [MessageTypeId, MessageId, ErrorCode, ErrorDescription, ErrorDetails]
[4, "19223201", "FormatViolation",
"Il campo 'vendorName' supera la lunghezza massima di 50 caratteri",
{
"field": "chargingStation.vendorName",
"maxLength": 50,
"actualLength": 67
}
]
// Codici di errore OCPP 2.0.1 standardizzati:
// FormatViolation - messaggio JSON malformato
// GenericError - errore generico non classificabile
// InternalError - errore interno del ricevente
// MessageTypeNotSupported - tipo di messaggio non supportato
// NotImplemented - azione riconosciuta ma non implementata
// NotSupported - azione non supportata dall'implementazione
// OccurrenceConstraintViolation - violazione cardinalita elementi
// PropertyConstraintViolation - vincolo su una proprietà violato
// ProtocolError - violazione del protocollo OCPP
// RpcFrameworkError - errore nel framework RPC di base
// SecurityError - errore di sicurezza o autenticazione
// TypeConstraintViolation - tipo di dato non corretto
Korelace požadavku a odpovědi: kritická pravidla
Každý CALL musí mít a Jedinečné MessageId (maximálně 36 znaků alfanumerický), který nebyl dříve použit na stejném připojení stejný odesílatel. Musí jej použít CALLRESULT nebo CALLERROR stejné MessageId. Odesílatel musí udržovat časový limit (doporučeno: 30 sekund), po jehož uplynutí žádost je považována za neúspěšnou. V jednu chvíli může být čekající pouze jeden CALL každý směr komunikace: stanice nemůže odeslat druhé CALL, dokud na první nedostal žádnou odpověď.
16 funkčních bloků OCPP 2.0.1
OCPP 2.0.1 organizuje všechny funkce do 16 funkčních bloků (A až P), z nichž každá obsahuje konkrétní případy použití s podrobnými požadavky, předběžné podmínky a sekvenční diagramy. Tato modulární organizace umožňuje implementátoři, aby deklarovali, které bloky podporují, a testery k ověření soulad blok po bloku.
| Blok | Jméno | Top zprávy | Povinný |
|---|---|---|---|
| A | Zabezpečení | SecurityEventNotification, SignCertificate, CertificateSigned | Si |
| B | Poskytování rezerv | BootNotification, SetVariables, GetVariables, NotifyReport | Si |
| C | Povolení | Autorizovat, ClearCache, GetLocalListVersion | Si |
| D | Místní seznam oprávnění | SendLocalList, GetLocalListVersion | No |
| E | Transakce | TransactionEvent, GetTransactionStatus, MeterValues | Si |
| F | Dálkové ovládání | RequestStartTransaction, RequestStopTransaction, UnlockConnector | No |
| G | Dostupnost | StatusNotification, ChangeAvailability, Heartbeat | Si |
| H | Rezervace | Rezervujte nyní, zrušte rezervaci | No |
| I | Tarif a náklady | CostUpdated, ShowMessage | No |
| J | Měření | MeterValues (energie / výkon / měření proudu) | Si |
| K | Chytré nabíjení | SetChargingProfile, ClearChargingProfile, GetChargingProfiles, ReportChargingProfiles | No |
| L | Správa firmwaru | UpdateFirmware, FirmwareStatusNotification | No |
| M | Certifikát ISO 15118 Mgmt | Get15118EVCertificate, DeleteCertificate, CertificateSigned | No |
| N | Diagnostika | GetLog, LogStatusNotification, SetMonitoringBase, SetVariableMonitoring | No |
| O | Zobrazit zprávu | SetDisplayMessage, GetDisplayMessages, ClearDisplayMessage | No |
| P | Přenos dat | DataTransfer (rozšíření specifická pro dodavatele) | No |
Model zařízení: Srdce OCPP 2.0.1
Il Model zařízení a hlavní architektonická inovace OCPP 2.0.1. Nahrazuje systém pevného konfiguračního klíče OCPP 1.6 (ChangeConfiguration s klíči) s a flexibilní hierarchický model na základě Komponenty a proměnné. Každá stanice kompletně popisuje její strukturu a konfiguraci bez ohledu na dodavatele.
ChargingStation (radice)
|
+-- Controller (computer della stazione)
| +-- Variables: Vendor, Model, FirmwareVersion, SerialNumber
|
+-- EVSE[1] (punto di ricarica 1)
| +-- Variables: AvailabilityState, Power, SupplyPhases
| +-- Connector[1] (connettore CCS2 / DC)
| | +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
| +-- Connector[2] (connettore CHAdeMO)
| +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
|
+-- EVSE[2] (punto di ricarica 2)
| +-- Connector[1] (connettore Type2 AC / 22kW)
| +-- Variables: ConnectorType, Phases, MaxCurrent
|
+-- PowerMeter (contatore principale)
| +-- Variables: Energy.Active.Import.Register, Power.Active.Import
|
+-- NetworkInterface (ETH0/LTE)
| +-- Variables: Type, SSID, SignalStrength, ActiveNetworkProfile
|
+-- SecurityCtrlr
| +-- Variables: SecurityProfile, CertificateEntries
|
+-- SmartChargingCtrlr
+-- Variables: ChargingProfileMaxStackLevel, ChargeProfileKindsSupported
// CALL dal CSMS: legge stato EVSE e tipo connettore
[2, "get-var-001", "GetVariables", {
"getVariableData": [
{
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeType": "Actual"
},
{
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeType": "Actual"
},
{
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeType": "Actual"
}
]
}]
// CALLRESULT dalla Charging Station
[3, "get-var-001", {
"getVariableResult": [
{
"attributeStatus": "Accepted",
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeValue": "Available"
},
{
"attributeStatus": "Accepted",
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeValue": "cCCS2"
},
{
"attributeStatus": "Accepted",
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeValue": "5"
}
]
}]
Bezpečnostní profily OCPP 2.0.1
OCPP 2.0.1 zavádí tři Progresivní bezpečnostní profily které definují úroveň ochrany komunikace mezi Nabíjecí stanicí a CSMS. Profil musí být zvolen během nasazení a automaticky konfiguruje mechanismus ověřování a šifrování.
| Charakteristický | Profil 1 | Profil 2 | Profil 3 |
|---|---|---|---|
| WebSocket URL | ws:// (bez TLS) | wss:// (TLS) | wss:// (TLS) |
| Šifrování | Žádný | TLS 1.2+ | TLS 1.2+ |
| Auth Station | Heslo (základní ověření) | Heslo (základní ověření) | klientský certifikát X.509 |
| Ověření CSMS | Žádný | Certifikát serveru TLS | Certifikát serveru TLS |
| MitM ochrana | No | Částečné (pouze ověření CSMS) | Úplné (vzájemné TLS) |
| Správa certifikátů | Není nutné | Pouze root CA na zařízení | Úplné PKI: CA + klientský certifikát |
| Doporučené použití | Pouze testovací prostředí | Standardní výroba | Kritická výroba, P&C ISO 15118 |
Správa certifikátů ve výrobě (bezpečnostní profil 3)
S bezpečnostním profilem 3 se stává správa životního cyklu certifikátu kritická operace. OCPP 2.0.1 obsahuje vyhrazené zprávy: SignCertificate (stanice vyžaduje podpis CSR), CertifikátPodepsán (CSMS nainstaluje podepsaný certifikát), Smazat certifikát (odstraní zastaralý certifikát), GetInstalledCertificateIds (seznam nainstalovaných certifikátů). A jeden je zásadní Robustní PKI s automatickou obnovou minimálně 30 dní před vypršením platnosti, průběžné sledování platnosti a Mechanismus odvolání CRL/OCSP.
Smart Charging and Load Management
Lo Chytré nabíjení (blok K) a nejdůležitější funkce pro provozovatelé s velkými instalacemi. Umožňuje CSMS dynamicky ovládat výkon dodávaný každým EVSE na základě omezení sítě, energetických tarifů, priorita uživatele a kapacita transformátoru.
Hierarchie profilů poplatků
OCPP 2.0.1 definuje čtyři typy nabíjecích profilů s úrovněmi zásobníku (prioritami):
| Typ profilu | Rozsah | Aplikace | Přepsat |
|---|---|---|---|
| ChargingStationMaxProfile | Absolutní maximální limit celé stanice | Ochrana transformátoru, smlouva o dodávce | Nelze přepsat |
| ChargingStationExternalConstraints | Limity z externích systémů (DSO, agregátory) | Reakce na poptávku, vyvažování sítě | Pouze z vyššího profilu |
| TxDefaultProfile | Výchozí profil pro transakce | Tarifní politika, základní plánování, solární | Z konkrétního TxProfile |
| TxProfile | Specifický profil pro transakci | Priority uživatele, individuální preference | Maximální spolehlivost |
SetChargingProfile: Špičkové holení a solární integrace
// Strategia: integra produzione solare + peak shaving ore serali
// Scenario: sito con 50kW fotovoltaico, trasformatore 100A, picco 19-21h
// 1. Limite massimo stazione (rispetta contratto di fornitura)
[2, "smart-max-001", "SetChargingProfile", {
"evseId": 0,
"chargingProfile": {
"id": 1,
"stackLevel": 1,
"chargingProfilePurpose": "ChargingStationMaxProfile",
"chargingProfileKind": "Absolute",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 100.0 }
]
}]
}
}]
// 2. Profilo solare + peak shaving per un singolo EVSE
[2, "smart-solar-001", "SetChargingProfile", {
"evseId": 1,
"chargingProfile": {
"id": 100,
"stackLevel": 0,
"chargingProfilePurpose": "TxDefaultProfile",
"chargingProfileKind": "Absolute",
"validFrom": "2026-03-09T00:00:00Z",
"validTo": "2026-03-10T00:00:00Z",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"startSchedule": "2026-03-09T06:00:00Z",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 7200, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 14400, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 43200, "limit": 16.0, "numberPhases": 3 },
{ "startPeriod": 46800, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 54000, "limit": 24.0, "numberPhases": 3 }
]
}]
}
}]
// Orario potenza: 06-08h: 8A (offpeak, bassa produzione solare)
// 08-12h: 32A (piena produzione FV, massima potenza)
// 12-18h: 32A (picco solare, alta produzione)
// 18-19h: 16A (calo solare, riduzione)
// 19-21h: 8A (picco domanda residenziale, min potenza)
// 21-24h: 24A (fine picco, potenza media)
Algoritmus dynamického vyvažování zátěže
Algoritmus správy zátěže rozděluje dostupný výkon mezi relace aktivní v reálném čase, respektující limit a priority transformátoru. Nejběžnějším přístupem je Vážený spravedlivý podíl s omezením min/max.
interface ChargingSession {
readonly stationId: string;
readonly evseId: number;
readonly transactionId: string;
readonly priority: number; // 0-9 (9 = massima)
readonly minChargingRate: number; // A minimi per caricare
readonly maxChargingRate: number; // A massimi del connettore
readonly currentChargingRate: number;
readonly energyDelivered: number; // Wh totali erogati
readonly targetEnergy?: number; // Wh target (se specificato dall'utente)
readonly isEV3Phase: boolean; // Veicolo trifase
}
interface LoadBalancerConfig {
readonly maxSitePowerAmps: number; // A max del trasformatore
readonly reservedBuildingAmps: number; // A riservati per l'edificio
readonly minSessionAmps: number; // A minimi per sessione (tipico: 6A)
readonly rebalanceIntervalSec: number; // Secondi tra ricalcoli (tipico: 30s)
}
interface Allocation {
readonly stationId: string;
readonly evseId: number;
readonly allocatedAmps: number;
readonly phases: number;
}
function calculateChargingAllocations(
sessions: ReadonlyArray<ChargingSession>,
config: LoadBalancerConfig
): ReadonlyArray<Allocation> {
if (sessions.length === 0) return [];
const availableAmps = config.maxSitePowerAmps
- config.reservedBuildingAmps;
// Step 1: ordina per priorità (desc), poi energia erogata (asc = meno carico prima)
const sorted = [...sessions].sort((a, b) => {
if (b.priority !== a.priority) return b.priority - a.priority;
return a.energyDelivered - b.energyDelivered;
});
// Step 2: garantisci potenza minima a tutti
const minRequired = sorted.length * config.minSessionAmps;
if (minRequired > availableAmps) {
// Caso critico: potenza insufficiente, sospendi sessioni a bassa priorità
return sorted
.slice(0, Math.floor(availableAmps / config.minSessionAmps))
.map((s) => ({
stationId: s.stationId,
evseId: s.evseId,
allocatedAmps: config.minSessionAmps,
phases: s.isEV3Phase ? 3 : 1,
}));
}
// Step 3: distribuzione proporzionale ai pesi di priorità
const totalWeight = sorted.reduce(
(sum, s) => sum + (1 + s.priority), 0
);
const remainingAmps = availableAmps - minRequired;
const allocations = sorted.map((session) => {
const weight = (1 + session.priority) / totalWeight;
const bonus = remainingAmps * weight;
const raw = config.minSessionAmps + bonus;
// Applica vincoli min/max del connettore
const allocatedAmps = Math.max(
config.minSessionAmps,
Math.min(session.maxChargingRate, Math.round(raw * 10) / 10)
);
return {
stationId: session.stationId,
evseId: session.evseId,
allocatedAmps,
phases: session.isEV3Phase ? 3 : 1,
};
});
// Step 4: verifica finale che il totale non superi il limite
const total = allocations.reduce((s, a) => s + a.allocatedAmps, 0);
if (total <= availableAmps) return allocations;
// Riscaling proporzionale
const scale = availableAmps / total;
return allocations.map((a) => ({
...a,
allocatedAmps: Math.max(
config.minSessionAmps,
Math.round(a.allocatedAmps * scale * 10) / 10
),
}));
}
Implementace CSMS Backend s Pythonem a PostgreSQL
Knihovna Python ocpp od MobilityHouse (open source, 2000+ hvězdiček na GitHubu)
a nejoblíbenější referenční implementace pro CSMS. Spojujeme knihovnu s
asyncio, websockets e asyncpg pro PostgreSQL pro
vybudovat backend připravený k produkci.
PostgreSQL schéma pro CSMS
-- Registro stazioni di ricarica
CREATE TABLE charging_stations (
station_id TEXT PRIMARY KEY,
vendor_name TEXT NOT NULL,
model TEXT NOT NULL,
serial_number TEXT,
firmware_version TEXT,
security_profile SMALLINT NOT NULL DEFAULT 1,
last_boot_reason TEXT,
last_seen_at TIMESTAMPTZ,
is_online BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stato EVSE e connettori
CREATE TABLE evse_status (
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT NOT NULL,
connector_type TEXT, -- cCCS2, cCHAdeMO, cType2, sType3
status TEXT NOT NULL DEFAULT 'Unknown',
error_code TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (station_id, evse_id, connector_id)
);
-- Transazioni di ricarica
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT,
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'Started',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
meter_start_wh NUMERIC(12, 3),
meter_end_wh NUMERIC(12, 3),
energy_wh NUMERIC(12, 3) GENERATED ALWAYS AS (meter_end_wh - meter_start_wh) STORED,
stop_reason TEXT,
total_cost_cents INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Meter values (time series - usare TimescaleDB in produzione)
CREATE TABLE meter_values (
id BIGSERIAL PRIMARY KEY,
transaction_id TEXT REFERENCES transactions(transaction_id),
station_id TEXT NOT NULL,
evse_id SMALLINT NOT NULL,
sampled_at TIMESTAMPTZ NOT NULL,
energy_wh NUMERIC(12, 3),
power_w NUMERIC(10, 2),
current_a NUMERIC(8, 3),
voltage_v NUMERIC(8, 2),
soc_pct SMALLINT -- State of Charge da ISO 15118
);
CREATE INDEX idx_meter_values_station_time
ON meter_values(station_id, sampled_at DESC);
CREATE INDEX idx_transactions_station_id
ON transactions(station_id, started_at DESC);
-- Token di autorizzazione (lista locale cache)
CREATE TABLE authorization_cache (
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
status TEXT NOT NULL, -- Accepted, Invalid, Blocked, Expired
group_id TEXT,
expiry_date TIMESTAMPTZ,
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id_token, id_token_type)
);
CSMS Full Python Backend
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional, Any
import asyncpg
import websockets
from ocpp.routing import on
from ocpp.v201 import ChargePoint as Cp
from ocpp.v201 import call, call_result
from ocpp.v201.enums import (
Action, RegistrationStatusType, AuthorizationStatusType,
ConnectorStatusType
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
log = logging.getLogger('csms')
# Pool globale connessioni PostgreSQL
_db_pool: Optional[asyncpg.Pool] = None
async def get_db() -> asyncpg.Pool:
global _db_pool
if _db_pool is None:
_db_pool = await asyncpg.create_pool(
dsn='postgresql://csms:password@localhost/csms_db',
min_size=5,
max_size=20,
)
return _db_pool
class ChargePointHandler(Cp):
"""
Handler OCPP 2.0.1 per una singola Charging Station.
Un'istanza per ogni connessione WebSocket attiva.
"""
@on(Action.boot_notification)
async def on_boot_notification(
self, charging_station: dict, reason: str, **kwargs
) -> call_result.BootNotification:
log.info(
f"Boot: {self.id} | "
f"{charging_station['vendor_name']} {charging_station['model']} "
f"| reason={reason}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO charging_stations
(station_id, vendor_name, model, serial_number,
firmware_version, last_boot_reason, last_seen_at, is_online)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), TRUE)
ON CONFLICT (station_id) DO UPDATE SET
vendor_name = EXCLUDED.vendor_name,
model = EXCLUDED.model,
firmware_version = EXCLUDED.firmware_version,
last_boot_reason = EXCLUDED.last_boot_reason,
last_seen_at = NOW(),
is_online = TRUE
""",
self.id,
charging_station['vendor_name'],
charging_station['model'],
charging_station.get('serial_number'),
charging_station.get('firmware_version'),
reason,
)
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=300,
status=RegistrationStatusType.accepted,
)
@on(Action.heartbeat)
async def on_heartbeat(self) -> call_result.Heartbeat:
db = await get_db()
await db.execute(
"UPDATE charging_stations SET last_seen_at = NOW() WHERE station_id = $1",
self.id
)
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
@on(Action.status_notification)
async def on_status_notification(
self, timestamp: str, connector_status: str,
evse_id: int, connector_id: int, **kwargs
) -> call_result.StatusNotification:
log.info(
f"Status: {self.id} EVSE[{evse_id}]"
f"Connector[{connector_id}] = {connector_status}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO evse_status
(station_id, evse_id, connector_id, status, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (station_id, evse_id, connector_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = NOW()
""",
self.id, evse_id, connector_id, connector_status
)
return call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(
self, id_token: dict, **kwargs
) -> call_result.Authorize:
token = id_token['id_token']
token_type = id_token['type']
log.info(f"Authorize: {self.id} token={token} type={token_type}")
db = await get_db()
# Controlla prima la cache locale
row = await db.fetchrow(
"""
SELECT status, group_id, expiry_date
FROM authorization_cache
WHERE id_token = $1 AND id_token_type = $2
AND (expiry_date IS NULL OR expiry_date > NOW())
""",
token, token_type
)
status = AuthorizationStatusType.invalid
if row and row['status'] == 'Accepted':
status = AuthorizationStatusType.accepted
return call_result.Authorize(
id_token_info={'status': status}
)
@on(Action.transaction_event)
async def on_transaction_event(
self,
event_type: str,
timestamp: str,
trigger_reason: str,
seq_no: int,
transaction_info: dict,
evse: Optional[dict] = None,
id_token: Optional[dict] = None,
meter_value: Optional[list] = None,
**kwargs
) -> call_result.TransactionEvent:
tx_id = transaction_info['transaction_id']
log.info(
f"TransactionEvent: {self.id} {event_type} "
f"tx={tx_id} trigger={trigger_reason}"
)
db = await get_db()
if event_type == 'Started':
await self._handle_tx_started(
db, tx_id, evse, id_token, timestamp, meter_value
)
elif event_type == 'Updated' and meter_value:
await self._handle_tx_updated(db, tx_id, evse, meter_value)
elif event_type == 'Ended':
await self._handle_tx_ended(
db, tx_id, timestamp, transaction_info, meter_value
)
return call_result.TransactionEvent(
total_cost=0,
charging_priority=0,
id_token_info={'status': AuthorizationStatusType.accepted},
)
async def _handle_tx_started(
self, db, tx_id, evse, id_token, timestamp, meter_value
):
evse_id = evse['id'] if evse else 0
connector_id = evse.get('connector_id') if evse else None
token = id_token['id_token'] if id_token else 'unknown'
token_type = id_token['type'] if id_token else 'Local'
meter_start = self._extract_energy(meter_value)
await db.execute(
"""
INSERT INTO transactions
(transaction_id, station_id, evse_id, connector_id,
id_token, id_token_type, state, started_at, meter_start_wh)
VALUES ($1, $2, $3, $4, $5, $6, 'Started', $7, $8)
ON CONFLICT (transaction_id) DO NOTHING
""",
tx_id, self.id, evse_id, connector_id,
token, token_type, timestamp, meter_start
)
async def _handle_tx_updated(self, db, tx_id, evse, meter_value):
evse_id = evse['id'] if evse else 0
energy = self._extract_energy(meter_value)
power = self._extract_power(meter_value)
if energy is not None:
await db.execute(
"""
INSERT INTO meter_values
(transaction_id, station_id, evse_id, sampled_at, energy_wh, power_w)
VALUES ($1, $2, $3, NOW(), $4, $5)
""",
tx_id, self.id, evse_id, energy, power
)
async def _handle_tx_ended(
self, db, tx_id, timestamp, transaction_info, meter_value
):
meter_end = self._extract_energy(meter_value)
stop_reason = transaction_info.get('stopped_reason')
await db.execute(
"""
UPDATE transactions SET
state = 'Ended',
ended_at = $1,
meter_end_wh = $2,
stop_reason = $3
WHERE transaction_id = $4
""",
timestamp, meter_end, stop_reason, tx_id
)
def _extract_energy(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Energy.Active.Import.Register':
return float(sv['value'])
return None
def _extract_power(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Power.Active.Import':
return float(sv['value'])
return None
# === Comandi CSMS -> Stazione ===
async def send_remote_start(
self, evse_id: int, id_token: str, limit_amps: float = 32.0
) -> str:
"""Avvia una sessione da remoto su EVSE specificato."""
request = call.RequestStartTransaction(
id_token={'id_token': id_token, 'type': 'Central'},
evse_id=evse_id,
charging_profile={
'id': 999,
'stack_level': 0,
'charging_profile_purpose': 'TxProfile',
'charging_profile_kind': 'Relative',
'charging_schedule': [{
'id': 1,
'charging_rate_unit': 'A',
'charging_schedule_period': [
{'start_period': 0, 'limit': limit_amps}
],
}],
},
)
response = await self.call(request)
log.info(f"RemoteStart {self.id} EVSE{evse_id}: {response.status}")
return response.status
async def send_charging_profile(
self, evse_id: int, profile: dict
) -> str:
"""Imposta un profilo di carica per smart charging."""
request = call.SetChargingProfile(
evse_id=evse_id,
charging_profile=profile
)
response = await self.call(request)
log.info(f"ChargingProfile {self.id} EVSE{evse_id}: {response.status}")
return response.status
# Registry globale delle connessioni attive
_connected_stations: dict[str, ChargePointHandler] = {}
async def on_connect(websocket, path: str):
"""Callback per nuove connessioni WebSocket OCPP."""
station_id = path.strip('/').split('/')[-1]
if not station_id:
await websocket.close(1008, 'Missing station ID')
return
log.info(f"Connessione da: {station_id} | path={path}")
cp = ChargePointHandler(station_id, websocket)
_connected_stations[station_id] = cp
try:
await cp.start()
except websockets.exceptions.ConnectionClosed as e:
log.info(f"Disconnesso: {station_id} code={e.code}")
except Exception as e:
log.error(f"Errore: {station_id} - {e}")
finally:
_connected_stations.pop(station_id, None)
db = await get_db()
await db.execute(
"UPDATE charging_stations SET is_online = FALSE WHERE station_id = $1",
station_id
)
async def main():
await get_db() # Inizializza pool DB
log.info("CSMS OCPP 2.0.1 avviato")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp2.0.1'],
# TLS: aggiungere ssl=ssl_context per Security Profile 2-3
ping_interval=60,
ping_timeout=30,
max_size=1_048_576, # 1MB max message size
)
log.info("In ascolto su ws://0.0.0.0:9000/ocpp/{stationId}")
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
ISO 15118 a Plug & Charge
ISO 15118 definuje komunikaci na vysoké úrovni mezi vozidly elektrické (EV) a nabíjecí stanice (EVSE) prostřednictvím Power Line Communication (PLC) na DC nabíjecím kabelu (CCS). OCPP 2.0.1 nativně integruje ISO 15118 prostřednictvím funkčního bloku M, umožňujícího Plug & Charge: vozidlo se automaticky ověřuje pomocí digitálního certifikátu X.509, bez RFID nebo mobilní aplikace.
Architektura PKI V2G (Vehicle-to-Grid).
Certifikační systém pro Plug & Charge je založen na PKI (Public Key Infrastruktura) specifická hierarchie pro elektrickou mobilitu:
V2G Root CA (Root of Trust - gestita da OEM o eMSP)
|
+-- V2G Intermediate CA
| |
| +-- EVSE Certificate (installato nella stazione)
| CN = EVSE-IT-MIL-001
|
+-- eMobility Service Provider CA (eMSP)
|
+-- Contract Certificate (installato nel veicolo)
CN = IT.CPO.000001234 (eMAID - e-Mobility Account Identifier)
SubjectAltName = eMAID:IT.CPO.000001234
Kompletní tok Plug & Charge
EV EVSE CSMS eMSP
| | | |
|-- Plug cavo DC --> | | |
| | | |
|<= ISO 15118-2 TLS =>| (PLC sul cavo SLAC) | |
| | | |
|-- ContractCert --->| | |
| (eMAID, X.509) | | |
| |--- Authorize req ----->| |
| | idToken.type=eMAID | |
| | |--- OCPI check --->|
| | |<-- Contract OK --|
| | | |
| |<-- Authorize.conf ----| |
| | status: Accepted | |
| | | |
|<= Charging Start ==>| | |
| | | |
| EV invia target | | |
|-- EnergyRequest --->| | |
| SoC: 45% | | |
| Target: 80% | | |
| Departure: 18:30 | | |
| |--- TransactionEvent -->| |
| | ISO15118Trigger | |
| | | |
| |<-- SetChargingProfile--| |
|<= Schedule via PLC =>| | |
Stav podle ISO 15118 ve výrobě (2026)
- ISO 15118-2: Plug & Charge AC/DC – široce podporované nabíječkami HPC DC (Ionity, Fastned, Tesla Supercharger V3)
- ISO 15118-20: Obousměrná podpora V2G – podpora hardwaru připravena, software bude spuštěn v letech 2025–2026
- Požadavek AFIR: Všechny nové stanice s podporou V2G musí od roku 2026 podporovat ISO 15118
- Soulad s AFIR 2027: Každá nabíječka nainstalovaná po 1. 1. 2027 musí být připravena na chytré nabíjení
- Skutečné V2G v Itálii: první piloti s Enel X Way a Nissan Leaf na standardu V2H (Vehicle-to-Home)
Škálovatelná architektura: od 10 do 100 000 nabíjecích bodů
Podnikový CSMS musí spravovat od několika desítek až po stovky tisíc připojení Konkurenční WebSockets. Architektura se vyvíjí ve fázích s dalšími komponenty, které přicházejí do hry v různých měřítcích.
Fáze 1: Malé měřítko (10–500 stanic)
+------------------+ WebSocket/OCPP +------------------+
| Charging |------------------------| CSMS Monolitico |
| Stations (10-500)| wss://csms:9000/ocpp | Python/asyncio |
+------------------+ | Port 9000 |
+--------+---------+
|
+-------+-------+
| PostgreSQL |
| Redis (cache) |
+---------------+
Stack: Python asyncio + PostgreSQL + Redis
Deployment: 1 VM (4 vCPU, 8GB RAM), 1 DB managed
Costo: ~$200/mese
Fáze 2: Střední měřítko (500–10 000 stanic)
+------------+ +------------------+ +--------------+
| Load | | WS Gateway #1 | | Message |
| Balancer +---->| (asyncio CSMS) +---->| Broker |
| (HAProxy) | | Max 2000 conn | | (RabbitMQ) |
| | +------------------+ | |
| Sticky +---->| WS Gateway #2 +---->| |
| Sessions | | (asyncio CSMS) | +--------------+
| | +------------------+ |
+------------+ +------+------+
| Business |
| Services |
| (FastAPI) |
+------+------+
|
+----------+----------+
| PostgreSQL (HA) |
| TimescaleDB |
| Redis Cluster |
+---------------------+
Sticky sessions: basate su station_id nel path URL
Cross-node ops: Redis pub/sub per inviare comandi alle stazioni
Costo: ~$2.000/mese (K8s managed)
Fáze 3: Velký rozsah (10 000–100 000 stanic)
Global Load Balancer (Anycast)
|
+---------------+---------------+
| |
Region EU-WEST Region EU-SOUTH
+------------------+ +------------------+
| WS Gateway Pool | | WS Gateway Pool |
| (50 pods, 2000 | | (30 pods) |
| conn each = 100K)| +--------+---------+
+--------+---------+ |
| |
+-------------+----------------+
|
+-------+--------+
| Apache Kafka |
| (12 partitions)|
| per topic |
+-------+--------+
|
+-----------------+------------------+
| | |
+------+------+ +-------+------+ +--------+------+
| Transaction | | Smart | | Device |
| Service | | Charging Svc | | Mgmt Svc |
| (10 replicas)| | (5 replicas) | | (3 replicas) |
+------+------+ +-------+------+ +--------+------+
| | |
+--------+--------+------------------+
|
+--------+--------+
| PostgreSQL |
| Citus (sharding) |
| Shard key: |
| station_id hash |
+--------+---------+
|
+--------+--------+
| TimescaleDB |
| (meter values) |
+--------+--------+
Kafka Topics:
- ocpp.boot-notification (chiave: station_id)
- ocpp.transaction-events (chiave: transaction_id)
- ocpp.meter-values (chiave: station_id)
- ocpp.status-notifications (chiave: station_id)
- csms.commands (chiave: station_id)
Throughput target: 1M messaggi/ora, latenza P99 < 200ms
Costo: ~$30.000/mese (multi-region Kubernetes)
Správa připojení mezi uzly s Redis
V nasazení s více uzly potřebuje CSMS vědět, na které bráně jsou jednotlivé brány stanice k odesílání příkazů (SetChargingProfile, RemoteStart atd.). Redis hospoda/sub řeší problém:
import json
import asyncio
import redis.asyncio as aioredis
redis_client = aioredis.from_url(
'redis://redis-cluster:6379',
encoding='utf-8',
decode_responses=True
)
# Registra il nodo della connessione
async def register_connection(station_id: str, gateway_id: str):
await redis_client.setex(
f"csms:gateway:{station_id}",
value=gateway_id,
time=600 # TTL: 10 minuti, rinnovato a ogni heartbeat
)
# Pubblica un comando verso una stazione (qualunque nodo sia)
async def publish_command(station_id: str, action: str, payload: dict):
channel = f"csms:commands:{station_id}"
await redis_client.publish(channel, json.dumps({
'action': action,
'payload': payload
}))
# Su ogni nodo gateway: ascolta i comandi per le stazioni connesse
async def listen_for_commands(connected_stations: dict):
pubsub = redis_client.pubsub()
# Sottoscrivi ai canali delle stazioni connesse a questo nodo
async def subscribe_station(station_id: str):
await pubsub.subscribe(f"csms:commands:{station_id}")
async for message in pubsub.listen():
if message['type'] != 'message':
continue
station_id = message['channel'].split(':')[-1]
cp = connected_stations.get(station_id)
if not cp:
continue # Stazione non su questo nodo, ignora
cmd = json.loads(message['data'])
try:
if cmd['action'] == 'SetChargingProfile':
await cp.send_charging_profile(
cmd['payload']['evse_id'],
cmd['payload']['profile']
)
elif cmd['action'] == 'RemoteStart':
await cp.send_remote_start(
cmd['payload']['evse_id'],
cmd['payload']['id_token'],
)
except Exception as e:
log.error(f"Errore esecuzione comando {cmd['action']}: {e}")
Monitoring, metriky a Grafana Dashboard
CSMS ve výrobě vyžaduje komplexní systém pozorovatelnosti. Metriky klíč ke sledování se týká zdraví infrastruktury, kvality servis a provozní výkon.
Klíčové provozní metriky
| Metrický | Vzorec/zdroj | Cílová SLA | Práh výstrahy |
|---|---|---|---|
| Dostupnost stanice | Online stanice / Celkový počet stanic x 100 | >= 99 % | < 95 % |
| Latence zpráv OCPP P99 | Doba hovoru -> VÝSLEDEK VOLÁNÍ (95. percentil) | < 2 s | > 5 s |
| Úspěšnost transakce | TX dokončeno / TX zahájeno x 100 | >= 98 % | < 95 % |
| Dodaná energie (kWh/hod) | Součet MeterValues prozatím | Základní +10 % | < výchozí -20 % |
| Míra odmítnutí ověření | Neplatné ověření / celkové ověření x 100 | < 2 % | > 10 % (možný útok) |
| Obnovení připojení WebSocket/hod | Počítat nová spojení na stanici | < 2/hod/stanici | > 10/hod./stanici |
| Shoda s inteligentním nabíjením | Skutečný výkon vs nastavený profil | +/- 5 % | Odchylka > 15 % |
| Certifikované dny platnosti | Počet dní do vypršení platnosti certifikátů TLS | > 30 dní | < 30 dní (upozornění na obnovení) |
Prometheus Exporter pro CSMS
from prometheus_client import (
Counter, Gauge, Histogram, start_http_server
)
# Metriche Prometheus
OCPP_MESSAGES_TOTAL = Counter(
'ocpp_messages_total',
'Numero totale messaggi OCPP processati',
['action', 'direction', 'status'] # direction: inbound/outbound
)
OCPP_MESSAGE_DURATION = Histogram(
'ocpp_message_duration_seconds',
'Latenza elaborazione messaggi OCPP',
['action'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
STATIONS_CONNECTED = Gauge(
'csms_stations_connected_total',
'Numero stazioni connesse al CSMS'
)
ACTIVE_TRANSACTIONS = Gauge(
'csms_active_transactions_total',
'Numero transazioni di ricarica attive'
)
ENERGY_DELIVERED_WH = Counter(
'csms_energy_delivered_wh_total',
'Energia totale erogata in Wh',
['station_id']
)
AUTH_RESULTS = Counter(
'csms_authorization_results_total',
'Risultati delle autorizzazioni OCPP',
['status'] # Accepted, Invalid, Blocked, Expired
)
SMART_CHARGING_EVENTS = Counter(
'csms_smart_charging_events_total',
'Operazioni smart charging',
['action', 'result']
)
def start_metrics_server(port: int = 8001):
"""Avvia il server HTTP Prometheus su porta specificata."""
start_http_server(port)
log.info(f"Prometheus metrics su http://0.0.0.0:{port}/metrics")
# Decorator per misurare latenza handler
import time
import functools
def track_ocpp_handler(action: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.monotonic()
try:
result = await func(*args, **kwargs)
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='success'
).inc()
return result
except Exception as e:
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='error'
).inc()
raise
finally:
OCPP_MESSAGE_DURATION.labels(action=action).observe(
time.monotonic() - start
)
return wrapper
return decorator
// Pannelli principali per dashboard Grafana CSMS
// 1. Stazioni online (Gauge)
{
"title": "Stazioni Connesse",
"type": "gauge",
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 99 }
]
}
}
},
"targets": [{
"expr": "csms_stations_connected_total / csms_stations_registered_total * 100",
"legendFormat": "Availability %"
}]
}
// 2. Latenza messaggi OCPP P99 (Time Series)
{
"title": "OCPP Message Latency P99",
"type": "timeseries",
"targets": [{
"expr": "histogram_quantile(0.99, rate(ocpp_message_duration_seconds_bucket[5m]))",
"legendFormat": "P99 - {{action}}"
}]
}
// 3. Energia erogata (Stat panel)
{
"title": "Energia Totale Erogata oggi (kWh)",
"type": "stat",
"targets": [{
"expr": "increase(csms_energy_delivered_wh_total[24h]) / 1000",
"legendFormat": "kWh"
}]
}
// 4. Auth rejection rate - alert su attacchi
{
"title": "Rejection Rate Autorizzazioni (%)",
"type": "timeseries",
"targets": [{
"expr": "rate(csms_authorization_results_total{status='Invalid'}[5m]) / rate(csms_authorization_results_total[5m]) * 100"
}]
}
Italské a evropské předpisy: AFIR, PNIRE, PNRR
Infrastruktura nabíjení elektromobilů v Itálii a Evropě je přísně regulována. Shoda s předpisy není volitelná: týká se obou technických požadavků stanice (legální metrologie, dostupnost, platba) a komunikační standardy.
AFIR (Nařízení o infrastruktuře pro alternativní paliva – EU 2023/1804)
AFIR vstoupila v platnost v dubnu 2024 a definuje přesný harmonogram povinnosti týkající se dobíjecí infrastruktury v sítích TEN-T a v městských oblastech:
| Vypršení platnosti | Požadavek | Použitelnost |
|---|---|---|
| 31. prosince 2025 | Stanice >= 150 kW každých 60 km v hlavní síti TEN-T | Celá EU |
| 31. prosince 2027 | Stanice >= 150 kW každých 60 km TEN-T Komplexní síť | Celá EU |
| 14. dubna 2025 | Zdarma statická a dynamická data (umístění, typ konektoru, dostupnost) | Veřejné stanice |
| 1. ledna 2027 | Chytré nabíjení připraveno pro nové/renovované stanice > 22 kW | Veřejné stanice |
| Bezprostřední | Ad-hoc platba bez předplatného (bezkontaktní bankovní karta) | Stanice > 50 kW veřejné |
| 2026+ | ISO 15118 pro stanice podporující V2G | Obousměrné stanice |
PNIRE a PNRR pro Itálii
V Itálii probíhá implementace AFIR prostřednictvím dvou hlavních nástrojů:
- PNIRE (Národní plán infrastruktury pro nabíjení elektřiny): řízená MASE (Ministerstvo životního prostředí a energetické bezpečnosti), definuje národní cíle: 13 755 veřejných stanic v roce 2025, zaměření na silniční síť a městské oblasti
- PNRR mise 2, investice 4.3: přes 700 milionů eur přidělené pro nabíjení vysokým výkonem (HPC >= 150 kW) na dálnicích a oblastech služby. Podle MEMORY vyčlenila PNRR celkem 12,7 miliard EUR, s stále částečné využití
- Volejte MASE 2024-2025: pobídky pro soukromé provozovatele, kteří instalovat infrastruktury v oblastech s nízkou hustotou nabíjení (jižní Itálie, venkovské oblasti). Dotace až 60 % nákladů na instalaci
Situace v Itálii: 73 000 nabíjecích míst v roce 2025
K 31. prosinci 2025 se Itálie počítá 73 000+ veřejných nabíjecích míst (+18 % ve srovnání s rokem 2024), s 93% celostátním územním pokrytím. z těchto: kolem 12 000 rychlonabíjecích stanic (> 22 kW) a 5 000 HPC (>= 150 kW). Regionem s největší infrastrukturou je Lombardie (23 %), následuje z Lazia (12 %) a Toskánska (9 %). Jih zůstává prioritní oblastí i Financování PNRR s cílem zacelit mezeru do roku 2026.
Technické povinnosti pro software CSMS
- OCPI (otevřené rozhraní nabíjecího bodu): povinný protokol pro roaming mezi CPO (operátor nabíjecího bodu) a eMSP (poskytovatel služeb elektronické mobility). Doporučuje se OCPI verze 2.2.1
- Legální metrologie: v Německu (Eichrecht) a postupně v EU (MID - Směrnice o měřicích přístrojích) musí být měřicí systémy certifikovány a odečty musí být transparentní a uživatelsky neměnné
- GDPR: údaje o nabíjecí relaci (RFID, místo, časy) jsou osobní údaje. Vyžaduje zásady ochrany osobních údajů, minimalizaci dat a právo být zapomenut
- CDR (podrobné záznamy o náboji): musí být v souladu s formátem OCPI pro interoperabilní fakturaci a musí být uchovávány po dobu nejméně 5 let (italské daňové povinnosti)
Případová studie: Italská dobíjecí síť s více než 50 stanicemi
Prozkoumejme architekturu skutečného systému CSMS pro italského operátora, který spravuje 50 DC nabíjecích stanic (22-150 kW) rozmístěných na městských parkovištích a nákupní centra ve 3 krajích.
Provozní požadavky
- 50 stanic, celkem 150 EVSE (průměrně 3 EVSE na stanici), 300 konektorů (CCS2 + Type2)
- Denní špička: 400–600 nabíjecích relací (7–9 hodin, 12–14 hodin, 17–21 hodin)
- Chytré nabíjení: splnění limitu 200 A na stanici, integrace s SEM (Site Energy Manager)
- Více nájemců: 3 CPO s částečnou viditelností stanice, OCPI roaming s Enelem
- Uptime SLA: 99,5 % měsíčně na stanici, 99,9 % pro backend CSMS
Vybraná architektura
+------------------+ +------------------+ +------------------+
| 50 Stazioni | | HAProxy | | CSMS Primary |
| OCPP 2.0.1 |---->| (WS sticky sess.)|---->| Python asyncio |
| Security Profile 2| | Port 443 (TLS) | | 2 replicas |
+------------------+ +------------------+ +--------+---------+
|
+------------------+ +--------+---------+
| CSMS Worker |<----| Redis Cluster |
| (FastAPI REST) | | (stato sessioni) |
| Dashboard, API | +------------------+
+------------------+
|
+-----------+----------+
| |
+-------+-------+ +---------+------+
| PostgreSQL 16 | | TimescaleDB |
| (transazioni, | | (meter values)|
| auth, device | | 2TB/anno est. |
| model, CDR) | +---------------+
+---------------+
Monitoring: Prometheus + Grafana Cloud
Alerting: PagerDuty (P1: stazione offline >5min, P2: CSMS latency >2s)
CDR/Billing: integrazione ERP via webhook PostgreSQL NOTIFY
Skutečné provozní metriky (typický měsíc)
| Metrický | Hodnota | Poznámky |
|---|---|---|
| Relace/měsíc | ~14 000 | Průměrně 280 sezení/den |
| Dodaná energie | ~85 000 kWh/měsíc | Průměrně 6 kWh/sezení |
| Průměrná dostupnost stanice | 99,3 % | 0,7 % prostoje = ~5 h/měsíc na stanici |
| Míra přijetí ověření | 96,8 % | 3,2 % odmítnuto = platnost vypršela nebo není zaregistrováno |
| Latence OCPP P95 | 180 ms | Zahrnuje Round-Trip LTE stanici |
| Smart Charging Events/den | ~1200 | Průměrně 24 rebalancí/hod |
| Špičková účinnost holení | 92 % | 92 % výkonu času <= nastavený limit |
Zabezpečení CSMS: OWASP a model hrozeb
CSMS je kritická infrastruktura: kompromis může způsobit narušení nabíjení, manipulace s účtováním nebo útoky na elektrickou síť prostřednictvím nabíjecích zátěží. Hlavní hrozby jsou identifikovatelné podle konkrétní model hrozby.
Model hrozeb CSMS
| Ohrožení | Vektor | Dopad | Zmírnění |
|---|---|---|---|
| Neoprávněná stanice | Spojení s odcizenými přihlašovacími údaji | Falešné vkládání dat, podvodná spotřeba | Bezpečnostní profil 3 (mTLS), certifikovaný pining |
| Man-in-the-Middle | Zachycování WS na nezabezpečených sítích | Zachycování RFID tokenů, manipulace s příkazy | TLS 1.3 povinná, certifikovaná transparentnost |
| Replay Attack | Opakovaný přenos zachycených zpráv OCPP | Dvojité účtování, neplatná oprávnění | Jedinečné MessageId, ověření časového razítka, nonce |
| DDoS WebSocket | Záplava spojení nebo zpráv | CSMS nedostupný, infrastruktura DoS | Omezení rychlosti, škrcení připojení, WAF |
| SQL Injection přes užitečné zatížení OCPP | Užitná zátěž OCPP s datovou zátěží SQL v poli idToken | Exfiltrace DB, eskalace oprávnění | Připravené výpisy, ORM, validace vstupů |
| Klonování RFID | Klonování legitimních RFID karet | Relace placené jinými uživateli | ISO 15118 P&C, RFID whitelist, detekce anomálií |
| Škodlivý firmware | Aktualizace firmwaru pomocí malwaru | Fyzické ovládání stanice, manipulace s mřížkou | Firmware digitální podpis, bezpečné spouštění, SBOM |
CSMS Hardening: Bezpečnostní kontrolní seznam
import ssl
import re
from functools import wraps
# 1. Configurazione TLS sicura (Security Profile 2-3)
def create_tls_context(
certfile: str,
keyfile: str,
cafile: str,
require_client_cert: bool = False
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if require_client_cert: # Security Profile 3 (mTLS)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=cafile)
# Disabilita cipher suite deboli
ctx.set_ciphers(
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:!aNULL:!eNULL:!LOW:!EXPORT'
)
return ctx
# 2. Rate limiting per connessioni WebSocket
from collections import defaultdict
import time
_connection_attempts: dict[str, list[float]] = defaultdict(list)
MAX_CONN_PER_MINUTE = 10
def check_rate_limit(client_ip: str) -> bool:
"""Ritorna True se il client può connettersi, False se throttled."""
now = time.monotonic()
window = _connection_attempts[client_ip]
# Rimuovi tentativi più vecchi di 60 secondi
_connection_attempts[client_ip] = [
t for t in window if now - t < 60
]
if len(_connection_attempts[client_ip]) >= MAX_CONN_PER_MINUTE:
log.warning(f"Rate limit superato per IP: {client_ip}")
return False
_connection_attempts[client_ip].append(now)
return True
# 3. Validazione MessageId per prevenire replay attack
_seen_message_ids: set[str] = set()
_message_id_pattern = re.compile(r'^[a-zA-Z0-9\-_\.]{1,36}






