Telematics Pipeline: UBI Data Processing at Scale
In 2025, more than 21 million US drivers will share telematics data with their insurer. In Europe, AXA launched an AI-driven UBI solution for European markets in January 2025 that integrates OBD-II, smartphone-based telematics, and predictive risk models. The Usage-Based Insurance (UBI) market is worth $62.6 billion in 2025 and is projected to grow at a 24.8% CAGR through 2035.
Insurance telematics is not simply collecting GPS data: it is a complex pipeline that transforms raw signals from OBD-II devices, smartphones, and IoT sensors into individual risk scores that determine each policyholder's premium. A bad driver pays more; a good driver saves. The principle is simple; the technical execution is anything but.
In this article we build a complete telematics pipeline: from OBD-II data ingestion to driving-behavior feature engineering, from real-time risk scoring to dynamic pricing, with attention to scalability, latency, and privacy.
What you will learn
- Telematics pipeline architecture for UBI (Pay-How-You-Drive)
- The OBD-II protocol: parameters, sampling rates, edge processing
- Feature engineering for driving behavior (hard braking, cornering, speeding)
- Stream processing with Apache Kafka and Apache Flink
- Real-time risk scoring and temporal aggregations
- Dynamic pricing based on the driving score
- GDPR considerations for geolocation data
1. Types of insurance telematics
There are four main models for collecting telematics data, with different trade-offs between accuracy, deployment cost, and customer acceptance:
| Type | Device | Data collected | Accuracy | Cost |
|---|---|---|---|---|
| OBD-II dongle | Plugs into the OBD port | ECU, speed, RPM, acceleration | High | €20-50 |
| Smartphone app | iOS/Android app | GPS, accelerometer, gyroscope | Medium | No hardware |
| Black box (OEM-fitted) | Installer-fitted device | ECU + GPS + accelerometer | Very high | €100-300 |
| Connected-car API | Vehicle's native API | All native telematics | Highest | OEM agreement |
2. OBD-II data structure
The OBD-II (On-Board Diagnostics II) protocol exposes dozens of ECU parameters via PIDs (Parameter IDs). For insurance risk scoring, the most relevant PIDs are:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class OBDReading:
    """A single OBD-II reading from the vehicle."""
    device_id: str         # Unique device ID
    policy_id: str         # Associated policy
    timestamp: datetime    # UTC timestamp
    latitude: float        # Decimal degrees (WGS84)
    longitude: float       # Decimal degrees (WGS84)
    speed_kmh: float       # Speed in km/h (PID 0x0D)
    rpm: int               # Revolutions per minute (PID 0x0C)
    throttle_pct: float    # Throttle position % (PID 0x11)
    engine_load_pct: float # Engine load % (PID 0x04)
    coolant_temp_c: int    # Coolant temperature (PID 0x05)
    # Computed at the edge (not native OBD)
    acceleration_ms2: Optional[float] = None  # m/s^2 (+/-)
    heading_deg: Optional[float] = None       # Heading 0-360
    road_type: Optional[str] = None           # "URBAN", "RURAL", "HIGHWAY"

@dataclass
class TripSummary:
    """Summary of a single trip."""
    trip_id: str
    policy_id: str
    device_id: str
    start_time: datetime
    end_time: datetime
    distance_km: float
    duration_minutes: float
    # Behavioral metrics (computed by the pipeline)
    max_speed_kmh: float
    avg_speed_kmh: float
    hard_braking_count: int       # Decelerations > 0.3g
    hard_acceleration_count: int  # Accelerations > 0.3g
    hard_cornering_count: int     # Lateral > 0.3g
    speeding_pct: float           # % of time above the limit
    night_driving_pct: float      # % of km at night (22:00-06:00)
    highway_pct: float            # % of km on highways
    # Aggregate scores
    safety_score: float           # 0-100 (100 = excellent)
    fuel_efficiency_score: float  # 0-100
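The fields above hold already-decoded values. Decoding a raw mode-01 response follows the standard SAE J1979 formulas (e.g. speed is a single byte in km/h; RPM is `(256*A + B) / 4`). The helper below is illustrative, not part of the article's pipeline, and covers only the PIDs listed:

```python
def decode_pid(pid: int, data: bytes) -> float:
    """Decode a mode-01 OBD-II response payload per SAE J1979."""
    if pid == 0x0D:  # Vehicle speed: A km/h
        return float(data[0])
    if pid == 0x0C:  # Engine RPM: (256*A + B) / 4
        return (256 * data[0] + data[1]) / 4.0
    if pid == 0x11:  # Throttle position: A * 100 / 255 (%)
        return data[0] * 100.0 / 255.0
    if pid == 0x04:  # Engine load: A * 100 / 255 (%)
        return data[0] * 100.0 / 255.0
    if pid == 0x05:  # Coolant temperature: A - 40 (deg C)
        return float(data[0] - 40)
    raise ValueError(f"unsupported PID: {pid:#04x}")

# Example: payload bytes 0x1A 0xF8 for PID 0x0C -> (256*26 + 248) / 4
rpm = decode_pid(0x0C, bytes([0x1A, 0xF8]))  # 1726.0
```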
3. Edge processing on the device
Raw OBD-II data must be pre-processed at the edge before being sent to the cloud. This reduces bandwidth requirements and detects discrete events (hard braking, speeding) in real time. The edge processor typically runs on an ARM microcontroller inside the OBD device:
from collections import deque

class EdgeEventDetector:
    """
    Driving-event detection algorithm.
    Runs on the embedded device (ARM Cortex-M).
    Sampling rate: 10 Hz (one reading every 100 ms).
    """
    HARD_BRAKING_THRESHOLD = -3.0   # m/s^2 (~0.3g)
    HARD_ACCEL_THRESHOLD = 3.0      # m/s^2 (~0.3g)
    HARD_CORNERING_THRESHOLD = 3.0  # m/s^2 lateral

    def __init__(self, window_size: int = 5):
        self.speed_history = deque(maxlen=window_size)
        self.events: list = []
        self.prev_timestamp = None

    def process_reading(self, reading: dict) -> list:
        """
        Process a single reading and detect events.
        Returns the list of detected events (empty if none).
        """
        detected = []
        current_speed = reading["speed_kmh"]
        current_ts = reading["timestamp"]
        # Compute longitudinal acceleration
        if self.speed_history and self.prev_timestamp:
            prev_speed = self.speed_history[-1]
            dt = (current_ts - self.prev_timestamp).total_seconds()
            if dt > 0:
                # Convert km/h -> m/s
                delta_v = (current_speed - prev_speed) / 3.6
                acceleration = delta_v / dt
                if acceleration <= self.HARD_BRAKING_THRESHOLD:
                    detected.append({
                        "type": "HARD_BRAKING",
                        "timestamp": current_ts.isoformat(),
                        "value": round(acceleration, 2),
                        "speed_at_event": current_speed
                    })
                elif acceleration >= self.HARD_ACCEL_THRESHOLD:
                    detected.append({
                        "type": "HARD_ACCELERATION",
                        "timestamp": current_ts.isoformat(),
                        "value": round(acceleration, 2),
                        "speed_at_event": current_speed
                    })
        self.speed_history.append(current_speed)
        self.prev_timestamp = current_ts
        return detected

    def detect_speeding(
        self,
        speed_kmh: float,
        speed_limit_kmh: float,
        tolerance_pct: float = 0.10
    ) -> bool:
        """Detect a limit violation with a 10% tolerance."""
        threshold = speed_limit_kmh * (1 + tolerance_pct)
        return speed_kmh > threshold

class EdgeBatchManager:
    """
    Manages buffering and batch upload to the cloud.
    Strategy: send every 30 seconds or at the end of the trip.
    """
    BATCH_INTERVAL_SECONDS = 30
    MAX_BATCH_SIZE = 300  # Max readings per batch

    def __init__(self, uploader):
        self.buffer: list = []
        self.events: list = []
        self.last_send = None
        self.uploader = uploader

    def add_reading(self, reading: dict, events: list) -> None:
        self.buffer.append(reading)
        self.events.extend(events)
        if len(self.buffer) >= self.MAX_BATCH_SIZE:
            self._flush_async()

    def _flush_async(self) -> None:
        if not self.buffer:
            return
        payload = {
            "readings": list(self.buffer),
            "events": list(self.events),
            "compressed": True
        }
        # Send compressed (gzip) via MQTT or HTTPS
        self.uploader.send(payload)
        self.buffer.clear()
        self.events.clear()
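To sanity-check the threshold math in `EdgeEventDetector`, a braking maneuver from 50 to 30 km/h in one second can be worked through by hand. The standalone snippet below mirrors the detector's longitudinal-acceleration formula (synthetic readings, not real device output):

```python
from datetime import datetime, timedelta

HARD_BRAKING_THRESHOLD = -3.0  # m/s^2, same constant as EdgeEventDetector

# Two consecutive readings, one second apart: 50 km/h -> 30 km/h
t0 = datetime(2025, 3, 1, 8, 30, 0)
prev = {"speed_kmh": 50.0, "timestamp": t0}
curr = {"speed_kmh": 30.0, "timestamp": t0 + timedelta(seconds=1)}

dt = (curr["timestamp"] - prev["timestamp"]).total_seconds()
# delta_v = -20 km/h / 3.6 ~= -5.56 m/s, over 1 s -> -5.56 m/s^2
acceleration = (curr["speed_kmh"] - prev["speed_kmh"]) / 3.6 / dt
is_hard_braking = acceleration <= HARD_BRAKING_THRESHOLD
print(round(acceleration, 2), is_hard_braking)  # -5.56 True
```

Well below the -3.0 m/s^2 threshold, so this maneuver would be flagged as `HARD_BRAKING`.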
4. Streaming pipeline with Apache Kafka and Apache Flink
The cloud pipeline receives batches from the edge and processes the stream to compute trip summaries, aggregate events over time windows, and update risk scores in near real time. The architecture relies on Apache Kafka as the message bus and Apache Flink for stream processing:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time
import json

def build_telematics_pipeline():
    """
    Flink pipeline for processing telematics data.
    Kafka topics:
      - telemetry.raw    : raw OBD readings
      - telemetry.events : discrete events (hard braking, etc.)
      - telemetry.trips  : trip summaries
      - risk.scores      : updated driving scores
    Note: SessionWindows, TripAggregator, RollingScoreReducer, RedisSink and
    KafkaSink are application-specific operators, assumed to be defined elsewhere.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    # 1. Source: raw OBD-II readings
    kafka_source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("telemetry.raw")
        .set_group_id("telematics-processor")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )
    raw_stream = env.from_source(
        kafka_source,
        WatermarkStrategy.for_monotonous_timestamps(),
        "OBD Raw Source"
    )
    # 2. Parse and validate
    parsed_stream = raw_stream.map(lambda s: json.loads(s)) \
        .filter(lambda r: validate_reading(r))
    # 3. Key by policy_id for partitioning
    keyed_stream = parsed_stream.key_by(lambda r: r["policy_id"])
    # 4. Trip detection via session windows
    #    (a session closes after 5 minutes of inactivity)
    trip_stream = keyed_stream \
        .window(SessionWindows.with_gap(Time.minutes(5))) \
        .apply(TripAggregator())
    # 5. Feature engineering for each trip
    features_stream = trip_stream.map(lambda t: extract_features(t))
    # 6. Update the driving score (daily window feeding a 90-day rolling aggregate)
    score_stream = features_stream \
        .key_by(lambda f: f["policy_id"]) \
        .window(TumblingEventTimeWindows.of(Time.days(1))) \
        .reduce(RollingScoreReducer())
    # 7. Sink: update Redis and publish the event
    score_stream.add_sink(RedisSink())
    score_stream.add_sink(KafkaSink("risk.scores"))
    env.execute("Telematics Processing Pipeline")
def validate_reading(reading: dict) -> bool:
    """Basic validation of an OBD reading."""
    required_fields = ["device_id", "policy_id", "timestamp", "speed_kmh"]
    if not all(f in reading for f in required_fields):
        return False
    if not (0 <= reading["speed_kmh"] <= 300):
        return False
    if "latitude" in reading and not (-90 <= reading["latitude"] <= 90):
        return False
    return True

def extract_features(trip: dict) -> dict:
    """
    Extracts risk-scoring features from a trip summary.
    Output: feature vector for the ML model.
    """
    readings = trip["readings"]
    events = trip["events"]
    duration_sec = trip["duration_seconds"]
    hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
    hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
    speeding = [e for e in events if e["type"] == "SPEEDING"]
    return {
        "policy_id": trip["policy_id"],
        "trip_id": trip["trip_id"],
        "trip_date": trip["start_time"][:10],
        # Behavioral features (normalized per 100 km)
        "hard_braking_per_100km": len(hard_braking) / max(trip["distance_km"], 0.1) * 100,
        "hard_accel_per_100km": len(hard_accel) / max(trip["distance_km"], 0.1) * 100,
        "speeding_events_per_100km": len(speeding) / max(trip["distance_km"], 0.1) * 100,
        # Temporal features
        "night_driving_ratio": compute_night_ratio(readings),
        "rush_hour_ratio": compute_rush_hour_ratio(readings),
        "weekend_ratio": compute_weekend_ratio(readings),
        # Kinematic features
        "avg_speed_kmh": trip.get("avg_speed_kmh", 0),
        "max_speed_kmh": trip.get("max_speed_kmh", 0),
        "avg_acceleration": compute_avg_acceleration(readings),
        # Context features
        "highway_ratio": trip.get("highway_pct", 0) / 100,
        "urban_ratio": trip.get("urban_pct", 0) / 100,
        "distance_km": trip["distance_km"],
        "duration_hours": duration_sec / 3600
    }

def compute_night_ratio(readings: list) -> float:
    """Fraction of readings recorded at night (22:00-06:00)."""
    if not readings:
        return 0.0
    night_readings = [
        r for r in readings
        if int(r["timestamp"][11:13]) >= 22 or int(r["timestamp"][11:13]) < 6
    ]
    return len(night_readings) / len(readings)
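`extract_features` references three helpers that the article does not show. Plausible sketches could look like the following; the hour-slicing convention mirrors `compute_night_ratio`, timestamps are assumed ISO-8601 strings, and the rush-hour bands (07-09, 17-19) are an assumption, not something the article specifies:

```python
from datetime import datetime

def compute_rush_hour_ratio(readings: list) -> float:
    """Fraction of readings during assumed rush hours (07-09 and 17-19)."""
    if not readings:
        return 0.0
    rush = [
        r for r in readings
        if 7 <= int(r["timestamp"][11:13]) < 9 or 17 <= int(r["timestamp"][11:13]) < 19
    ]
    return len(rush) / len(readings)

def compute_weekend_ratio(readings: list) -> float:
    """Fraction of readings on Saturday or Sunday."""
    if not readings:
        return 0.0
    weekend = [
        r for r in readings
        if datetime.fromisoformat(r["timestamp"]).weekday() >= 5
    ]
    return len(weekend) / len(readings)

def compute_avg_acceleration(readings: list) -> float:
    """Mean absolute longitudinal acceleration (m/s^2) between consecutive readings."""
    accels = []
    for prev, curr in zip(readings, readings[1:]):
        dt = (datetime.fromisoformat(curr["timestamp"])
              - datetime.fromisoformat(prev["timestamp"])).total_seconds()
        if dt > 0:
            accels.append(abs((curr["speed_kmh"] - prev["speed_kmh"]) / 3.6 / dt))
    return sum(accels) / len(accels) if accels else 0.0
```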
5. Risk-scoring model: from driving behavior to rewards
The heart of the UBI system is the model that translates behavioral features into a driving score, and then into a premium-adjustment factor. The most widely used approach in the industry is an XGBoost ensemble combined with business rules:
import numpy as np
from scipy.stats import norm
from typing import NamedTuple

class DrivingScore(NamedTuple):
    """Driving score aggregated over a rolling 90-day window."""
    policy_id: str
    score: float           # 0-100 (100 = excellent driving)
    percentile: float      # Percentile within the fleet
    category: str          # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
    premium_factor: float  # Premium multiplier (e.g. 0.85 = -15%)

class DrivingScoreCalculator:
    """
    Computes the driving score by aggregating features over a 90-day window.
    Architecture: features -> normalization -> weighted scoring -> premium mapping
    """
    WEIGHTS = {
        "hard_braking_per_100km": -0.30,     # Negative impact: 30%
        "hard_accel_per_100km": -0.15,       # Negative impact: 15%
        "speeding_events_per_100km": -0.25,  # Negative impact: 25%
        "night_driving_ratio": -0.15,        # Negative impact: 15%
        "avg_speed_kmh": -0.10,              # Negative impact: 10%
        "highway_ratio": 0.05,               # Highway bonus: +5%
    }
    # Thresholds for score -> category conversion
    CATEGORY_THRESHOLDS = [
        (90, "EXCELLENT"),
        (75, "GOOD"),
        (55, "FAIR"),
        (35, "POOR"),
        (0, "DANGEROUS"),
    ]
    # Category -> premium discount/surcharge map
    PREMIUM_FACTORS = {
        "EXCELLENT": 0.75,  # -25% discount
        "GOOD": 0.90,       # -10% discount
        "FAIR": 1.00,       # Base premium
        "POOR": 1.15,       # +15% surcharge
        "DANGEROUS": 1.35,  # +35% surcharge
    }

    def __init__(self, fleet_stats: dict):
        """
        fleet_stats: aggregate fleet statistics used for normalization,
        e.g. {"hard_braking_per_100km": {"mean": 2.5, "std": 1.2}, ...}
        """
        self.fleet_stats = fleet_stats

    def calculate(self, features_90d: dict) -> DrivingScore:
        """
        Computes the aggregated 90-day driving score.
        features_90d: dictionary of feature averages over the period.
        """
        # Normalize features against the fleet
        normalized = self._normalize_features(features_90d)
        # Weighted score (base 100)
        raw_score = 100.0
        for feature, weight in self.WEIGHTS.items():
            if feature in normalized:
                z_score = normalized[feature]
                # A positive z-score on a negative feature lowers the score
                raw_score += weight * z_score * 10
        # Clamp between 0 and 100
        score = float(np.clip(raw_score, 0, 100))
        # Determine the category
        category = self._get_category(score)
        # Percentile within the fleet
        percentile = self._get_percentile(score)
        return DrivingScore(
            policy_id=features_90d["policy_id"],
            score=round(score, 1),
            percentile=round(percentile, 1),
            category=category,
            premium_factor=self.PREMIUM_FACTORS[category]
        )

    def _normalize_features(self, features: dict) -> dict:
        normalized = {}
        for key, value in features.items():
            if key in self.fleet_stats:
                stats = self.fleet_stats[key]
                std = stats.get("std", 1.0)
                if std > 0:
                    normalized[key] = (value - stats["mean"]) / std
                else:
                    normalized[key] = 0.0
        return normalized

    def _get_category(self, score: float) -> str:
        for threshold, category in self.CATEGORY_THRESHOLDS:
            if score >= threshold:
                return category
        return "DANGEROUS"

    def _get_percentile(self, score: float) -> float:
        # Approximation: normal distribution with mean=65, std=15
        return float(norm.cdf(score, loc=65, scale=15) * 100)
class UBIPremiumCalculator:
    """Computes the final UBI premium by applying the driving score."""

    def calculate_ubi_premium(
        self,
        base_premium: float,
        driving_score: DrivingScore,
        distance_km: float,
        base_distance_km: float = 15000.0
    ) -> dict:
        """
        UBI premium = base * driving factor * distance factor
        - Driving factor: based on the driving score (0.75-1.35)
        - Distance factor: adjustment for actual vs. declared km
        """
        # Distance factor (capped at +/-20%)
        distance_ratio = distance_km / base_distance_km
        distance_factor = float(np.clip(distance_ratio, 0.8, 1.2))
        # Final premium
        ubi_premium = base_premium * driving_score.premium_factor * distance_factor
        return {
            "base_premium": round(base_premium, 2),
            "driving_score": driving_score.score,
            "driving_factor": driving_score.premium_factor,
            "distance_km": distance_km,
            "distance_factor": round(distance_factor, 3),
            "final_premium": round(ubi_premium, 2),
            "saving_vs_base": round(base_premium - ubi_premium, 2),
            "category": driving_score.category
        }
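Working through the premium formula by hand helps verify the mapping. The standalone snippet below mirrors `UBIPremiumCalculator` for a driver in the "GOOD" category who drove 18,000 km against 15,000 declared (the €600 base premium is illustrative):

```python
base_premium = 600.0
premium_factor = 0.90  # "GOOD" category -> -10%
distance_km = 18_000.0
base_distance_km = 15_000.0

# Distance factor, clamped to +/-20% like np.clip(ratio, 0.8, 1.2)
distance_factor = min(max(distance_km / base_distance_km, 0.8), 1.2)
final_premium = base_premium * premium_factor * distance_factor
print(distance_factor, round(final_premium, 2))  # 1.2 648.0
```

The 10% discount for good driving is more than offset by the 20% surcharge for the extra mileage, so the final premium ends up above the base.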
6. Trip detection: session windows and noisy signals
Detecting the start and end of a trip is harder than it looks. The vehicle may stop at a traffic light, make a short stop, or enter an underground garage and lose the GPS signal. The trip-detection logic uses session windows with a configurable gap timeout:
import math
from datetime import datetime
from dataclasses import dataclass, field
from typing import ClassVar, Optional

@dataclass
class TripDetector:
    """
    State machine for trip detection.
    Uses a session window: a gap > 5 minutes ends the trip.
    """
    # Class-level constants (ClassVar so the dataclass does not treat them as fields)
    GAP_TIMEOUT_SECONDS: ClassVar[int] = 300     # 5 minutes
    MIN_TRIP_DISTANCE_KM: ClassVar[float] = 0.5  # Ignore micro-trips

    policy_id: str
    current_trip: Optional[dict] = None
    last_reading_time: Optional[datetime] = None
    readings_buffer: list = field(default_factory=list)

    def process_reading(self, reading: dict) -> Optional[dict]:
        """
        Processes a reading and returns the completed trip, if one was detected.
        """
        current_time = datetime.fromisoformat(reading["timestamp"])
        completed_trip = None
        if self.current_trip is None:
            # Start of a new trip
            if reading["speed_kmh"] > 2.0:
                self.current_trip = self._start_new_trip(reading, current_time)
                self.readings_buffer = [reading]
        else:
            # Trip in progress
            if self.last_reading_time:
                gap = (current_time - self.last_reading_time).total_seconds()
                if gap > self.GAP_TIMEOUT_SECONDS:
                    # Gap too large: close the current trip
                    completed_trip = self._close_trip(reading)
                    # Start a new trip if the vehicle is moving
                    if reading["speed_kmh"] > 2.0:
                        self.current_trip = self._start_new_trip(reading, current_time)
                        self.readings_buffer = [reading]
                    else:
                        self.current_trip = None
                        self.readings_buffer = []
                else:
                    self.readings_buffer.append(reading)
                    self._update_trip_stats(reading)
        self.last_reading_time = current_time
        return completed_trip

    def _start_new_trip(self, reading: dict, current_time: datetime) -> dict:
        return {
            "trip_id": f"trip-{self.policy_id}-{current_time.strftime('%Y%m%d%H%M%S')}",
            "policy_id": self.policy_id,
            "start_time": current_time.isoformat(),
            "start_lat": reading["latitude"],
            "start_lng": reading["longitude"],
            "readings_count": 0,
            "total_distance_km": 0.0
        }

    def _close_trip(self, last_reading: dict) -> Optional[dict]:
        if not self.current_trip:
            return None
        trip = {
            **self.current_trip,
            "end_time": last_reading["timestamp"],
            "end_lat": last_reading["latitude"],
            "end_lng": last_reading["longitude"],
            "readings": self.readings_buffer,
            "duration_seconds": (
                datetime.fromisoformat(last_reading["timestamp"]) -
                datetime.fromisoformat(self.current_trip["start_time"])
            ).total_seconds()
        }
        # Discard micro-trips
        if trip["total_distance_km"] < self.MIN_TRIP_DISTANCE_KM:
            return None
        return trip

    def _update_trip_stats(self, reading: dict) -> None:
        """Updates the statistics of the current trip."""
        if self.readings_buffer and self.current_trip:
            prev = self.readings_buffer[-1]
            dist = haversine_distance(
                prev["latitude"], prev["longitude"],
                reading["latitude"], reading["longitude"]
            )
            self.current_trip["total_distance_km"] += dist

def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Haversine (great-circle) distance in km between two GPS coordinates."""
    R = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
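A quick sanity check of the Haversine helper: one degree of latitude corresponds to about 111.2 km anywhere on a sphere of radius 6371 km. The function is repeated here so the snippet can be run in isolation:

```python
import math

def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Haversine (great-circle) distance in km between two GPS coordinates."""
    R = 6371.0  # Mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

# One degree of latitude: 6371 km * pi/180 ~= 111.19 km
print(round(haversine_distance(0.0, 0.0, 1.0, 0.0), 1))  # 111.2
```

At a 10 Hz sampling rate the per-segment distances are tiny, so the small-angle behavior of the formula matters more than long-haul accuracy.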
7. Privacy and GDPR for geolocation data
Telematics data includes precise geolocation, timestamps, and routes: sensitive personal data under the GDPR. The system must implement privacy by design from the ground up.
GDPR requirements for insurance telematics
- Explicit consent: the policyholder must explicitly consent to the collection of driving data
- Data minimisation: collect only the data strictly necessary for risk scoring
- Retention: raw GPS data is deleted after processing (max 90 days); aggregates are kept for the duration of the policy
- Anonymisation: after the score is computed, GPS data can be aggregated and anonymised
- Right to erasure: an API for on-demand data deletion
- Transfers: data is processed in the EU (no transfers outside the EEA without adequate safeguards)
class TelematicsPrivacyService:
    """Manages the data lifecycle in compliance with the GDPR.
    Assumes an async database handle (e.g. an asyncpg pool) is injected."""

    RAW_GPS_RETENTION_DAYS = 90
    AGGREGATED_RETENTION_YEARS = 7  # For insurance audits

    def __init__(self, db):
        self.db = db

    async def anonymize_trip(self, trip: dict) -> dict:
        """
        Anonymizes a trip by removing precise GPS coordinates.
        Keeps only the aggregate data needed for the risk score.
        """
        return {
            "trip_id": trip["trip_id"],
            "policy_id": trip["policy_id"],
            "date": trip["start_time"][:10],
            "duration_hours": trip["duration_seconds"] / 3600,
            "distance_km": trip["total_distance_km"],
            # Aggregate behavioral features (no raw GPS)
            "hard_braking_count": trip.get("hard_braking_count", 0),
            "speeding_pct": trip.get("speeding_pct", 0),
            "night_ratio": trip.get("night_driving_ratio", 0),
            # NOT included: GPS coordinates, route, specific stops
        }

    async def delete_raw_data(self, policy_id: str) -> None:
        """Deletes all raw GPS data for a policy (right to erasure)."""
        await self.db.execute(
            "DELETE FROM raw_telemetry WHERE policy_id = $1",
            policy_id
        )
        await self.db.execute(
            "DELETE FROM gps_tracks WHERE policy_id = $1",
            policy_id
        )
        # Only anonymous aggregates are kept for audit purposes

    async def purge_expired_raw_data(self) -> int:
        """Daily job: deletes expired raw GPS data."""
        from datetime import date, timedelta
        cutoff = date.today() - timedelta(days=self.RAW_GPS_RETENTION_DAYS)
        result = await self.db.execute(
            "DELETE FROM raw_telemetry WHERE created_at < $1",
            cutoff
        )
        return result
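A common complement to anonymisation is pseudonymisation (GDPR Art. 4(5)): before data reaches analytics systems, the policy ID can be replaced with a keyed hash so records remain joinable without exposing the real identifier. This is an illustrative sketch, not part of `TelematicsPrivacyService`; the key would live in a separate secrets store:

```python
import hmac
import hashlib

def pseudonymize_policy_id(policy_id: str, secret_key: bytes) -> str:
    """HMAC-SHA256 pseudonym: stable for the same key (so joins still work),
    but not reversible without the key, which is stored separately."""
    digest = hmac.new(secret_key, policy_id.encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()[:16]

# Same input + same key -> same token; a different key yields an unlinkable token
token = pseudonymize_policy_id("POL-12345", b"demo-key")
```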
8. Dashboard and driver feedback
Driver engagement is fundamental to the success of a UBI program: drivers who receive real-time feedback improve their driving behavior by 15-20% (AXA data, 2024). The mobile dashboard should show scores, trends, and personalised recommendations:
class DrivingFeedbackGenerator:
    """Generates personalized feedback for the driver."""

    IMPROVEMENT_TIPS = {
        "hard_braking": [
            "Keep a greater safety distance from the vehicle ahead",
            "Anticipate braking by watching traffic further ahead",
            "Slow down earlier when approaching intersections"
        ],
        "speeding": [
            "Use cruise control on the highway",
            "Set a speed alert at 10 km/h above the limit",
            "Remember that the average fine costs more than the UBI discount"
        ],
        "night_driving": [
            "Plan long trips during the day when possible",
            "If you drive at night, take a break every 2 hours"
        ]
    }

    def generate_weekly_report(
        self,
        policy_id: str,
        current_score: DrivingScore,
        weekly_trips: list[dict]
    ) -> dict:
        """Generates the weekly report for the driver."""
        worst_behavior = self._identify_worst_behavior(weekly_trips)
        tips = [
            self.IMPROVEMENT_TIPS[b][0]
            for b in worst_behavior[:2]
            if b in self.IMPROVEMENT_TIPS
        ]
        return {
            "policy_id": policy_id,
            "week_score": current_score.score,
            "score_category": current_score.category,
            "premium_factor": current_score.premium_factor,
            "estimated_annual_saving": self._estimate_saving(current_score),
            "trips_this_week": len(weekly_trips),
            "total_km_this_week": sum(t["distance_km"] for t in weekly_trips),
            "improvement_tips": tips,
            "percentile_rank": f"You drive better than {current_score.percentile:.0f}% of insured drivers"
        }

    def _identify_worst_behavior(self, trips: list[dict]) -> list[str]:
        behaviors = []
        avg_hard_braking = sum(
            t.get("hard_braking_per_100km", 0) for t in trips
        ) / max(len(trips), 1)
        avg_speeding = sum(
            t.get("speeding_events_per_100km", 0) for t in trips
        ) / max(len(trips), 1)
        night_ratio = sum(
            t.get("night_driving_ratio", 0) for t in trips
        ) / max(len(trips), 1)
        if avg_hard_braking > 3:
            behaviors.append("hard_braking")
        if avg_speeding > 2:
            behaviors.append("speeding")
        if night_ratio > 0.3:
            behaviors.append("night_driving")
        return behaviors

    def _estimate_saving(self, score: DrivingScore) -> float:
        base_premium = 600.0  # Average car premium in Italy
        return round(base_premium * (1 - score.premium_factor), 0)
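The saving estimate shown to the driver is just the premium-factor table applied to a base premium. A standalone sketch of that mapping (the €600 base mirrors the illustrative constant in `_estimate_saving`; a negative result means a surcharge, not a saving):

```python
# Category -> premium multiplier, as defined by DrivingScoreCalculator
PREMIUM_FACTORS = {"EXCELLENT": 0.75, "GOOD": 0.90, "FAIR": 1.00,
                   "POOR": 1.15, "DANGEROUS": 1.35}

def estimate_annual_saving(category: str, base_premium: float = 600.0) -> float:
    """Estimated annual saving vs. base premium (negative = surcharge)."""
    return round(base_premium * (1 - PREMIUM_FACTORS[category]), 0)

print(estimate_annual_saving("EXCELLENT"))  # 150.0
print(estimate_annual_saving("DANGEROUS"))  # -210.0
```

Framing the score as a concrete euro amount, rather than an abstract 0-100 number, is what makes the weekly report actionable.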
9. Full system architecture
The complete UBI telematics system consists of the following layers:
| Layer | Components | Technology | Latency |
|---|---|---|---|
| Edge | OBD dongle, smartphone SDK | C/ARM, iOS/Android SDK | 100 ms |
| Ingestion | MQTT broker, REST API | AWS IoT Core / EMQX | <1 s |
| Streaming | Event processing, trip detection | Apache Kafka + Flink | <5 s |
| ML scoring | Feature store, scoring engine | Feast + MLflow | <100 ms |
| Storage | Time series, aggregates | InfluxDB + PostgreSQL | - |
| Serving | Score API, driver dashboard | FastAPI + React Native | <200 ms |
Conclusions
The UBI telematics pipeline is a distributed system that touches edge computing, stream processing, machine learning, and privacy engineering. A $62.6 billion market (2025) growing at a 24.8% CAGR reflects the industry's interest in this personalisation of risk.
The critical points are feature engineering for driving behavior (hard braking, speeding, night driving), computing a driving score normalised against the fleet, and responsible management of geolocation data in compliance with the GDPR. Driver feedback is not just a nice-to-have: it is the most effective lever for improving behavior and reducing fleet accidents.
Upcoming articles in the InsurTech series
- AI underwriting: feature engineering and risk scoring
- Claims automation: computer vision and NLP
- Fraud detection: graph analysis and behavioral signals







