OCPP 2.x: Budowa systemów ładowania pojazdów elektrycznych dla przedsiębiorstw
Globalny rynek infrastruktury ładowania pojazdów elektrycznych nadrobił zaległości 40,22 miliarda dolarów w 2025 roku i będzie rósł w tempie CAGR wynoszącym 25% do 2033 r., według badań Grand View. W Europie AFIR (Alternative Fuels Rozporządzenie w sprawie infrastruktury) nakłada wiążące terminy: co 60 km w sieci TEN-T do końca 2025 r. musi powstać stacja o mocy co najmniej 150 kW. We Włoszech z poza 73 000 publicznych punktów ładowania do 31 grudnia 2025 r i PNRR która przeznaczyła ponad 700 milionów euro na ładowanie dużej mocy, sektor i in pełna eksplozja.
Sercem tej infrastruktury jest tzwProtokół otwartego punktu ładowania (OCPP), otwarty standard opracowany przez Open Charge Alliance (OCA), który określa, w jaki sposób stacje ładowania (Stacje ładowania) komunikują się ze scentralizowanymi systemami zarządzania (CSMS – Systemy Zarządzania Stacjami Ładowania). Przyjęte przez ponad 250 organizacji w w ponad 40 krajach, OCPP stał się de facto standardem w branży, zapewniając interoperacyjność sprzętu różnych producentów i różnych programów zarządzających.
W tym zaawansowanym artykule technicznym badamy OCPP 2.0.1 i 2.1 w głębokość: architektura WebSocket, struktura wiadomości, model urządzenia, profile bezpieczeństwo, inteligentne ładowanie i budowanie skalowalnego, gotowego do produkcji backendu CSMS Od 10 do 100 000 stacji z pełnymi przykładami kodu w TypeScript i Python.
Czego się nauczysz
- Ewolucja protokołu OCPP z wersji 1.2 do 2.1 i podstawowe różnice architektoniczne
- Struktura wiadomości JSON (CALL, CALLRESULT, CALLERROR) i transport WebSocket
- 16 bloków funkcjonalnych OCPP 2.0.1 i hierarchiczny model urządzenia
- 3 profile bezpieczeństwa: Basic Auth, TLS, mTLS z certyfikatami X.509
- Zaawansowane inteligentne ładowanie: SetChargingProfile, zarządzanie obciążeniem, eliminowanie szczytów, integracja z energią słoneczną
- Implementacja backendu CSMS w Pythonie z asyncio i PostgreSQL
- ISO 15118 i Plug & Charge: PKI, eMAID, dwukierunkowy V2G
- Skalowalna architektura: od 10 do 100 000 punktów ładowania z klastrem WebSocket i platformą Kafka
- Monitoring, dashboard Grafana i kluczowe wskaźniki operacyjne
- Regulacje AFIR, PNIRE i włoskie zachęty dla infrastruktury ładowania
Seria EnergyTech: 10 artykułów na temat energii cyfrowej
Artykuł ten jest pierwszym z serii poświęconejEnergiaTech: protokoły, architektury i oprogramowanie, które rewolucjonizują zarządzanie energią elektryczną, od ładowania pojazdów elektrycznych po inteligentne sieci, od systemów BESS po optymalizację zużycia energii za pomocą sztucznej inteligencji.
| # | Przedmiot | Technologie | Poziom |
|---|---|---|---|
| 1 | Protokół OCPP 2.x: Budowa systemów ładowania pojazdów elektrycznych (tutaj jesteś) | OCPP, WebSocket, Python, ISO 15118 | Zaawansowany |
| 2 | Inteligentna sieć i OpenADR: reagowanie na zapotrzebowanie i elastyczność energetyczna | OpenADR, IEEE 2030.5, REST, MQTT | Zaawansowany |
| 3 | BESS (Bateryjne Magazynowanie Energii): Algorytmy Optymalizacji i BMS | Python, optymalizacja LP, magistrala CAN, Modbus | Zaawansowany |
| 4 | Cyfrowy bliźniak dla sieci elektrycznych z Kafką i uczeniem maszynowym | Kafka, InfluxDB, Grafana, ML, Python | Zaawansowany |
| 5 | SCADA i ICS dla infrastruktury krytycznej: bezpieczeństwo i protokoły | Modbus, DNP3, IEC 61850, OPC UA | Zaawansowany |
| 6 | Optymalizacja energii za pomocą sztucznej inteligencji: prognozowanie zużycia i prognozowanie zapotrzebowania | TensorFlow, LSTM, Prorok, FastAPI | Zaawansowany |
| 7 | Wirtualna elektrownia: Agregat DER z Pythonem i API REST | DER, DERMS, REST, Python, PostgreSQL | Mediator |
| 8 | Rynki energii i handel algorytmiczny: EPEX SPOT i API | Python, handel API, szeregi czasowe | Zaawansowany |
| 9 | Oprogramowanie do rozliczania emisji dwutlenku węgla: zakres 1, 2, 3 i raportowanie gazów cieplarnianych | Python, protokół GHG, API, raportowanie | Mediator |
| 10 | Mikrosieci i wyspa energetyczna: odporne architektury | Mikrosieci, EMS, przetwarzanie brzegowe, IoT | Zaawansowany |
Ewolucja protokołu: od OCPP 1.2 do 2.1
Zrozumienie ewolucji OCPP ma kluczowe znaczenie, aby docenić wybory architektoniczne wersję 2.0.1 i planuj migracje ze starszych systemów. Powstał protokół w 2010 roku, aby rozwiązać probleminteroperacyjność: każdy producent stacji posiadało własny, autorski protokół, uniemożliwiający zarządzanie wielu dostawców.
| Wersja | Rok | Transport | Kluczowe funkcje | Zastosowanie |
|---|---|---|---|---|
| OCPP 1.2 | 2010 | SOAP/XML | Pierwsza wersja publiczna, podstawowe operacje: boot, autoryzacja, start/stop | Wycofany ze służby |
| OCPP 1.5 | 2012 | SOAP/XML | Rezerwacja, inteligentna baza ładująca, transfer danych, reset | Dziedzictwo |
| OCPP 1.6 | 2015 | SOAP + JSON/WS | WebSocket, profile ładowania, komunikaty wyzwalające, lista uwierzytelniania lokalnego | Bardzo rozpowszechnione |
| OCPP 2.0 | 2018 | JSON/WS | Model urządzenia, bloki funkcjonalne, podstawa ISO 15118 (zastąpiona przez 2.0.1) | Rzadki |
| OCPP 2.0.1 | 2020 | Tylko JSON/WS | 16 bloków funkcjonalnych, model urządzenia, 3 profile bezpieczeństwa, zaawansowane inteligentne ładowanie | Aktualna norma |
| UOKiK 2.1 | 2025 | Tylko JSON/WS | Wstecznie kompatybilny 2.0.1, V2G ISO 15118-20, ładowanie natywne, wymiana baterii | Pojawiające się |
Podstawowe różnice pomiędzy OCPP 1.6 i 2.0.1
OCPP 2.0.1 nie jest prostą aktualizacją przyrostową: jest to: przepisywanie kompletna architektoniczna co zmienia terminologię, strukturę przekazu i model koncepcyjny. Ta niezgodność „z założenia” była konieczna pokonać ograniczenia strukturalne OCPP 1.6.
| Czekam | OCPP 1.6 | OCPP 2.0.1 |
|---|---|---|
| Terminologia serwerowa | System Centralny | CSMS (system zarządzania stacją ładowania) |
| Terminologia klienta | Punkt ładowania | Stacja ładująca |
| Jednostka ładująca | Złącze | EVSE (urządzenia do zasilania pojazdów elektrycznych) |
| Transakcje | Rozpocznij transakcję / Zatrzymaj transakcję | Ujednolicone zdarzenie transakcji (rozpoczęte/zaktualizowane/zakończone) |
| Konfiguracja | Naprawiono klawisze (ChangeConfiguration) | Hierarchiczny model urządzenia (GetVariables/SetVariables) |
| Bezpieczeństwo | Opcjonalne, niestandaryzowane | 3 zintegrowane i obowiązkowe profile bezpieczeństwa |
| Inteligentne ładowanie | Podstawa (profile łączące) | Zaawansowane: dla EVSE, priorytet stosu, harmonogramy złożone |
| ISO15118 | Nieobsługiwane | Natywny blok M (podłącz i ładuj) |
| Konkretna organizacja | Płaska lista operacji | 16 bloków funkcjonalnych z przypadkami użycia, wymaganiami, diagramami |
OCPP 2.1: Co nowego w 2025 roku
Wydany w styczniu 2025 roku przez Open Charge Alliance, OCPP 2.1 pozostaje pełny wstecznie kompatybilny z wersją 2.0.1 i dodaje najważniejsze funkcje na przyszłość:
- Zaawansowane V2G (pojazd-sieć).: Pełna obsługa normy ISO 15118-20 z dwukierunkowym przenoszeniem mocy, dzięki czemu pojazdy elektryczne mogą pełnić funkcję wirtualnych elektrowni
- Integracja DER: Zaawansowane narzędzia do optymalizacji energii rozproszonej z wykorzystaniem zasobów takich jak panele fotowoltaiczne i systemy magazynowania
- Ceny natywne: standaryzowane struktury danych do przekazywania stawek w czasie rzeczywistym (kWh, czas, opłaty parkingowe) bez rozszerzeń specyficznych dla dostawcy
- Wymiana baterii: obsługa stacji wymiany akumulatorów w pojazdach dwu- i trzykołowych
- Wznów transakcję: możliwość wznowienia transakcji po wymuszonym ponownym uruchomieniu bez utraty danych
- Koszt lokalny: kalkulacja kosztów bezpośrednio na stacji w przypadku spraw offline
Architektura komunikacji WebSocket
OCPP 2.0.1 używa wyłącznie JSON przez WebSocket jako protokół transportu, całkowicie rezygnując z SOAP/XML. Ten wybór architektoniczny zapewnia Stała dwukierunkowa komunikacja, małe opóźnienia, niewielka ładowność i kompatybilność natywnie z nowoczesną infrastrukturą sieciową.
Topologia klient-serwer
W modelu OCPP tzw Stacja ładująca zachowuje się jak klient WebSockety e il CSMS-a Jak Serwer WebSocket. Stacja ładująca inicjuje połączenie i utrzymuje je aktywne za pomocą mechanizmu bicia serca. CSMS może wysyłać polecenia do stacji za pomocą tego samego połączenia Otwórz WebSocket, nie ma potrzeby odwrotnego połączenia (bez odpytywania, bez pchania). oddzielne).
Charging Station CSMS
| |
|--- WebSocket CONNECT ---------------->|
| wss://csms.example.com/ocpp/CS001 |
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| Authorization: Basic base64(...) |
| |
|<-- HTTP 101 Switching Protocols -------|
| Sec-WebSocket-Protocol: ocpp2.0.1 |
| |
|--- BootNotification.req ------------->|
|<-- BootNotification.conf --------------|
| (interval: 300, status: Accepted) |
| |
|--- StatusNotification.req[EVSE1] ---->|
|<-- StatusNotification.conf ------------|
| |
|--- Heartbeat.req (ogni 300s) -------->|
|<-- Heartbeat.conf --------------------|
| |
| <-- utente avvicina RFID --- |
|--- Authorize.req -------------------->|
|<-- Authorize.conf (Accepted) ----------|
| |
|--- TransactionEvent(Started) -------->|
|<-- TransactionEvent.conf -------------|
| |
|<-- SetChargingProfile.req ------------| (CSMS gestisce load)
|--- SetChargingProfile.conf ---------->|
| |
|--- MeterValues (ogni 60s) ----------->|
|<-- MeterValues.conf ------------------|
| |
|--- TransactionEvent(Ended) ---------->|
|<-- TransactionEvent.conf -------------|
Adres URL połączenia i podprotokół
Stacja ładująca łączy się z CSMS za pomocą adresu URL zawierającego własny adres URL
unikalny identyfikator jako ostatni odcinek ścieżki.
Podprotokół WebSocket ocpp2.0.1 jest negocjowana podczas uścisku dłoni
HTTP, aby zapewnić zgodność wersji protokołu.
# Formato URL
wss://csms.example.com/ocpp/{chargingStationId}
# Esempi reali
wss://csms.example.com/ocpp/IT-MIL-STATION-001
wss://csms.example.com/ocpp/EVSE-PARK-NORD-042
wss://csms.example.com/ocpp/CPO-AUTOGRILL-A7-01
# Headers WebSocket obbligatori
Sec-WebSocket-Protocol: ocpp2.0.1
Authorization: Basic {base64(stationId:password)} # Security Profile 1-2
# Con Security Profile 3 (mTLS): nessun header Authorization,
# l'autenticazione avviene tramite certificato client TLS
Struktura komunikatów OCPP 2.0.1
OCPP 2.0.1 definiuje trzy typy komunikatów JSON, wszystkie transportowane jako ramki Tekst protokołu WebSocket. Każda wiadomość jest Tablica JSON z formatem dokładne w zależności od typu wiadomości. Ta prosta struktura ułatwia analizowanie i debugowanie w porównaniu z obciążeniem SOAP/XML.
CALL (żądanie) — MessageTypeId 2
Wiadomość DZWONIĆ reprezentuje żądanie wysłane przez jedną ze stron (stacja ładująca lub CSMS) do innego. Zawiera unikalny identyfikator, nazwę akcji i ładunek żądania.
// Formato: [MessageTypeId, MessageId, Action, Payload]
// Esempio: BootNotification dalla Charging Station
[2, "19223201", "BootNotification", {
"chargingStation": {
"model": "SuperCharger-500",
"vendorName": "EVPower Inc.",
"serialNumber": "SN-2025-00142",
"firmwareVersion": "3.2.1",
"modem": {
"iccid": "8939100000000000001",
"imsi": "310260000000001"
}
},
"reason": "PowerUp"
}]
// Esempio: TransactionEvent dalla Charging Station
[2, "tx-evt-001", "TransactionEvent", {
"eventType": "Started",
"timestamp": "2026-03-09T10:30:00Z",
"triggerReason": "CablePluggedIn",
"seqNo": 0,
"transactionInfo": {
"transactionId": "TXN-2026-0309-001",
"chargingState": "EVConnected"
},
"evse": { "id": 1, "connectorId": 1 },
"idToken": {
"idToken": "RFID-04A2B3C4D5",
"type": "ISO14443"
}
}]
CALLRESULT (odpowiedź) — MessageTypeId 3
Wiadomość WYNIK POŁĄCZENIA oraz pozytywną reakcję na WEZWANIE. The Aby zezwolić, MessageId musi dokładnie odpowiadać identyfikatorowi oryginalnego CALL korelacja żądanie-odpowiedź.
// Formato: [MessageTypeId, MessageId, Payload]
// Risposta a BootNotification
[3, "19223201", {
"currentTime": "2026-03-09T10:00:00Z",
"interval": 300,
"status": "Accepted"
}]
// Risposta a TransactionEvent (Started)
[3, "tx-evt-001", {
"totalCost": 0,
"chargingPriority": 0,
"idTokenInfo": {
"status": "Accepted",
"groupIdToken": {
"idToken": "GROUP-FLEET-01",
"type": "Central"
}
}
}]
CALLERROR (Błąd) — MessageTypeId 4
Wiadomość TELEFON jest wysyłany, gdy odbiorca tego nie robi może obsłużyć POŁĄCZENIE. Zawiera ustandaryzowany kod błędu i opis czytelne i uporządkowane szczegóły.
// Formato: [MessageTypeId, MessageId, ErrorCode, ErrorDescription, ErrorDetails]
[4, "19223201", "FormatViolation",
"Il campo 'vendorName' supera la lunghezza massima di 50 caratteri",
{
"field": "chargingStation.vendorName",
"maxLength": 50,
"actualLength": 67
}
]
// Codici di errore OCPP 2.0.1 standardizzati:
// FormatViolation - messaggio JSON malformato
// GenericError - errore generico non classificabile
// InternalError - errore interno del ricevente
// MessageTypeNotSupported - tipo di messaggio non supportato
// NotImplemented - azione riconosciuta ma non implementata
// NotSupported - azione non supportata dall'implementazione
// OccurrenceConstraintViolation - violazione cardinalita elementi
// PropertyConstraintViolation - vincolo su una proprietà violato
// ProtocolError - violazione del protocollo OCPP
// RpcFrameworkError - errore nel framework RPC di base
// SecurityError - errore di sicurezza o autenticazione
// TypeConstraintViolation - tipo di dato non corretto
Korelacja żądanie-odpowiedź: zasady krytyczne
Każde wywołanie musi mieć Unikalny identyfikator wiadomości (maksymalnie 36 znaków alfanumeryczny), który nie był wcześniej używany w tym samym połączeniu przez ten sam nadawca. CALLRESULT lub CALLERROR muszą go używać ten sam identyfikator wiadomości. Nadawca musi zachować limit czasu (zalecany: 30 sekund), po którym wiadomość żądanie uważa się za nieudane. W danej chwili może być oczekujące tylko jedno CALL każdym kierunku komunikacji: stacja nie może wysłać drugiego CALL do godz na pierwsze nie otrzymano odpowiedzi.
16 bloków funkcjonalnych OCPP 2.0.1
OCPP 2.0.1 organizuje całą funkcjonalność w 16 bloków funkcjonalnych (A do P), każdy zawierający konkretne przypadki użycia ze szczegółowymi wymaganiami, warunki wstępne i diagramy sekwencji. Pozwala na to modułowa organizacja implementatorzy deklarują, które bloki obsługują, a testerzy mają zweryfikować zgodność blok po bloku.
| Blok | Nazwa | Najważniejsze wiadomości | Obowiązkowy |
|---|---|---|---|
| A | Bezpieczeństwo | SecurityEventNotification, SignCertificate, CertyfikatSigned | Si |
| B | Aprowizacja | BootNotification, SetVariables, GetVariables, NotifyReport | Si |
| C | Upoważnienie | Autoryzuj, ClearCache, GetLocalListVersion | Si |
| D | Lokalna lista autoryzacji | WyślijLocalList, GetLocalListVersion | No |
| E | Transakcja | TransactionEvent, GetTransactionStatus, MeterValues | Si |
| F | Zdalne sterowanie | RequestStartTransaction, RequestStopTransaction, UnlockConnector | No |
| G | Dostępność | Powiadomienie o statusie, dostępność zmiany, puls | Si |
| H | Rezerwacja | Zarezerwuj teraz, anuluj rezerwację | No |
| I | Taryfa i koszt | Koszt zaktualizowany, ShowMessage | No |
| J | Dozowanie | MeterValues (pomiary energii/mocy/prądu) | Si |
| K | Inteligentne ładowanie | SetChargingProfile, ClearChargingProfile, GetChargingProfiles, ReportChargingProfiles | No |
| L | Zarządzanie oprogramowaniem | Aktualizuj oprogramowanie sprzętowe, powiadomienie o stanie oprogramowania sprzętowego | No |
| M | Certyfikat ISO 15118 Zarządzający | Get15118EVCertyfikat, UsuńCertyfikat, CertyfikatPodpisany | No |
| N | Diagnostyka | GetLog, LogStatusNotification, SetMonitoringBase, SetVariableMonitoring | No |
| O | Wyświetl wiadomość | SetDisplayMessage, GetDisplayMessages, ClearDisplayMessage | No |
| P | Transfer danych | DataTransfer (rozszerzenia specyficzne dla dostawcy) | No |
Model urządzenia: Serce OCPP 2.0.1
Il Model urządzenia i główna innowacja architektoniczna OCPP 2.0.1. Zastępuje system twardych kluczy konfiguracyjnych OCPP 1.6 (ChangeConfiguration kluczami) z a elastyczny model hierarchiczny oparty na Komponenty i zmienne. Każda stacja szczegółowo opisuje swoją strukturę i konfigurację w sposób niezależny od dostawcy.
ChargingStation (radice)
|
+-- Controller (computer della stazione)
| +-- Variables: Vendor, Model, FirmwareVersion, SerialNumber
|
+-- EVSE[1] (punto di ricarica 1)
| +-- Variables: AvailabilityState, Power, SupplyPhases
| +-- Connector[1] (connettore CCS2 / DC)
| | +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
| +-- Connector[2] (connettore CHAdeMO)
| +-- Variables: ConnectorType, AvailabilityState, MaxCurrent
|
+-- EVSE[2] (punto di ricarica 2)
| +-- Connector[1] (connettore Type2 AC / 22kW)
| +-- Variables: ConnectorType, Phases, MaxCurrent
|
+-- PowerMeter (contatore principale)
| +-- Variables: Energy.Active.Import.Register, Power.Active.Import
|
+-- NetworkInterface (ETH0/LTE)
| +-- Variables: Type, SSID, SignalStrength, ActiveNetworkProfile
|
+-- SecurityCtrlr
| +-- Variables: SecurityProfile, CertificateEntries
|
+-- SmartChargingCtrlr
+-- Variables: ChargingProfileMaxStackLevel, ChargeProfileKindsSupported
// CALL dal CSMS: legge stato EVSE e tipo connettore
[2, "get-var-001", "GetVariables", {
"getVariableData": [
{
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeType": "Actual"
},
{
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeType": "Actual"
},
{
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeType": "Actual"
}
]
}]
// CALLRESULT dalla Charging Station
[3, "get-var-001", {
"getVariableResult": [
{
"attributeStatus": "Accepted",
"component": { "name": "EVSE", "evse": { "id": 1 } },
"variable": { "name": "AvailabilityState" },
"attributeValue": "Available"
},
{
"attributeStatus": "Accepted",
"component": { "name": "Connector", "evse": { "id": 1, "connectorId": 1 } },
"variable": { "name": "ConnectorType" },
"attributeValue": "cCCS2"
},
{
"attributeStatus": "Accepted",
"component": { "name": "SmartChargingCtrlr" },
"variable": { "name": "ChargingProfileMaxStackLevel" },
"attributeValue": "5"
}
]
}]
Profile zabezpieczeń OCPP 2.0.1
OCPP 2.0.1 wprowadza trzy Progresywne profile zabezpieczeń które definiują poziom ochrony komunikacji pomiędzy Stacją Ładowania a CSMS. Profil musi zostać wybrany podczas wdrażania i automatycznie konfiguruje mechanizm uwierzytelnianie i szyfrowanie.
| Charakterystyczny | Profil 1 | Profil 2 | Profil 3 |
|---|---|---|---|
| Adres URL gniazda internetowego | ws:// (bez TLS) | wss:// (TLS) | wss:// (TLS) |
| Szyfrowanie | Nic | TLS 1.2+ | TLS 1.2+ |
| Stacja autoryzacyjna | Hasło (podstawowe uwierzytelnianie) | Hasło (podstawowe uwierzytelnianie) | Certyfikat klienta X.509 |
| Uwierzytelnij CSMS | Nic | Certyfikat serwera TLS | Certyfikat serwera TLS |
| Ochrona MitM | No | Częściowe (tylko uwierzytelnianie CSMS) | Pełny (wzajemny TLS) |
| Zarządzanie certyfikatami | Nie jest to konieczne | Tylko główny urząd certyfikacji na urządzeniu | Pełne PKI: CA + certyfikat klienta |
| Zalecane użycie | Tylko środowiska testowe | Produkcja standardowa | Produkcja krytyczna, P&C ISO 15118 |
Zarządzanie certyfikatami w produkcji (profil bezpieczeństwa 3)
Dzięki profilowi zabezpieczeń 3 zarządzanie cyklem życia certyfikatu staje się operacja krytyczna. OCPP 2.0.1 zawiera dedykowane komunikaty: ZnakCertyfikat (stacja wymaga podpisania CSR), CertyfikatPodpisany (CSMS instaluje podpisany certyfikat), Usuń certyfikat (usuwa przestarzały certyfikat), GetInstalledCertificateIds (lista zainstalowanych certyfikatów). I jedno jest niezbędne Solidna infrastruktura PKI przynajmniej z automatycznym odnowieniem 30 dni przed wygaśnięciem, ciągłe monitorowanie ważności i Mechanizm unieważniania list CRL/OCSP.
Inteligentne ładowanie i zarządzanie obciążeniem
Lo Inteligentne ładowanie (blok K) i najbardziej krytyczna funkcjonalność operatorzy z dużymi instalacjami. Umożliwia CSMS dynamiczną kontrolę moc dostarczana przez każdy EVSE w oparciu o ograniczenia sieci, taryfy za energię, priorytet użytkownika i moc transformatora.
Hierarchia profili opłat
OCPP 2.0.1 definiuje cztery typy profili ładowania z poziomami stosu (priorytety):
| Typ profilu | Zakres | Aplikacja | Prześcigać |
|---|---|---|---|
| Stacja ładującaMaxProfile | Bezwzględny maksymalny limit całej stacji | Ochrona transformatora, umowa na dostawę | Nie do nadrobienia |
| Stacja ładującaOgraniczenia zewnętrzne | Limity z systemów zewnętrznych (OSD, agregatory) | Odpowiedź zapotrzebowania, bilansowanie sieci | Tylko z wyższego profilu |
| TxDefaultProfile | Domyślny profil dla transakcji | Polityka taryfowa, podstawowy rozkład jazdy, energia słoneczna | Z określonego TxProfile |
| Profil Tx | Specyficzny profil transakcji | Priorytety użytkownika, indywidualne preferencje | Maksymalna poufność |
SetChargingProfile: Golenie szczytów i integracja z energią słoneczną
// Strategia: integra produzione solare + peak shaving ore serali
// Scenario: sito con 50kW fotovoltaico, trasformatore 100A, picco 19-21h
// 1. Limite massimo stazione (rispetta contratto di fornitura)
[2, "smart-max-001", "SetChargingProfile", {
"evseId": 0,
"chargingProfile": {
"id": 1,
"stackLevel": 1,
"chargingProfilePurpose": "ChargingStationMaxProfile",
"chargingProfileKind": "Absolute",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 100.0 }
]
}]
}
}]
// 2. Profilo solare + peak shaving per un singolo EVSE
[2, "smart-solar-001", "SetChargingProfile", {
"evseId": 1,
"chargingProfile": {
"id": 100,
"stackLevel": 0,
"chargingProfilePurpose": "TxDefaultProfile",
"chargingProfileKind": "Absolute",
"validFrom": "2026-03-09T00:00:00Z",
"validTo": "2026-03-10T00:00:00Z",
"chargingSchedule": [{
"id": 1,
"chargingRateUnit": "A",
"startSchedule": "2026-03-09T06:00:00Z",
"chargingSchedulePeriod": [
{ "startPeriod": 0, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 7200, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 14400, "limit": 32.0, "numberPhases": 3 },
{ "startPeriod": 43200, "limit": 16.0, "numberPhases": 3 },
{ "startPeriod": 46800, "limit": 8.0, "numberPhases": 3 },
{ "startPeriod": 54000, "limit": 24.0, "numberPhases": 3 }
]
}]
}
}]
// Orario potenza: 06-08h: 8A (offpeak, bassa produzione solare)
// 08-12h: 32A (piena produzione FV, massima potenza)
// 12-18h: 32A (picco solare, alta produzione)
// 18-19h: 16A (calo solare, riduzione)
// 19-21h: 8A (picco domanda residenziale, min potenza)
// 21-24h: 24A (fine picco, potenza media)
Algorytm dynamicznego równoważenia obciążenia
Algorytm zarządzania obciążeniem rozdziela dostępną moc pomiędzy sesjami aktywny w czasie rzeczywistym, z zachowaniem limitu i priorytetów transformatora. Najczęstszym podejściem jest Ważony sprawiedliwy udział z ograniczeniami min./maks.
interface ChargingSession {
readonly stationId: string;
readonly evseId: number;
readonly transactionId: string;
readonly priority: number; // 0-9 (9 = massima)
readonly minChargingRate: number; // A minimi per caricare
readonly maxChargingRate: number; // A massimi del connettore
readonly currentChargingRate: number;
readonly energyDelivered: number; // Wh totali erogati
readonly targetEnergy?: number; // Wh target (se specificato dall'utente)
readonly isEV3Phase: boolean; // Veicolo trifase
}
interface LoadBalancerConfig {
readonly maxSitePowerAmps: number; // A max del trasformatore
readonly reservedBuildingAmps: number; // A riservati per l'edificio
readonly minSessionAmps: number; // A minimi per sessione (tipico: 6A)
readonly rebalanceIntervalSec: number; // Secondi tra ricalcoli (tipico: 30s)
}
interface Allocation {
readonly stationId: string;
readonly evseId: number;
readonly allocatedAmps: number;
readonly phases: number;
}
function calculateChargingAllocations(
sessions: ReadonlyArray<ChargingSession>,
config: LoadBalancerConfig
): ReadonlyArray<Allocation> {
if (sessions.length === 0) return [];
const availableAmps = config.maxSitePowerAmps
- config.reservedBuildingAmps;
// Step 1: ordina per priorità (desc), poi energia erogata (asc = meno carico prima)
const sorted = [...sessions].sort((a, b) => {
if (b.priority !== a.priority) return b.priority - a.priority;
return a.energyDelivered - b.energyDelivered;
});
// Step 2: garantisci potenza minima a tutti
const minRequired = sorted.length * config.minSessionAmps;
if (minRequired > availableAmps) {
// Caso critico: potenza insufficiente, sospendi sessioni a bassa priorità
return sorted
.slice(0, Math.floor(availableAmps / config.minSessionAmps))
.map((s) => ({
stationId: s.stationId,
evseId: s.evseId,
allocatedAmps: config.minSessionAmps,
phases: s.isEV3Phase ? 3 : 1,
}));
}
// Step 3: distribuzione proporzionale ai pesi di priorità
const totalWeight = sorted.reduce(
(sum, s) => sum + (1 + s.priority), 0
);
const remainingAmps = availableAmps - minRequired;
const allocations = sorted.map((session) => {
const weight = (1 + session.priority) / totalWeight;
const bonus = remainingAmps * weight;
const raw = config.minSessionAmps + bonus;
// Applica vincoli min/max del connettore
const allocatedAmps = Math.max(
config.minSessionAmps,
Math.min(session.maxChargingRate, Math.round(raw * 10) / 10)
);
return {
stationId: session.stationId,
evseId: session.evseId,
allocatedAmps,
phases: session.isEV3Phase ? 3 : 1,
};
});
// Step 4: verifica finale che il totale non superi il limite
const total = allocations.reduce((s, a) => s + a.allocatedAmps, 0);
if (total <= availableAmps) return allocations;
// Riscaling proporzionale
const scale = availableAmps / total;
return allocations.map((a) => ({
...a,
allocatedAmps: Math.max(
config.minSessionAmps,
Math.round(a.allocatedAmps * scale * 10) / 10
),
}));
}
Implementacja backendu CSMS z Pythonem i PostgreSQL
Biblioteka Pythona ocpp autor: MobilityHouse (open source, ponad 2000 gwiazdek na GitHub)
i najpopularniejsza implementacja referencyjna dla CSMS. Łączymy bibliotekę z
asyncio, websockets e asyncpg dla PostgreSQL dla
zbuduj backend gotowy do produkcji.
Schemat PostgreSQL dla CSMS
-- Registro stazioni di ricarica
CREATE TABLE charging_stations (
station_id TEXT PRIMARY KEY,
vendor_name TEXT NOT NULL,
model TEXT NOT NULL,
serial_number TEXT,
firmware_version TEXT,
security_profile SMALLINT NOT NULL DEFAULT 1,
last_boot_reason TEXT,
last_seen_at TIMESTAMPTZ,
is_online BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Stato EVSE e connettori
CREATE TABLE evse_status (
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT NOT NULL,
connector_type TEXT, -- cCCS2, cCHAdeMO, cType2, sType3
status TEXT NOT NULL DEFAULT 'Unknown',
error_code TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (station_id, evse_id, connector_id)
);
-- Transazioni di ricarica
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL REFERENCES charging_stations(station_id),
evse_id SMALLINT NOT NULL,
connector_id SMALLINT,
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'Started',
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
meter_start_wh NUMERIC(12, 3),
meter_end_wh NUMERIC(12, 3),
energy_wh NUMERIC(12, 3) GENERATED ALWAYS AS (meter_end_wh - meter_start_wh) STORED,
stop_reason TEXT,
total_cost_cents INTEGER,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Meter values (time series - usare TimescaleDB in produzione)
CREATE TABLE meter_values (
id BIGSERIAL PRIMARY KEY,
transaction_id TEXT REFERENCES transactions(transaction_id),
station_id TEXT NOT NULL,
evse_id SMALLINT NOT NULL,
sampled_at TIMESTAMPTZ NOT NULL,
energy_wh NUMERIC(12, 3),
power_w NUMERIC(10, 2),
current_a NUMERIC(8, 3),
voltage_v NUMERIC(8, 2),
soc_pct SMALLINT -- State of Charge da ISO 15118
);
CREATE INDEX idx_meter_values_station_time
ON meter_values(station_id, sampled_at DESC);
CREATE INDEX idx_transactions_station_id
ON transactions(station_id, started_at DESC);
-- Token di autorizzazione (lista locale cache)
CREATE TABLE authorization_cache (
id_token TEXT NOT NULL,
id_token_type TEXT NOT NULL,
status TEXT NOT NULL, -- Accepted, Invalid, Blocked, Expired
group_id TEXT,
expiry_date TIMESTAMPTZ,
cached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id_token, id_token_type)
);
Pełny backend CSMS w Pythonie
import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional, Any
import asyncpg
import websockets
from ocpp.routing import on
from ocpp.v201 import ChargePoint as Cp
from ocpp.v201 import call, call_result
from ocpp.v201.enums import (
Action, RegistrationStatusType, AuthorizationStatusType,
ConnectorStatusType
)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(name)s %(levelname)s %(message)s'
)
log = logging.getLogger('csms')
# Pool globale connessioni PostgreSQL
_db_pool: Optional[asyncpg.Pool] = None
async def get_db() -> asyncpg.Pool:
global _db_pool
if _db_pool is None:
_db_pool = await asyncpg.create_pool(
dsn='postgresql://csms:password@localhost/csms_db',
min_size=5,
max_size=20,
)
return _db_pool
class ChargePointHandler(Cp):
"""
Handler OCPP 2.0.1 per una singola Charging Station.
Un'istanza per ogni connessione WebSocket attiva.
"""
@on(Action.boot_notification)
async def on_boot_notification(
self, charging_station: dict, reason: str, **kwargs
) -> call_result.BootNotification:
log.info(
f"Boot: {self.id} | "
f"{charging_station['vendor_name']} {charging_station['model']} "
f"| reason={reason}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO charging_stations
(station_id, vendor_name, model, serial_number,
firmware_version, last_boot_reason, last_seen_at, is_online)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), TRUE)
ON CONFLICT (station_id) DO UPDATE SET
vendor_name = EXCLUDED.vendor_name,
model = EXCLUDED.model,
firmware_version = EXCLUDED.firmware_version,
last_boot_reason = EXCLUDED.last_boot_reason,
last_seen_at = NOW(),
is_online = TRUE
""",
self.id,
charging_station['vendor_name'],
charging_station['model'],
charging_station.get('serial_number'),
charging_station.get('firmware_version'),
reason,
)
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=300,
status=RegistrationStatusType.accepted,
)
@on(Action.heartbeat)
async def on_heartbeat(self) -> call_result.Heartbeat:
db = await get_db()
await db.execute(
"UPDATE charging_stations SET last_seen_at = NOW() WHERE station_id = $1",
self.id
)
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).isoformat()
)
@on(Action.status_notification)
async def on_status_notification(
self, timestamp: str, connector_status: str,
evse_id: int, connector_id: int, **kwargs
) -> call_result.StatusNotification:
log.info(
f"Status: {self.id} EVSE[{evse_id}]"
f"Connector[{connector_id}] = {connector_status}"
)
db = await get_db()
await db.execute(
"""
INSERT INTO evse_status
(station_id, evse_id, connector_id, status, updated_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (station_id, evse_id, connector_id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = NOW()
""",
self.id, evse_id, connector_id, connector_status
)
return call_result.StatusNotification()
@on(Action.authorize)
async def on_authorize(
self, id_token: dict, **kwargs
) -> call_result.Authorize:
token = id_token['id_token']
token_type = id_token['type']
log.info(f"Authorize: {self.id} token={token} type={token_type}")
db = await get_db()
# Controlla prima la cache locale
row = await db.fetchrow(
"""
SELECT status, group_id, expiry_date
FROM authorization_cache
WHERE id_token = $1 AND id_token_type = $2
AND (expiry_date IS NULL OR expiry_date > NOW())
""",
token, token_type
)
status = AuthorizationStatusType.invalid
if row and row['status'] == 'Accepted':
status = AuthorizationStatusType.accepted
return call_result.Authorize(
id_token_info={'status': status}
)
@on(Action.transaction_event)
async def on_transaction_event(
self,
event_type: str,
timestamp: str,
trigger_reason: str,
seq_no: int,
transaction_info: dict,
evse: Optional[dict] = None,
id_token: Optional[dict] = None,
meter_value: Optional[list] = None,
**kwargs
) -> call_result.TransactionEvent:
tx_id = transaction_info['transaction_id']
log.info(
f"TransactionEvent: {self.id} {event_type} "
f"tx={tx_id} trigger={trigger_reason}"
)
db = await get_db()
if event_type == 'Started':
await self._handle_tx_started(
db, tx_id, evse, id_token, timestamp, meter_value
)
elif event_type == 'Updated' and meter_value:
await self._handle_tx_updated(db, tx_id, evse, meter_value)
elif event_type == 'Ended':
await self._handle_tx_ended(
db, tx_id, timestamp, transaction_info, meter_value
)
return call_result.TransactionEvent(
total_cost=0,
charging_priority=0,
id_token_info={'status': AuthorizationStatusType.accepted},
)
async def _handle_tx_started(
self, db, tx_id, evse, id_token, timestamp, meter_value
):
evse_id = evse['id'] if evse else 0
connector_id = evse.get('connector_id') if evse else None
token = id_token['id_token'] if id_token else 'unknown'
token_type = id_token['type'] if id_token else 'Local'
meter_start = self._extract_energy(meter_value)
await db.execute(
"""
INSERT INTO transactions
(transaction_id, station_id, evse_id, connector_id,
id_token, id_token_type, state, started_at, meter_start_wh)
VALUES ($1, $2, $3, $4, $5, $6, 'Started', $7, $8)
ON CONFLICT (transaction_id) DO NOTHING
""",
tx_id, self.id, evse_id, connector_id,
token, token_type, timestamp, meter_start
)
async def _handle_tx_updated(self, db, tx_id, evse, meter_value):
evse_id = evse['id'] if evse else 0
energy = self._extract_energy(meter_value)
power = self._extract_power(meter_value)
if energy is not None:
await db.execute(
"""
INSERT INTO meter_values
(transaction_id, station_id, evse_id, sampled_at, energy_wh, power_w)
VALUES ($1, $2, $3, NOW(), $4, $5)
""",
tx_id, self.id, evse_id, energy, power
)
async def _handle_tx_ended(
self, db, tx_id, timestamp, transaction_info, meter_value
):
meter_end = self._extract_energy(meter_value)
stop_reason = transaction_info.get('stopped_reason')
await db.execute(
"""
UPDATE transactions SET
state = 'Ended',
ended_at = $1,
meter_end_wh = $2,
stop_reason = $3
WHERE transaction_id = $4
""",
timestamp, meter_end, stop_reason, tx_id
)
def _extract_energy(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Energy.Active.Import.Register':
return float(sv['value'])
return None
def _extract_power(self, meter_values: Optional[list]) -> Optional[float]:
if not meter_values:
return None
for mv in meter_values:
for sv in mv.get('sampled_value', []):
if sv.get('measurand', '') == 'Power.Active.Import':
return float(sv['value'])
return None
# === Comandi CSMS -> Stazione ===
async def send_remote_start(
self, evse_id: int, id_token: str, limit_amps: float = 32.0
) -> str:
"""Avvia una sessione da remoto su EVSE specificato."""
request = call.RequestStartTransaction(
id_token={'id_token': id_token, 'type': 'Central'},
evse_id=evse_id,
charging_profile={
'id': 999,
'stack_level': 0,
'charging_profile_purpose': 'TxProfile',
'charging_profile_kind': 'Relative',
'charging_schedule': [{
'id': 1,
'charging_rate_unit': 'A',
'charging_schedule_period': [
{'start_period': 0, 'limit': limit_amps}
],
}],
},
)
response = await self.call(request)
log.info(f"RemoteStart {self.id} EVSE{evse_id}: {response.status}")
return response.status
async def send_charging_profile(
self, evse_id: int, profile: dict
) -> str:
"""Imposta un profilo di carica per smart charging."""
request = call.SetChargingProfile(
evse_id=evse_id,
charging_profile=profile
)
response = await self.call(request)
log.info(f"ChargingProfile {self.id} EVSE{evse_id}: {response.status}")
return response.status
# Registry globale delle connessioni attive
_connected_stations: dict[str, ChargePointHandler] = {}
async def on_connect(websocket, path: str):
"""Callback per nuove connessioni WebSocket OCPP."""
station_id = path.strip('/').split('/')[-1]
if not station_id:
await websocket.close(1008, 'Missing station ID')
return
log.info(f"Connessione da: {station_id} | path={path}")
cp = ChargePointHandler(station_id, websocket)
_connected_stations[station_id] = cp
try:
await cp.start()
except websockets.exceptions.ConnectionClosed as e:
log.info(f"Disconnesso: {station_id} code={e.code}")
except Exception as e:
log.error(f"Errore: {station_id} - {e}")
finally:
_connected_stations.pop(station_id, None)
db = await get_db()
await db.execute(
"UPDATE charging_stations SET is_online = FALSE WHERE station_id = $1",
station_id
)
async def main():
await get_db() # Inizializza pool DB
log.info("CSMS OCPP 2.0.1 avviato")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp2.0.1'],
# TLS: aggiungere ssl=ssl_context per Security Profile 2-3
ping_interval=60,
ping_timeout=30,
max_size=1_048_576, # 1MB max message size
)
log.info("In ascolto su ws://0.0.0.0:9000/ocpp/{stationId}")
await server.wait_closed()
if __name__ == '__main__':
asyncio.run(main())
ISO 15118 oraz Podłącz i ładuj
ISO15118 definiuje komunikację wysokiego poziomu pomiędzy pojazdami elektryczny (EV) i stacja ładowania (EVSE) poprzez komunikację Power Line (PLC) na kablu ładowania DC (CCS). OCPP 2.0.1 natywnie integruje ISO 15118 poprzez blok funkcjonalny M, umożliwiający Podłącz i ładuj: pojazd automatycznie uwierzytelnia się za pomocą certyfikatu cyfrowego X.509, bez RFID lub aplikacja mobilna.
Architektura PKI V2G (Vehicle-to-Grid).
System certyfikatów dla Plug & Charge opiera się na PKI (Public Key Infrastruktura) szczegółowa hierarchia mobilności elektrycznej:
V2G Root CA (Root of Trust - gestita da OEM o eMSP)
|
+-- V2G Intermediate CA
| |
| +-- EVSE Certificate (installato nella stazione)
| CN = EVSE-IT-MIL-001
|
+-- eMobility Service Provider CA (eMSP)
|
+-- Contract Certificate (installato nel veicolo)
CN = IT.CPO.000001234 (eMAID - e-Mobility Account Identifier)
SubjectAltName = eMAID:IT.CPO.000001234
Kompletny proces podłączania i ładowania
EV EVSE CSMS eMSP
| | | |
|-- Plug cavo DC --> | | |
| | | |
|<= ISO 15118-2 TLS =>| (PLC sul cavo SLAC) | |
| | | |
|-- ContractCert --->| | |
| (eMAID, X.509) | | |
| |--- Authorize req ----->| |
| | idToken.type=eMAID | |
| | |--- OCPI check --->|
| | |<-- Contract OK --|
| | | |
| |<-- Authorize.conf ----| |
| | status: Accepted | |
| | | |
|<= Charging Start ==>| | |
| | | |
| EV invia target | | |
|-- EnergyRequest --->| | |
| SoC: 45% | | |
| Target: 80% | | |
| Departure: 18:30 | | |
| |--- TransactionEvent -->| |
| | ISO15118Trigger | |
| | | |
| |<-- SetChargingProfile--| |
|<= Schedule via PLC =>| | |
Status ISO 15118 w produkcji (2026)
- ISO 15118-2: Plug & Charge AC/DC – szeroko obsługiwane przez ładowarki HPC DC (Ionity, Fastned, Tesla Supercharger V3)
- ISO 15118-20: Dwukierunkowa obsługa V2G – gotowa obsługa sprzętu, wdrożenie oprogramowania w latach 2025–2026
- Wymóg AFIR: Wszystkie nowe stacje obsługujące V2G muszą obsługiwać normę ISO 15118 od 2026 r
- Zgodność z AFIR 2027: Każda ładowarka zainstalowana po 01.01.2027 musi być gotowa do inteligentnego ładowania
- Prawdziwe V2G we Włoszech: pierwsi piloci z Enel X Way i Nissan Leaf w standardzie V2H (Vehicle-to-Home)
Skalowalna architektura: od 10 do 100 000 punktów ładowania
Korporacyjny system CSMS musi obsłużyć od kilkudziesięciu do setek tysięcy połączeń Konkurencyjne WebSockety. Architektura ewoluuje etapami, dodając dodatkowe komponenty pojawiają się one w różnej skali.
Faza 1: Mała skala (10-500 stacji)
+------------------+ WebSocket/OCPP +------------------+
| Charging |------------------------| CSMS Monolitico |
| Stations (10-500)| wss://csms:9000/ocpp | Python/asyncio |
+------------------+ | Port 9000 |
+--------+---------+
|
+-------+-------+
| PostgreSQL |
| Redis (cache) |
+---------------+
Stack: Python asyncio + PostgreSQL + Redis
Deployment: 1 VM (4 vCPU, 8GB RAM), 1 DB managed
Costo: ~$200/mese
Faza 2: Średnia skala (500-10 000 stacji)
+------------+ +------------------+ +--------------+
| Load | | WS Gateway #1 | | Message |
| Balancer +---->| (asyncio CSMS) +---->| Broker |
| (HAProxy) | | Max 2000 conn | | (RabbitMQ) |
| | +------------------+ | |
| Sticky +---->| WS Gateway #2 +---->| |
| Sessions | | (asyncio CSMS) | +--------------+
| | +------------------+ |
+------------+ +------+------+
| Business |
| Services |
| (FastAPI) |
+------+------+
|
+----------+----------+
| PostgreSQL (HA) |
| TimescaleDB |
| Redis Cluster |
+---------------------+
Sticky sessions: basate su station_id nel path URL
Cross-node ops: Redis pub/sub per inviare comandi alle stazioni
Costo: ~$2.000/mese (K8s managed)
Faza 3: Duża skala (10 000–100 000 stacji)
Global Load Balancer (Anycast)
|
+---------------+---------------+
| |
Region EU-WEST Region EU-SOUTH
+------------------+ +------------------+
| WS Gateway Pool | | WS Gateway Pool |
| (50 pods, 2000 | | (30 pods) |
| conn each = 100K)| +--------+---------+
+--------+---------+ |
| |
+-------------+----------------+
|
+-------+--------+
| Apache Kafka |
| (12 partitions)|
| per topic |
+-------+--------+
|
+-----------------+------------------+
| | |
+------+------+ +-------+------+ +--------+------+
| Transaction | | Smart | | Device |
| Service | | Charging Svc | | Mgmt Svc |
| (10 replicas)| | (5 replicas) | | (3 replicas) |
+------+------+ +-------+------+ +--------+------+
| | |
+--------+--------+------------------+
|
+--------+--------+
| PostgreSQL |
| Citus (sharding) |
| Shard key: |
| station_id hash |
+--------+---------+
|
+--------+--------+
| TimescaleDB |
| (meter values) |
+--------+--------+
Kafka Topics:
- ocpp.boot-notification (chiave: station_id)
- ocpp.transaction-events (chiave: transaction_id)
- ocpp.meter-values (chiave: station_id)
- ocpp.status-notifications (chiave: station_id)
- csms.commands (chiave: station_id)
Throughput target: 1M messaggi/ora, latenza P99 < 200ms
Costo: ~$30.000/mese (multi-region Kubernetes)
Zarządzanie połączeniami między węzłami za pomocą Redis
W przypadku wdrożenia wielowęzłowego CSMS musi wiedzieć, która brama jest włączona stację do wysyłania poleceń (SetChargingProfile, RemoteStart itp.). Redis pub/sub rozwiązuje problem:
import json
import asyncio
import redis.asyncio as aioredis
redis_client = aioredis.from_url(
'redis://redis-cluster:6379',
encoding='utf-8',
decode_responses=True
)
# Registra il nodo della connessione
async def register_connection(station_id: str, gateway_id: str):
await redis_client.setex(
f"csms:gateway:{station_id}",
value=gateway_id,
time=600 # TTL: 10 minuti, rinnovato a ogni heartbeat
)
# Pubblica un comando verso una stazione (qualunque nodo sia)
async def publish_command(station_id: str, action: str, payload: dict):
channel = f"csms:commands:{station_id}"
await redis_client.publish(channel, json.dumps({
'action': action,
'payload': payload
}))
# Su ogni nodo gateway: ascolta i comandi per le stazioni connesse
async def listen_for_commands(connected_stations: dict):
pubsub = redis_client.pubsub()
# Sottoscrivi ai canali delle stazioni connesse a questo nodo
async def subscribe_station(station_id: str):
await pubsub.subscribe(f"csms:commands:{station_id}")
async for message in pubsub.listen():
if message['type'] != 'message':
continue
station_id = message['channel'].split(':')[-1]
cp = connected_stations.get(station_id)
if not cp:
continue # Stazione non su questo nodo, ignora
cmd = json.loads(message['data'])
try:
if cmd['action'] == 'SetChargingProfile':
await cp.send_charging_profile(
cmd['payload']['evse_id'],
cmd['payload']['profile']
)
elif cmd['action'] == 'RemoteStart':
await cp.send_remote_start(
cmd['payload']['evse_id'],
cmd['payload']['id_token'],
)
except Exception as e:
log.error(f"Errore esecuzione comando {cmd['action']}: {e}")
Monitorowanie, metryki i pulpit nawigacyjny Grafana
CSMS w środowisku produkcyjnym wymaga kompleksowego systemu obserwowalności. Metryki kluczowe do monitorowania dotyczą stanu infrastruktury, jakości wydajność serwisową i operacyjną.
Kluczowe wskaźniki operacyjne
| Metryczny | Formuła/źródło | Docelowa umowa SLA | Próg alertu |
|---|---|---|---|
| Dostępność stacji | Stacje online / Tachimetry x 100 | >= 99% | < 95% |
| Opóźnienie wiadomości OCPP P99 | Czas połączenia -> WYNIK POŁĄCZENIA (95. percentyl) | < 2s | > 5 s |
| Wskaźnik powodzenia transakcji | TX zakończona / TX rozpoczęta x 100 | >= 98% | < 95% |
| Dostarczona energia (kWh/godz.) | Suma wartości licznika na razie | Wartość bazowa +10% | < wartość bazowa -20% |
| Współczynnik odrzucenia uwierzytelnienia | Nieprawidłowe uwierzytelnienie / całkowite uwierzytelnienie x 100 | < 2% | > 10% (możliwy atak) |
| Ponowne połączenia WebSocket/godz | Licznik nowych połączeń na stację | < 2/godzinę/stację | > 10/godzinę/stację |
| Zgodność z inteligentnym ładowaniem | Rzeczywista moc vs ustawiony profil | +/- 5% | Odchylenie > 15% |
| Certyfikowane dni ważności | Dni do wygaśnięcia certyfikatów TLS | > 30 dni | < 30 dni (alarm o odnowieniu) |
Prometheus Exporter dla CSMS
from prometheus_client import (
Counter, Gauge, Histogram, start_http_server
)
# Metriche Prometheus
OCPP_MESSAGES_TOTAL = Counter(
'ocpp_messages_total',
'Numero totale messaggi OCPP processati',
['action', 'direction', 'status'] # direction: inbound/outbound
)
OCPP_MESSAGE_DURATION = Histogram(
'ocpp_message_duration_seconds',
'Latenza elaborazione messaggi OCPP',
['action'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
STATIONS_CONNECTED = Gauge(
'csms_stations_connected_total',
'Numero stazioni connesse al CSMS'
)
ACTIVE_TRANSACTIONS = Gauge(
'csms_active_transactions_total',
'Numero transazioni di ricarica attive'
)
ENERGY_DELIVERED_WH = Counter(
'csms_energy_delivered_wh_total',
'Energia totale erogata in Wh',
['station_id']
)
AUTH_RESULTS = Counter(
'csms_authorization_results_total',
'Risultati delle autorizzazioni OCPP',
['status'] # Accepted, Invalid, Blocked, Expired
)
SMART_CHARGING_EVENTS = Counter(
'csms_smart_charging_events_total',
'Operazioni smart charging',
['action', 'result']
)
def start_metrics_server(port: int = 8001):
"""Avvia il server HTTP Prometheus su porta specificata."""
start_http_server(port)
log.info(f"Prometheus metrics su http://0.0.0.0:{port}/metrics")
# Decorator per misurare latenza handler
import time
import functools
def track_ocpp_handler(action: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.monotonic()
try:
result = await func(*args, **kwargs)
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='success'
).inc()
return result
except Exception as e:
OCPP_MESSAGES_TOTAL.labels(
action=action, direction='inbound', status='error'
).inc()
raise
finally:
OCPP_MESSAGE_DURATION.labels(action=action).observe(
time.monotonic() - start
)
return wrapper
return decorator
// Pannelli principali per dashboard Grafana CSMS
// 1. Stazioni online (Gauge)
{
"title": "Stazioni Connesse",
"type": "gauge",
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "red", "value": 0 },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 99 }
]
}
}
},
"targets": [{
"expr": "csms_stations_connected_total / csms_stations_registered_total * 100",
"legendFormat": "Availability %"
}]
}
// 2. Latenza messaggi OCPP P99 (Time Series)
{
"title": "OCPP Message Latency P99",
"type": "timeseries",
"targets": [{
"expr": "histogram_quantile(0.99, rate(ocpp_message_duration_seconds_bucket[5m]))",
"legendFormat": "P99 - {{action}}"
}]
}
// 3. Energia erogata (Stat panel)
{
"title": "Energia Totale Erogata oggi (kWh)",
"type": "stat",
"targets": [{
"expr": "increase(csms_energy_delivered_wh_total[24h]) / 1000",
"legendFormat": "kWh"
}]
}
// 4. Auth rejection rate - alert su attacchi
{
"title": "Rejection Rate Autorizzazioni (%)",
"type": "timeseries",
"targets": [{
"expr": "rate(csms_authorization_results_total{status='Invalid'}[5m]) / rate(csms_authorization_results_total[5m]) * 100"
}]
}
Przepisy włoskie i europejskie: AFIR, PNIRE, PNRR
Infrastruktura ładowania pojazdów elektrycznych we Włoszech i w Europie podlega ścisłym regulacjom. Zgodność z przepisami nie jest opcjonalna: dotyczy zarówno wymagań technicznych stacji (metrologia prawna, dostępność, płatności) i standardy komunikacji.
AFIR (rozporządzenie w sprawie infrastruktury paliw alternatywnych – UE 2023/1804)
AFIR wszedł w życie w kwietniu 2024 r. i określa dokładny harmonogram obowiązki dotyczące infrastruktury ładowania w sieciach TEN-T i na obszarach miejskich:
| Wygaśnięcie | Wymóg | Możliwość zastosowania |
|---|---|---|
| 31 grudnia 2025 r | Stacja >= 150 kW co 60 km w sieci bazowej TEN-T | Cała UE |
| 31 grudnia 2027 r | Stacja >= 150 kW co 60 km Sieć TEN-T Kompleksowa | Cała UE |
| 14 kwietnia 2025 r | Bezpłatne dane statyczne i dynamiczne (lokalizacja, typ złącza, dostępność) | Stacje publiczne |
| 01 stycznia 2027 r | Gotowe do inteligentnego ładowania dla nowych/remontowanych stacji > 22 kW | Stacje publiczne |
| Natychmiastowy | Płatność doraźna bez abonamentu (bezstykowa karta bankowa) | Stacje publiczne > 50 kW |
| 2026+ | ISO 15118 dla stacji obsługujących V2G | Stacje dwukierunkowe |
PNIRE i PNRR dla Włoch
We Włoszech wdrażanie AFIR odbywa się za pomocą dwóch głównych narzędzi:
- PNIRE (Krajowy plan infrastruktury ładowania energii elektrycznej): zarządzany przez MASE (Ministerstwo Środowiska i Bezpieczeństwa Energetycznego), definiuje cele krajowe: 13 755 stacji publicznych w 2025 r., skupienie się na sieci drogowej i obszarach miejskich
- PNRR Misja 2, Inwestycja 4.3: ponad 700 milionów euro przeznaczone do ładowania dużą mocą (HPC >= 150 kW) na autostradach i obszarach obsługa. Według MEMORY PNRR przeznaczył łącznie 12,7 miliarda euro, z czego nadal częściowe wykorzystanie
- Zaproszenie MASE 2024-2025: zachęty dla prywatnych operatorów, którzy instalować infrastrukturę na obszarach o niskiej gęstości ładowania (południowe Włochy, obszary wiejskie). Dotacja do 60% kosztów instalacji
Sytuacja we Włoszech: 73 000 punktów ładowania w 2025 r
Na dzień 31 grudnia 2025 r. liczą się Włochy Ponad 73 000 publicznych punktów ładowania (+18% w porównaniu z 2024 r.), przy 93% zasięgu terytorialnym kraju. Spośród nich: około 12 000 to punkty szybkiego ładowania (> 22 kW), a 5 000 to punkty HPC (>= 150 kW). Regionem z największą infrastrukturą jest Lombardia (23%), a następnie z Lacjum (12%) i Toskanii (9%). Południe pozostaje obszarem priorytetowym dla m.in finansowanie PNRR w celu uzupełnienia luki do 2026 r.
Obowiązki techniczne dotyczące oprogramowania CSMS
- OCPI (interfejs otwartego punktu ładowania): obowiązkowy protokół dla roamingu pomiędzy CPO (operatorem punktu ładowania) a eMSP (dostawcą usług e-mobilności). Zalecana wersja OCPI 2.2.1
- Metrologia prawna: w Niemczech (Eichrecht) i stopniowo w UE (MID – dyrektywa w sprawie przyrządów pomiarowych) systemy pomiarowe muszą być certyfikowane, a odczyty muszą być przejrzyste i niezmienne przez użytkownika
- RODO: dane sesji ładowania (RFID, lokalizacja, godziny) są danymi osobowymi. Wymaga polityki prywatności, minimalizacji danych i prawa do bycia zapomnianym
- CDR (szczegółowe zapisy dotyczące opłat): musi być zgodny z formatem OCPI dla interoperacyjnego fakturowania i przechowywany przez co najmniej 5 lat (włoskie obowiązki podatkowe)
Studium przypadku: Włoska sieć ładowania obejmująca ponad 50 stacji
Przyjrzyjmy się architekturze prawdziwego systemu CSMS dla włoskiego operatora, który zarządza 50 stacjami ładowania DC (22-150 kW) rozmieszczonymi na parkingach miejskich i centra handlowe w 3 regionach.
Wymagania operacyjne
- 50 stacji, łącznie 150 EVSE (średnio 3 EVSE na stację), 300 złączy (CCS2 + Type2)
- Szczyt dzienny: 400-600 sesji ładowania (7-9, 12-14, 17-21)
- Inteligentne ładowanie: zgodność z limitem 200A na stację, integracja z SEM (Site Energy Manager)
- Multi-tenant: 3 CPO z częściową widocznością stacji, roaming OCPI z Enel
- SLA dostępności: 99,5% miesięcznie na stację, 99,9% na backend CSMS
Wybrana architektura
+------------------+ +------------------+ +------------------+
| 50 Stazioni | | HAProxy | | CSMS Primary |
| OCPP 2.0.1 |---->| (WS sticky sess.)|---->| Python asyncio |
| Security Profile 2| | Port 443 (TLS) | | 2 replicas |
+------------------+ +------------------+ +--------+---------+
|
+------------------+ +--------+---------+
| CSMS Worker |<----| Redis Cluster |
| (FastAPI REST) | | (stato sessioni) |
| Dashboard, API | +------------------+
+------------------+
|
+-----------+----------+
| |
+-------+-------+ +---------+------+
| PostgreSQL 16 | | TimescaleDB |
| (transazioni, | | (meter values)|
| auth, device | | 2TB/anno est. |
| model, CDR) | +---------------+
+---------------+
Monitoring: Prometheus + Grafana Cloud
Alerting: PagerDuty (P1: stazione offline >5min, P2: CSMS latency >2s)
CDR/Billing: integrazione ERP via webhook PostgreSQL NOTIFY
Rzeczywiste wskaźniki operacyjne (typowy miesiąc)
| Metryczny | Wartość | Notatki |
|---|---|---|
| Sesje/miesiąc | ~ 14 000 | Średnio 280 sesji dziennie |
| Dostarczona energia | ~85 000 kWh/miesiąc | Średnio 6 kWh/sesję |
| Średnia dostępność stacji | 99,3% | 0,7% przestojów = ~5 godzin/miesiąc na stację |
| Współczynnik akceptacji autoryzacji | 96,8% | 3,2% odrzuconych = wygasło lub nie zostało zarejestrowane |
| Opóźnienie OCPP P95 | 180 ms | Zawiera stację LTE w obie strony |
| Inteligentne zdarzenia ładowania/dzień | ~1200 | Średnio 24 ponowne salda na godzinę |
| Maksymalna skuteczność golenia | 92% | 92% mocy czasowej <= ustawiony limit |
Bezpieczeństwo CSMS: OWASP i model zagrożeń
CSMS to infrastruktura krytyczna: kompromis może spowodować zakłócenia ładowania, manipulacji rachunkami lub ataków na sieć elektryczną poprzez ładowanie obciążeń. Główne zagrożenia można zidentyfikować na podstawie: konkretny model zagrożenia.
Model zagrożeń CSMS
| Zagrożenie | Wektor | Uderzenie | Łagodzenie |
|---|---|---|---|
| Nieautoryzowana stacja | Połączenie ze skradzionymi danymi uwierzytelniającymi | Wstrzykiwanie fałszywych danych, oszukańcza konsumpcja | Profil bezpieczeństwa 3 (mTLS), certyfikowane przypinanie |
| Człowiek pośrodku | Przechwytywanie WS w niezabezpieczonych sieciach | Przechwytywanie tokenów RFID, manipulacja poleceniami | TLS 1.3 obowiązkowy, certyfikowana przejrzystość |
| Powtórz atak | Retransmisja przechwyconych komunikatów OCPP | Podwójne rozliczenia, nieprawidłowe uprawnienia | Unikalny MessageId, weryfikacja znacznika czasu, nonce |
| WebSocket DDoS | Zalew połączeń lub wiadomości | CSMS nieosiągalny, DoS infrastruktury | Ograniczanie szybkości, ograniczanie połączenia, WAF |
| Wstrzykiwanie SQL za pośrednictwem ładunku OCPP | Ładunek OCPP z ładunkiem SQL w polu idToken | Eksfiltracja bazy danych, eskalacja uprawnień | Gotowe zestawienia, ORM, walidacja danych wejściowych |
| Klonowanie RFID | Klonowanie legalnych kart RFID | Sesje opłacane przez innych użytkowników | ISO 15118 P&C, biała lista RFID, wykrywanie anomalii |
| Złośliwe oprogramowanie sprzętowe | Aktualizacja oprogramowania sprzętowego ze złośliwym oprogramowaniem | Fizyczna kontrola stacji, manipulacja siecią | Cyfrowy podpis oprogramowania sprzętowego, bezpieczny rozruch, SBOM |
Hartowanie CSMS: lista kontrolna bezpieczeństwa
import ssl
import re
from functools import wraps
# 1. Configurazione TLS sicura (Security Profile 2-3)
def create_tls_context(
certfile: str,
keyfile: str,
cafile: str,
require_client_cert: bool = False
) -> ssl.SSLContext:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if require_client_cert: # Security Profile 3 (mTLS)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=cafile)
# Disabilita cipher suite deboli
ctx.set_ciphers(
'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:!aNULL:!eNULL:!LOW:!EXPORT'
)
return ctx
# 2. Rate limiting per connessioni WebSocket
from collections import defaultdict
import time
_connection_attempts: dict[str, list[float]] = defaultdict(list)
MAX_CONN_PER_MINUTE = 10
def check_rate_limit(client_ip: str) -> bool:
"""Ritorna True se il client può connettersi, False se throttled."""
now = time.monotonic()
window = _connection_attempts[client_ip]
# Rimuovi tentativi più vecchi di 60 secondi
_connection_attempts[client_ip] = [
t for t in window if now - t < 60
]
if len(_connection_attempts[client_ip]) >= MAX_CONN_PER_MINUTE:
log.warning(f"Rate limit superato per IP: {client_ip}")
return False
_connection_attempts[client_ip].append(now)
return True
# 3. Validazione MessageId per prevenire replay attack
_seen_message_ids: set[str] = set()
_message_id_pattern = re.compile(r'^[a-zA-Z0-9\-_\.]{1,36}






