Rurociąg telematyczny: przetwarzanie danych UBI na dużą skalę
Do 2025 roku ponad 21 milionów amerykańskich kierowców będzie udostępniać swoim użytkownikom dane telematyczne ubezpieczenie. W Europie AXA wprowadziła na rynki rozwiązanie UBI oparte na sztucznej inteligencji w styczniu 2025 r Europejczyków, która integruje system OBD-II, telematykę opartą na smartfonach i modele przewidywania ryzyka. Rynek UBI (Usage-Based Insurance) będzie wart 62,6 miliarda dolarów w 2025 roku i będzie rósł w tempie CAGR na poziomie 24,8% do 2035 r.
Telematyka ubezpieczeniowa to nie tylko gromadzenie danych GPS: to złożony proces, który ma na celu przekształca surowe sygnały z urządzeń OBD-II, smartfonów i czujników IoT w indywidualne oceny ryzyka które określają składkę każdego indywidualnego ubezpieczonego. Zły kierowca płaci więcej, dobry kierowca oszczędza. Zasada jest prosta; wdrożenie techniczne i wszystko inne.
W tym artykule budujemy kompletny potok telematyczny: od pozyskiwania danych System OBD-II umożliwia inżynierię zachowania kierowcy, od oceny ryzyka w czasie rzeczywistym po dynamiczne ceny, uwzględniające skalowalność, opóźnienia i prywatność.
Czego się nauczysz
- Architektura rurociągu telematycznego dla UBI (Pay-How-You-Drive)
- Protokół OBD-II: parametry, częstotliwości próbkowania, przetwarzanie krawędzi
- Inżynieria funkcji uwzględniająca zachowanie podczas jazdy (mocne hamowanie, pokonywanie zakrętów, nadmierna prędkość)
- Przetwarzanie strumieniowe za pomocą Apache Kafka i Apache Flink
- Ocena ryzyka w czasie rzeczywistym i agregacje czasowe
- Dynamiczna wycena oparta na wynikach za jazdę
- Uwagi dotyczące RODO dotyczące danych geolokalizacyjnych
1. Rodzaje telematyki ubezpieczeniowej
Istnieją trzy główne modele gromadzenia danych telematycznych, różniące się między sobą różnymi kompromisami dokładność, koszt wdrożenia i przyjęcia przez klienta:
| Typ | Urządzenie | Zebrane dane | Dokładność | Koszt |
|---|---|---|---|---|
| Klucz sprzętowy OBD-II | Wtyczka do portu OBD | ECU, prędkość, obroty, przyspieszenie | Wysoki | 20-50 € sprzętu |
| Aplikacja na smartfona | Aplikacje na iOS/Androida | GPS, akcelerometr, żyroskop | Przeciętny | Zero sprzętu |
| Czarna skrzynka OEM | Zainstalowane urządzenie OEM | ECU + GPS + akcelerometr | Bardzo wysoki | 100-300 euro |
| Interfejs API połączonego samochodu | Natywny interfejs API pojazdu | Cała natywna telematyka | Maksymalny | Umowa OEM |
2. Struktura danych OBD-II
Protokół OBD-II (On-Board Diagnostics II) udostępnia dziesiątki parametrów ECU za pośrednictwem PID (Identyfikatory parametrów). Do punktacji ryzyka ubezpieczeniowego najbardziej odpowiednimi identyfikatorami PID są:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class OBDReading:
"""Singola lettura OBD-II dal veicolo."""
device_id: str # ID univoco del dispositivo
policy_id: str # Polizza associata
timestamp: datetime # UTC timestamp
latitude: float # Gradi decimali (WGS84)
longitude: float # Gradi decimali (WGS84)
speed_kmh: float # Velocita in km/h (PID 0x0D)
rpm: int # Giri per minuto (PID 0x0C)
throttle_pct: float # Posizione acceleratore % (PID 0x11)
engine_load_pct: float # Carico motore % (PID 0x04)
coolant_temp_c: int # Temperatura liquido (PID 0x05)
# Calcolati lato edge (non nativi OBD)
acceleration_ms2: Optional[float] = None # m/s^2 (+/-)
heading_deg: Optional[float] = None # Direzione 0-360
road_type: Optional[str] = None # "URBAN", "RURAL", "HIGHWAY"
@dataclass
class TripSummary:
"""Riepilogo di un singolo viaggio."""
trip_id: str
policy_id: str
device_id: str
start_time: datetime
end_time: datetime
distance_km: float
duration_minutes: float
# Metrics comportamentali (calcolati dalla pipeline)
max_speed_kmh: float
avg_speed_kmh: float
hard_braking_count: int # Decelerazioni > 0.3g
hard_acceleration_count: int # Accelerazioni > 0.3g
hard_cornering_count: int # Laterale > 0.3g
speeding_pct: float # % tempo sopra limite
night_driving_pct: float # % km notturni (22:00-06:00)
highway_pct: float # % km autostrada
# Score aggregati
safety_score: float # 0-100 (100 = ottimo)
fuel_efficiency_score: float # 0-100
3. Przetwarzanie brzegowe na urządzeniu
Surowe dane OBD-II muszą zostać wstępnie przetworzone na krawędzi przed wysłaniem do chmury. Zmniejsza to niezbędną szerokość pasma i oblicza dyskretne zdarzenia (gwałtowne hamowanie, przekroczenie prędkości) w czasie rzeczywistym. Procesor brzegowy zazwyczaj działa na mikrokontrolerze ARM w urządzeniu OBD:
from collections import deque
from dataclasses import dataclass
import math
class EdgeEventDetector:
"""
Algoritmo di detection eventi di guida.
Gira sul dispositivo embedded (ARM Cortex-M).
Sampling rate: 10 Hz (una lettura ogni 100ms)
"""
HARD_BRAKING_THRESHOLD = -3.0 # m/s^2 (0.3g)
HARD_ACCEL_THRESHOLD = 3.0 # m/s^2 (0.3g)
HARD_CORNERING_THRESHOLD = 3.0 # m/s^2 laterale
def __init__(self, window_size: int = 5):
self.speed_history = deque(maxlen=window_size)
self.events: list = []
self.prev_timestamp = None
def process_reading(self, reading: dict) -> list:
"""
Processa una singola lettura e rileva eventi.
Restituisce lista di eventi rilevati (vuota se nessuno).
"""
detected = []
current_speed = reading["speed_kmh"]
current_ts = reading["timestamp"]
# Calcola accelerazione longitudinale
if self.speed_history and self.prev_timestamp:
prev_speed = self.speed_history[-1]
dt = (current_ts - self.prev_timestamp).total_seconds()
if dt > 0:
# Conversione km/h -> m/s
delta_v = (current_speed - prev_speed) / 3.6
acceleration = delta_v / dt
if acceleration <= self.HARD_BRAKING_THRESHOLD:
detected.append({
"type": "HARD_BRAKING",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
elif acceleration >= self.HARD_ACCEL_THRESHOLD:
detected.append({
"type": "HARD_ACCELERATION",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
self.speed_history.append(current_speed)
self.prev_timestamp = current_ts
return detected
def detect_speeding(
self,
speed_kmh: float,
speed_limit_kmh: float,
tolerance_pct: float = 0.10
) -> bool:
"""Rileva superamento limite con tolleranza del 10%."""
threshold = speed_limit_kmh * (1 + tolerance_pct)
return speed_kmh > threshold
class EdgeBatchManager:
"""
Gestisce il buffering e l'invio batch al cloud.
Strategia: invia ogni 30 secondi o al termine del viaggio.
"""
BATCH_INTERVAL_SECONDS = 30
MAX_BATCH_SIZE = 300 # max readings per batch
def __init__(self, uploader):
self.buffer: list = []
self.events: list = []
self.last_send = None
self.uploader = uploader
def add_reading(self, reading: dict, events: list) -> None:
self.buffer.append(reading)
self.events.extend(events)
if len(self.buffer) >= self.MAX_BATCH_SIZE:
self._flush_async()
def _flush_async(self) -> None:
if not self.buffer:
return
payload = {
"readings": list(self.buffer),
"events": list(self.events),
"compressed": True
}
# Invia compresso (gzip) via MQTT o HTTPS
self.uploader.send(payload)
self.buffer.clear()
self.events.clear()
4. Potok przesyłania strumieniowego za pomocą Apache Kafka i Apache Flink
Potok w chmurze odbiera partie z brzegu i przesyła je strumieniowo do obliczeń podsumowania podróży, zbiorcze wydarzenia w oknach czasowych i aktualizację ocen ryzyka w najbliższej przyszłości w czasie rzeczywistym. Architektura oparta jest na Apache Kafka jako magistrali komunikatów i Apache Flink przetwarzanie strumieniowe:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time, Types
import json
from datetime import datetime
def build_telematics_pipeline():
"""
Pipeline Flink per processing dati telematici.
Topic Kafka:
- telemetry.raw : letture OBD grezze
- telemetry.events : eventi discreti (hard braking, etc.)
- telemetry.trips : trip summaries
- risk.scores : driving scores aggiornati
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# 1. Source: letture OBD-II raw
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("telemetry.raw")
.set_group_id("telematics-processor")
.set_starting_offsets(KafkaOffsetsInitializer.latest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
raw_stream = env.from_source(
kafka_source,
WatermarkStrategy.for_monotonous_timestamps(),
"OBD Raw Source"
)
# 2. Parse e validazione
parsed_stream = raw_stream.map(lambda s: json.loads(s)) \
.filter(lambda r: validate_reading(r))
# 3. Key by policy_id per partitioning
keyed_stream = parsed_stream.key_by(lambda r: r["policy_id"])
# 4. Trip detection tramite session windows
# Una sessione si chiude dopo 5 minuti di inattivita
trip_stream = keyed_stream \
.window(SessionWindows.with_gap(Time.minutes(5))) \
.apply(TripAggregator())
# 5. Feature engineering per ogni trip
features_stream = trip_stream.map(lambda t: extract_features(t))
# 6. Aggiorna driving score (aggregazione 90 giorni rolling)
score_stream = features_stream \
.key_by(lambda f: f["policy_id"]) \
.window(TumblingEventTimeWindows.of(Time.days(1))) \
.reduce(RollingScoreReducer())
# 7. Sink: aggiorna Redis e pubblica evento
score_stream.add_sink(RedisSink())
score_stream.add_sink(KafkaSink("risk.scores"))
env.execute("Telematics Processing Pipeline")
def validate_reading(reading: dict) -> bool:
"""Validazione basica di una lettura OBD."""
required_fields = ["device_id", "policy_id", "timestamp", "speed_kmh"]
if not all(f in reading for f in required_fields):
return False
if not (0 <= reading["speed_kmh"] <= 300):
return False
if "latitude" in reading and not (-90 <= reading["latitude"] <= 90):
return False
return True
def extract_features(trip: dict) -> dict:
"""
Estrae feature per risk scoring da un trip summary.
Output: feature vector per il modello ML.
"""
readings = trip["readings"]
events = trip["events"]
duration_sec = trip["duration_seconds"]
hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
speeding = [e for e in events if e["type"] == "SPEEDING"]
return {
"policy_id": trip["policy_id"],
"trip_id": trip["trip_id"],
"trip_date": trip["start_time"][:10],
# Feature comportamentali (normalizzate per 100km)
"hard_braking_per_100km": len(hard_braking) / max(trip["distance_km"], 0.1) * 100,
"hard_accel_per_100km": len(hard_accel) / max(trip["distance_km"], 0.1) * 100,
"speeding_events_per_100km": len(speeding) / max(trip["distance_km"], 0.1) * 100,
# Feature temporali
"night_driving_ratio": compute_night_ratio(readings),
"rush_hour_ratio": compute_rush_hour_ratio(readings),
"weekend_ratio": compute_weekend_ratio(readings),
# Feature cinematiche
"avg_speed_kmh": trip.get("avg_speed_kmh", 0),
"max_speed_kmh": trip.get("max_speed_kmh", 0),
"avg_acceleration": compute_avg_acceleration(readings),
# Feature contesto
"highway_ratio": trip.get("highway_pct", 0) / 100,
"urban_ratio": trip.get("urban_pct", 0) / 100,
"distance_km": trip["distance_km"],
"duration_hours": duration_sec / 3600
}
def compute_night_ratio(readings: list) -> float:
"""Calcola frazione dei km percorsi di notte (22:00-06:00)."""
if not readings:
return 0.0
night_readings = [
r for r in readings
if int(r["timestamp"][11:13]) >= 22 or int(r["timestamp"][11:13]) < 6
]
return len(night_readings) / len(readings)
5. Model punktacji ryzyka: od zachowania kierowcy do nagród
Sercem systemu UBI jest model, który przekłada cechy zachowania na wynik jazdy a następnie na współczynnik korekty składki. Najczęściej używanym modelem w branży jest Zespół XGBoost + zasady biznesowe:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
import xgboost as xgb
from typing import NamedTuple
class DrivingScore(NamedTuple):
"""Driving score aggregato su finestra rolling 90 giorni."""
policy_id: str
score: float # 0-100 (100 = guida eccellente)
percentile: float # Percentile rispetto alla flotta
category: str # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
premium_factor: float # Moltiplicatore premio (es. 0.85 = -15%)
class DrivingScoreCalculator:
"""
Calcola il driving score aggregando features su finestra 90 giorni.
Architettura: features -> normalization -> weighted scoring -> premium mapping
"""
WEIGHTS = {
"hard_braking_per_100km": -0.30, # Impatto negativo: 30%
"hard_accel_per_100km": -0.15, # Impatto negativo: 15%
"speeding_events_per_100km": -0.25, # Impatto negativo: 25%
"night_driving_ratio": -0.15, # Impatto negativo: 15%
"avg_speed_kmh": -0.10, # Impatto negativo: 10%
"highway_ratio": 0.05, # Bonus autostrada: +5%
}
# Soglie per conversione score -> category
CATEGORY_THRESHOLDS = [
(90, "EXCELLENT"),
(75, "GOOD"),
(55, "FAIR"),
(35, "POOR"),
(0, "DANGEROUS"),
]
# Mappa category -> premium discount/surcharge
PREMIUM_FACTORS = {
"EXCELLENT": 0.75, # -25% sconto
"GOOD": 0.90, # -10% sconto
"FAIR": 1.00, # Premio base
"POOR": 1.15, # +15% sovrapprezzo
"DANGEROUS": 1.35, # +35% sovrapprezzo
}
def __init__(self, fleet_stats: dict):
"""
fleet_stats: statistiche aggregate della flotta per normalizzazione
Es: {"hard_braking_per_100km": {"mean": 2.5, "std": 1.2}, ...}
"""
self.fleet_stats = fleet_stats
def calculate(self, features_90d: dict) -> DrivingScore:
"""
Calcola il driving score aggregato su 90 giorni.
features_90d: dizionario con medie delle feature sul periodo
"""
# Normalizza features rispetto alla flotta
normalized = self._normalize_features(features_90d)
# Calcola score pesato (base 100)
raw_score = 100.0
for feature, weight in self.WEIGHTS.items():
if feature in normalized:
z_score = normalized[feature]
# z-score positivo su feature negative peggiora lo score
raw_score += weight * z_score * 10
# Clamp tra 0 e 100
score = float(np.clip(raw_score, 0, 100))
# Determina categoria
category = self._get_category(score)
# Calcola percentile rispetto alla flotta
percentile = self._get_percentile(score)
return DrivingScore(
policy_id=features_90d["policy_id"],
score=round(score, 1),
percentile=round(percentile, 1),
category=category,
premium_factor=self.PREMIUM_FACTORS[category]
)
def _normalize_features(self, features: dict) -> dict:
normalized = {}
for key, value in features.items():
if key in self.fleet_stats:
stats = self.fleet_stats[key]
std = stats.get("std", 1.0)
if std > 0:
normalized[key] = (value - stats["mean"]) / std
else:
normalized[key] = 0.0
return normalized
def _get_category(self, score: float) -> str:
for threshold, category in self.CATEGORY_THRESHOLDS:
if score >= threshold:
return category
return "DANGEROUS"
def _get_percentile(self, score: float) -> float:
# Approssimazione: usa distribuzione normale con media=65, std=15
from scipy.stats import norm
return float(norm.cdf(score, loc=65, scale=15) * 100)
class UBIPremiumCalculator:
"""Calcola il premio UBI finale applicando il driving score."""
def calculate_ubi_premium(
self,
base_premium: float,
driving_score: DrivingScore,
distance_km: float,
base_distance_km: float = 15000.0
) -> dict:
"""
Premio UBI = Base * FactoreGuida * FattoreDistanza
- Fattore guida: basato su driving score (0.75-1.35)
- Fattore distanza: aggiustamento per km effettivi vs. dichiarati
"""
# Fattore distanza (max ±20%)
distance_ratio = distance_km / base_distance_km
distance_factor = np.clip(distance_ratio, 0.8, 1.2)
# Premio finale
ubi_premium = base_premium * driving_score.premium_factor * distance_factor
return {
"base_premium": round(base_premium, 2),
"driving_score": driving_score.score,
"driving_factor": driving_score.premium_factor,
"distance_km": distance_km,
"distance_factor": round(distance_factor, 3),
"final_premium": round(ubi_premium, 2),
"saving_vs_base": round(base_premium - ubi_premium, 2),
"category": driving_score.category
}
6. Wykrywanie wyzwolenia: Okna sesji i sygnały dokładnego wyzwolenia
Wykrycie początku i końca podróży jest bardziej skomplikowane, niż się wydaje. Pojazd może zatrzymanie się na światłach, krótkie zatrzymanie lub wjechanie do garażu podziemnego i przegapienie sygnał GPS. Logika wykrywania wyzwolenia wykorzystuje okna sesji z konfigurowalnym limitem czasu przerwy:
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TripDetector:
"""
Stato macchina per rilevamento trip.
Usa session window: gap > 5 minuti = fine viaggio.
"""
GAP_TIMEOUT_SECONDS: int = 300 # 5 minuti
MIN_TRIP_DISTANCE_KM: float = 0.5 # Ignora microtrip
policy_id: str
current_trip: Optional[dict] = None
last_reading_time: Optional[datetime] = None
readings_buffer: list = field(default_factory=list)
def process_reading(self, reading: dict) -> Optional[dict]:
"""
Processa una lettura e restituisce il trip completato se rilevato.
"""
current_time = datetime.fromisoformat(reading["timestamp"])
completed_trip = None
if self.current_trip is None:
# Inizio nuovo viaggio
if reading["speed_kmh"] > 2.0:
self.current_trip = {
"trip_id": f"trip-{self.policy_id}-{current_time.strftime('%Y%m%d%H%M%S')}",
"policy_id": self.policy_id,
"start_time": current_time.isoformat(),
"start_lat": reading["latitude"],
"start_lng": reading["longitude"],
"readings_count": 0,
"total_distance_km": 0.0
}
self.readings_buffer = [reading]
else:
# Viaggio in corso
if self.last_reading_time:
gap = (current_time - self.last_reading_time).total_seconds()
if gap > self.GAP_TIMEOUT_SECONDS:
# Gap troppo grande: chiudi viaggio corrente
completed_trip = self._close_trip(reading)
# Inizia nuovo viaggio se in moto
if reading["speed_kmh"] > 2.0:
self.current_trip = self._start_new_trip(reading, current_time)
self.readings_buffer = [reading]
else:
self.current_trip = None
self.readings_buffer = []
else:
self.readings_buffer.append(reading)
self._update_trip_stats(reading)
self.last_reading_time = current_time
return completed_trip
def _close_trip(self, last_reading: dict) -> Optional[dict]:
if not self.current_trip:
return None
trip = {
**self.current_trip,
"end_time": last_reading["timestamp"],
"end_lat": last_reading["latitude"],
"end_lng": last_reading["longitude"],
"readings": self.readings_buffer,
"duration_seconds": (
datetime.fromisoformat(last_reading["timestamp"]) -
datetime.fromisoformat(self.current_trip["start_time"])
).total_seconds()
}
# Scarta microtrip
if trip["total_distance_km"] < self.MIN_TRIP_DISTANCE_KM:
return None
return trip
def _update_trip_stats(self, reading: dict) -> None:
"""Aggiorna statistiche del trip corrente."""
if self.readings_buffer and self.current_trip:
prev = self.readings_buffer[-1]
dist = haversine_distance(
prev["latitude"], prev["longitude"],
reading["latitude"], reading["longitude"]
)
self.current_trip["total_distance_km"] += dist
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
"""Distanza Haversine in km tra due coordinate GPS."""
R = 6371.0
import math
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
7. Prywatność i RODO dla danych geolokalizacyjnych
Dane telematyczne obejmują dokładną geolokalizację, czasy, trasy podróży: dane osobowe wrażliwe w świetle RODO. System musi od podstaw wdrażać zasadę prywatności już w fazie projektowania.
Wymagania RODO dotyczące telematyki ubezpieczeniowej
- Wyraźna zgoda: Właściciel musi wyrazić wyraźną zgodę na gromadzenie danych dotyczących jazdy
- Minimalizacja: Zbieraj tylko dane ściśle niezbędne do oceny ryzyka
- Zatrzymanie: Surowe dane GPS usunięte po przetworzeniu (maks. 90 dni); zbiorcze przechowywane przez okres obowiązywania polisy
- Anonimizacja: Po obliczeniu wyniku dane GPS można agregować i anonimizować
- Prawo do bycia zapomnianym: API do usuwania danych na żądanie
- Transfery: Dane przetwarzane w UE (zakaz przekazywania poza EOG bez odpowiednich gwarancji)
class TelematicsPrivacyService:
"""Gestisce data lifecycle nel rispetto GDPR."""
RAW_GPS_RETENTION_DAYS = 90
AGGREGATED_RETENTION_YEARS = 7 # Per audit assicurativo
async def anonymize_trip(self, trip: dict) -> dict:
"""
Anonimizza un trip rimuovendo coordinate GPS precise.
Mantiene solo dati aggregati per il risk score.
"""
return {
"trip_id": trip["trip_id"],
"policy_id": trip["policy_id"],
"date": trip["start_time"][:10],
"duration_hours": trip["duration_seconds"] / 3600,
"distance_km": trip["total_distance_km"],
# Features comportamentali aggregate (no GPS raw)
"hard_braking_count": trip.get("hard_braking_count", 0),
"speeding_pct": trip.get("speeding_pct", 0),
"night_ratio": trip.get("night_driving_ratio", 0),
# NO: coordinate GPS, itinerario, soste specifiche
}
async def delete_raw_data(self, policy_id: str) -> None:
"""Elimina tutti i dati GPS grezzi per una polizza (diritto all'oblio)."""
await self.db.execute(
"DELETE FROM raw_telemetry WHERE policy_id = $1",
policy_id
)
await self.db.execute(
"DELETE FROM gps_tracks WHERE policy_id = $1",
policy_id
)
# Mantieni solo aggregati anonimi per audit
async def purge_expired_raw_data(self) -> int:
"""Job giornaliero: elimina dati GPS grezzi scaduti."""
from datetime import date, timedelta
cutoff = date.today() - timedelta(days=self.RAW_GPS_RETENTION_DAYS)
result = await self.db.execute(
"DELETE FROM raw_telemetry WHERE created_at < $1",
cutoff
)
return result
8. Panel kontrolny i informacje zwrotne od kierowcy
Zaangażowanie kierowców ma fundamentalne znaczenie dla sukcesu UBI. Kierowcy, którzy otrzymują Informacje zwrotne w czasie rzeczywistym poprawiają zachowanie kierowcy o 15–20% (dane AXA 2024). Mobilny pulpit nawigacyjny musi wyświetlać wyniki, trendy i spersonalizowane rekomendacje:
class DrivingFeedbackGenerator:
"""Genera feedback personalizzato per il guidatore."""
IMPROVEMENT_TIPS = {
"hard_braking": [
"Mantieni una distanza di sicurezza maggiore dal veicolo precedente",
"Anticipa le frenate osservando il traffico a distanza",
"Riduci la velocità in anticipo agli incroci"
],
"speeding": [
"Usa il cruise control in autostrada",
"Imposta un avviso velocità a 10 km/h sopra il limite",
"Considera che la multa media costa più dello sconto UBI"
],
"night_driving": [
"Pianifica viaggi lunghi durante il giorno quando possibile",
"Se guidi di notte, fai pause ogni 2 ore"
]
}
def generate_weekly_report(
self,
policy_id: str,
current_score: DrivingScore,
weekly_trips: list[dict]
) -> dict:
"""Genera report settimanale per il guidatore."""
worst_behavior = self._identify_worst_behavior(weekly_trips)
tips = [
self.IMPROVEMENT_TIPS[b][0]
for b in worst_behavior[:2]
if b in self.IMPROVEMENT_TIPS
]
return {
"policy_id": policy_id,
"week_score": current_score.score,
"score_category": current_score.category,
"premium_factor": current_score.premium_factor,
"estimated_annual_saving": self._estimate_saving(current_score),
"trips_this_week": len(weekly_trips),
"total_km_this_week": sum(t["distance_km"] for t in weekly_trips),
"improvement_tips": tips,
"percentile_rank": f"Guidi meglio del {current_score.percentile:.0f}% degli assicurati"
}
def _identify_worst_behavior(self, trips: list[dict]) -> list[str]:
behaviors = []
avg_hard_braking = sum(
t.get("hard_braking_per_100km", 0) for t in trips
) / max(len(trips), 1)
avg_speeding = sum(
t.get("speeding_events_per_100km", 0) for t in trips
) / max(len(trips), 1)
night_ratio = sum(
t.get("night_driving_ratio", 0) for t in trips
) / max(len(trips), 1)
if avg_hard_braking > 3:
behaviors.append("hard_braking")
if avg_speeding > 2:
behaviors.append("speeding")
if night_ratio > 0.3:
behaviors.append("night_driving")
return behaviors
def _estimate_saving(self, score: DrivingScore) -> float:
base_premium = 600.0 # Premio medio auto in Italia
return round(base_premium * (1 - score.premium_factor), 0)
9. Kompletna architektura systemu
Pełna architektura systemu telematycznego UBI składa się z następujących warstw:
| Warstwy | Komponenty | Technologia | Utajenie |
|---|---|---|---|
| Krawędź | Klucz sprzętowy OBD, SDK do smartfona | C/ARM, SDK dla iOS/Androida | 100ms |
| Przyjmowanie pokarmu | Broker MQTT, API REST | Rdzeń AWS IoT / EMQX | <1s |
| Transmisja strumieniowa | Przetwarzanie zdarzeń, wykrywanie wyłączeń | Apache Kafka + Flink | <5 s |
| Punktacja ML | Sklep z funkcjami, silnik punktacji | Uczta + MLflow | <100ms |
| Składowanie | Szeregi czasowe, agregaty | InfluxDB + PostgreSQL | - |
| Porcja | Score API, panel sterownika | FastAPI + Reaguj natywnie | <200ms |
Wnioski
Rurociąg telematyczny UBI to rozproszony system obliczeniowy, który łączy się z przetwarzaniem brzegowym, przetwarzanie strumieniowe, uczenie maszynowe i inżynieria prywatności. Rynek 62,6 miliarda dolarów (2025) z CAGR 24,8% odzwierciedla zainteresowanie branży tym dostosowaniem ryzyka.
Punktami krytycznymi są inżynieria cech dotycząca zachowania podczas jazdy (mocne hamowanie, przekroczenie prędkości, jazda nocą), obliczenie znormalizowanego wyniku za jazdę floty oraz odpowiedzialne zarządzanie danymi geolokalizacyjnymi zgodnie z RODO. Informacje zwrotne Dla kierowcy to nie tylko plus: to najskuteczniejsza dźwignia poprawy zachowania i zmniejszyć liczbę wypadków flotowych.
Nadchodzące artykuły z serii InsurTech
- Underwriting AI: inżynieria funkcji i punktacja ryzyka
- Automatyzacja roszczeń: wizja komputerowa i NLP
- Wykrywanie oszustw: analiza wykresów i sygnał behawioralny







