Conducta telematică: procesarea datelor UBI la scară
Până în 2025, peste 21 de milioane de șoferi americani vor împărtăși date telematice cu ei asigurare. În Europa, AXA a lansat o soluție UBI bazată pe inteligență artificială pentru piețe în ianuarie 2025 Europenii care integrează OBD-II, telematică bazată pe smartphone și modele de predicție a riscurilor. Piața UBI (Asigurări bazate pe utilizare) valorează 62,6 miliarde USD în 2025 și va crește la CAGR de 24,8% până în 2035.
Telematica asigurărilor nu este doar colectarea de date GPS: este o conductă complexă care transformă semnalele brute de la dispozitive OBD-II, smartphone-uri și senzori IoT în scoruri individuale de risc care determină prima fiecărui asigurat individual. Un șofer rău plătește mai mult, un șofer bun economisește. Principiul este simplu; implementarea tehnică și orice altceva.
În acest articol construim o conductă telematică completă: din ingerarea de date OBD-II va prezenta inginerie a comportamentului de conducere, de la scorul de risc în timp real până la prețuri dinamice, cu considerații de scalabilitate, latență și confidențialitate.
Ce vei învăța
- Arhitectura conductei telematice pentru UBI (Pay-How-You-Drive)
- Protocolul OBD-II: parametrii, frecvențele de eșantionare, procesarea marginilor
- Inginerie caracteristică pentru comportamentul de conducere (frânare puternică, viraje, viteză)
- Procesarea în flux cu Apache Kafka și Apache Flink
- Scorul de risc în timp real și agregarea temporală
- Prețuri dinamice bazate pe scorul de conducere
- Considerații GDPR pentru datele de geolocalizare
1. Tipuri de telematice de asigurare
Există trei modele principale de colectare a datelor telematice, cu diferite compromisuri între acestea acuratețea, costul implementării și adoptarea de către clienți:
| Tip | Dispozitiv | Date colectate | Precizie | Cost |
|---|---|---|---|---|
| Dongle OBD-II | Plug-in portul OBD | ECU, viteza, RPM, acceleratie | Ridicat | 20-50 € HW |
| Aplicația pentru smartphone | Aplicații iOS/Android | GPS, accelerometru, giroscop | Medie | Zero HW |
| Black Box OEM | Dispozitiv OEM instalat | ECU + GPS + accelerometru | Foarte sus | 100-300 € |
| Connected Car API | API nativ pentru vehicul | Toată telematică nativă | Maxim | Acord OEM |
2. Structura de date OBD-II
Protocolul OBD-II (On-Board Diagnostics II) expune zeci de parametri ECU prin PID (ID-urile parametrilor). Pentru scorarea riscului de asigurare, cele mai relevante PID-uri sunt:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class OBDReading:
"""Singola lettura OBD-II dal veicolo."""
device_id: str # ID univoco del dispositivo
policy_id: str # Polizza associata
timestamp: datetime # UTC timestamp
latitude: float # Gradi decimali (WGS84)
longitude: float # Gradi decimali (WGS84)
speed_kmh: float # Velocita in km/h (PID 0x0D)
rpm: int # Giri per minuto (PID 0x0C)
throttle_pct: float # Posizione acceleratore % (PID 0x11)
engine_load_pct: float # Carico motore % (PID 0x04)
coolant_temp_c: int # Temperatura liquido (PID 0x05)
# Calcolati lato edge (non nativi OBD)
acceleration_ms2: Optional[float] = None # m/s^2 (+/-)
heading_deg: Optional[float] = None # Direzione 0-360
road_type: Optional[str] = None # "URBAN", "RURAL", "HIGHWAY"
@dataclass
class TripSummary:
"""Riepilogo di un singolo viaggio."""
trip_id: str
policy_id: str
device_id: str
start_time: datetime
end_time: datetime
distance_km: float
duration_minutes: float
# Metrics comportamentali (calcolati dalla pipeline)
max_speed_kmh: float
avg_speed_kmh: float
hard_braking_count: int # Decelerazioni > 0.3g
hard_acceleration_count: int # Accelerazioni > 0.3g
hard_cornering_count: int # Laterale > 0.3g
speeding_pct: float # % tempo sopra limite
night_driving_pct: float # % km notturni (22:00-06:00)
highway_pct: float # % km autostrada
# Score aggregati
safety_score: float # 0-100 (100 = ottimo)
fuel_efficiency_score: float # 0-100
3. Procesarea marginilor pe dispozitiv
Datele brute OBD-II trebuie să fie preprocesate la margine înainte de a fi trimise în cloud. Acest lucru reduce lățimea de bandă necesară și calculează evenimente discrete (frânare puternică, viteză) în timp real. Procesorul edge rulează de obicei pe un microcontroler ARM în dispozitivul OBD:
from collections import deque
from dataclasses import dataclass
import math
class EdgeEventDetector:
"""
Algoritmo di detection eventi di guida.
Gira sul dispositivo embedded (ARM Cortex-M).
Sampling rate: 10 Hz (una lettura ogni 100ms)
"""
HARD_BRAKING_THRESHOLD = -3.0 # m/s^2 (0.3g)
HARD_ACCEL_THRESHOLD = 3.0 # m/s^2 (0.3g)
HARD_CORNERING_THRESHOLD = 3.0 # m/s^2 laterale
def __init__(self, window_size: int = 5):
self.speed_history = deque(maxlen=window_size)
self.events: list = []
self.prev_timestamp = None
def process_reading(self, reading: dict) -> list:
"""
Processa una singola lettura e rileva eventi.
Restituisce lista di eventi rilevati (vuota se nessuno).
"""
detected = []
current_speed = reading["speed_kmh"]
current_ts = reading["timestamp"]
# Calcola accelerazione longitudinale
if self.speed_history and self.prev_timestamp:
prev_speed = self.speed_history[-1]
dt = (current_ts - self.prev_timestamp).total_seconds()
if dt > 0:
# Conversione km/h -> m/s
delta_v = (current_speed - prev_speed) / 3.6
acceleration = delta_v / dt
if acceleration <= self.HARD_BRAKING_THRESHOLD:
detected.append({
"type": "HARD_BRAKING",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
elif acceleration >= self.HARD_ACCEL_THRESHOLD:
detected.append({
"type": "HARD_ACCELERATION",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
self.speed_history.append(current_speed)
self.prev_timestamp = current_ts
return detected
def detect_speeding(
self,
speed_kmh: float,
speed_limit_kmh: float,
tolerance_pct: float = 0.10
) -> bool:
"""Rileva superamento limite con tolleranza del 10%."""
threshold = speed_limit_kmh * (1 + tolerance_pct)
return speed_kmh > threshold
class EdgeBatchManager:
"""
Gestisce il buffering e l'invio batch al cloud.
Strategia: invia ogni 30 secondi o al termine del viaggio.
"""
BATCH_INTERVAL_SECONDS = 30
MAX_BATCH_SIZE = 300 # max readings per batch
def __init__(self, uploader):
self.buffer: list = []
self.events: list = []
self.last_send = None
self.uploader = uploader
def add_reading(self, reading: dict, events: list) -> None:
self.buffer.append(reading)
self.events.extend(events)
if len(self.buffer) >= self.MAX_BATCH_SIZE:
self._flush_async()
def _flush_async(self) -> None:
if not self.buffer:
return
payload = {
"readings": list(self.buffer),
"events": list(self.events),
"compressed": True
}
# Invia compresso (gzip) via MQTT o HTTPS
self.uploader.send(payload)
self.buffer.clear()
self.events.clear()
4. Streaming Pipeline cu Apache Kafka și Apache Flink
Conducta cloud primește loturi de la margine și le transmite în flux pentru a calcula rezumate de călătorie, evenimente agregate în ferestre de timp și actualizați scorurile de risc în viitorul apropiat în timp real. Arhitectura se bazează pe Apache Kafka ca magistrală de mesaje și Apache Flink pentru procesare flux:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time, Types
import json
from datetime import datetime
def build_telematics_pipeline():
"""
Pipeline Flink per processing dati telematici.
Topic Kafka:
- telemetry.raw : letture OBD grezze
- telemetry.events : eventi discreti (hard braking, etc.)
- telemetry.trips : trip summaries
- risk.scores : driving scores aggiornati
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# 1. Source: letture OBD-II raw
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("telemetry.raw")
.set_group_id("telematics-processor")
.set_starting_offsets(KafkaOffsetsInitializer.latest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
raw_stream = env.from_source(
kafka_source,
WatermarkStrategy.for_monotonous_timestamps(),
"OBD Raw Source"
)
# 2. Parse e validazione
parsed_stream = raw_stream.map(lambda s: json.loads(s)) \
.filter(lambda r: validate_reading(r))
# 3. Key by policy_id per partitioning
keyed_stream = parsed_stream.key_by(lambda r: r["policy_id"])
# 4. Trip detection tramite session windows
# Una sessione si chiude dopo 5 minuti di inattivita
trip_stream = keyed_stream \
.window(SessionWindows.with_gap(Time.minutes(5))) \
.apply(TripAggregator())
# 5. Feature engineering per ogni trip
features_stream = trip_stream.map(lambda t: extract_features(t))
# 6. Aggiorna driving score (aggregazione 90 giorni rolling)
score_stream = features_stream \
.key_by(lambda f: f["policy_id"]) \
.window(TumblingEventTimeWindows.of(Time.days(1))) \
.reduce(RollingScoreReducer())
# 7. Sink: aggiorna Redis e pubblica evento
score_stream.add_sink(RedisSink())
score_stream.add_sink(KafkaSink("risk.scores"))
env.execute("Telematics Processing Pipeline")
def validate_reading(reading: dict) -> bool:
"""Validazione basica di una lettura OBD."""
required_fields = ["device_id", "policy_id", "timestamp", "speed_kmh"]
if not all(f in reading for f in required_fields):
return False
if not (0 <= reading["speed_kmh"] <= 300):
return False
if "latitude" in reading and not (-90 <= reading["latitude"] <= 90):
return False
return True
def extract_features(trip: dict) -> dict:
"""
Estrae feature per risk scoring da un trip summary.
Output: feature vector per il modello ML.
"""
readings = trip["readings"]
events = trip["events"]
duration_sec = trip["duration_seconds"]
hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
speeding = [e for e in events if e["type"] == "SPEEDING"]
return {
"policy_id": trip["policy_id"],
"trip_id": trip["trip_id"],
"trip_date": trip["start_time"][:10],
# Feature comportamentali (normalizzate per 100km)
"hard_braking_per_100km": len(hard_braking) / max(trip["distance_km"], 0.1) * 100,
"hard_accel_per_100km": len(hard_accel) / max(trip["distance_km"], 0.1) * 100,
"speeding_events_per_100km": len(speeding) / max(trip["distance_km"], 0.1) * 100,
# Feature temporali
"night_driving_ratio": compute_night_ratio(readings),
"rush_hour_ratio": compute_rush_hour_ratio(readings),
"weekend_ratio": compute_weekend_ratio(readings),
# Feature cinematiche
"avg_speed_kmh": trip.get("avg_speed_kmh", 0),
"max_speed_kmh": trip.get("max_speed_kmh", 0),
"avg_acceleration": compute_avg_acceleration(readings),
# Feature contesto
"highway_ratio": trip.get("highway_pct", 0) / 100,
"urban_ratio": trip.get("urban_pct", 0) / 100,
"distance_km": trip["distance_km"],
"duration_hours": duration_sec / 3600
}
def compute_night_ratio(readings: list) -> float:
"""Calcola frazione dei km percorsi di notte (22:00-06:00)."""
if not readings:
return 0.0
night_readings = [
r for r in readings
if int(r["timestamp"][11:13]) >= 22 or int(r["timestamp"][11:13]) < 6
]
return len(night_readings) / len(readings)
5. Modelul de evaluare a riscurilor: de la comportamentul de conducere la recompense
Inima sistemului UBI este modelul care traduce caracteristicile comportamentului într-un scor de conducere și apoi într-un factor de ajustare a primei. Cel mai folosit model în sector este a Ansamblul XGBoost + reguli de afaceri:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
import xgboost as xgb
from typing import NamedTuple
class DrivingScore(NamedTuple):
"""Driving score aggregato su finestra rolling 90 giorni."""
policy_id: str
score: float # 0-100 (100 = guida eccellente)
percentile: float # Percentile rispetto alla flotta
category: str # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
premium_factor: float # Moltiplicatore premio (es. 0.85 = -15%)
class DrivingScoreCalculator:
"""
Calcola il driving score aggregando features su finestra 90 giorni.
Architettura: features -> normalization -> weighted scoring -> premium mapping
"""
WEIGHTS = {
"hard_braking_per_100km": -0.30, # Impatto negativo: 30%
"hard_accel_per_100km": -0.15, # Impatto negativo: 15%
"speeding_events_per_100km": -0.25, # Impatto negativo: 25%
"night_driving_ratio": -0.15, # Impatto negativo: 15%
"avg_speed_kmh": -0.10, # Impatto negativo: 10%
"highway_ratio": 0.05, # Bonus autostrada: +5%
}
# Soglie per conversione score -> category
CATEGORY_THRESHOLDS = [
(90, "EXCELLENT"),
(75, "GOOD"),
(55, "FAIR"),
(35, "POOR"),
(0, "DANGEROUS"),
]
# Mappa category -> premium discount/surcharge
PREMIUM_FACTORS = {
"EXCELLENT": 0.75, # -25% sconto
"GOOD": 0.90, # -10% sconto
"FAIR": 1.00, # Premio base
"POOR": 1.15, # +15% sovrapprezzo
"DANGEROUS": 1.35, # +35% sovrapprezzo
}
def __init__(self, fleet_stats: dict):
"""
fleet_stats: statistiche aggregate della flotta per normalizzazione
Es: {"hard_braking_per_100km": {"mean": 2.5, "std": 1.2}, ...}
"""
self.fleet_stats = fleet_stats
def calculate(self, features_90d: dict) -> DrivingScore:
"""
Calcola il driving score aggregato su 90 giorni.
features_90d: dizionario con medie delle feature sul periodo
"""
# Normalizza features rispetto alla flotta
normalized = self._normalize_features(features_90d)
# Calcola score pesato (base 100)
raw_score = 100.0
for feature, weight in self.WEIGHTS.items():
if feature in normalized:
z_score = normalized[feature]
# z-score positivo su feature negative peggiora lo score
raw_score += weight * z_score * 10
# Clamp tra 0 e 100
score = float(np.clip(raw_score, 0, 100))
# Determina categoria
category = self._get_category(score)
# Calcola percentile rispetto alla flotta
percentile = self._get_percentile(score)
return DrivingScore(
policy_id=features_90d["policy_id"],
score=round(score, 1),
percentile=round(percentile, 1),
category=category,
premium_factor=self.PREMIUM_FACTORS[category]
)
def _normalize_features(self, features: dict) -> dict:
normalized = {}
for key, value in features.items():
if key in self.fleet_stats:
stats = self.fleet_stats[key]
std = stats.get("std", 1.0)
if std > 0:
normalized[key] = (value - stats["mean"]) / std
else:
normalized[key] = 0.0
return normalized
def _get_category(self, score: float) -> str:
for threshold, category in self.CATEGORY_THRESHOLDS:
if score >= threshold:
return category
return "DANGEROUS"
def _get_percentile(self, score: float) -> float:
# Approssimazione: usa distribuzione normale con media=65, std=15
from scipy.stats import norm
return float(norm.cdf(score, loc=65, scale=15) * 100)
class UBIPremiumCalculator:
"""Calcola il premio UBI finale applicando il driving score."""
def calculate_ubi_premium(
self,
base_premium: float,
driving_score: DrivingScore,
distance_km: float,
base_distance_km: float = 15000.0
) -> dict:
"""
Premio UBI = Base * FactoreGuida * FattoreDistanza
- Fattore guida: basato su driving score (0.75-1.35)
- Fattore distanza: aggiustamento per km effettivi vs. dichiarati
"""
# Fattore distanza (max ±20%)
distance_ratio = distance_km / base_distance_km
distance_factor = np.clip(distance_ratio, 0.8, 1.2)
# Premio finale
ubi_premium = base_premium * driving_score.premium_factor * distance_factor
return {
"base_premium": round(base_premium, 2),
"driving_score": driving_score.score,
"driving_factor": driving_score.premium_factor,
"distance_km": distance_km,
"distance_factor": round(distance_factor, 3),
"final_premium": round(ubi_premium, 2),
"saving_vs_base": round(base_premium - ubi_premium, 2),
"category": driving_score.category
}
6. Detecție de călătorie: ferestre de sesiune și semnale fine-trip
Detectarea începutului și a sfârșitului unei călătorii este mai complexă decât pare. Vehiculul poate oprirea la un semafor, oprirea scurtă sau intrarea într-un garaj subteran și ratarea semnalul GPS. Logica de detectare a deplasării utilizează ferestre de sesiune cu interval de timp de expirare configurabil:
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TripDetector:
"""
Stato macchina per rilevamento trip.
Usa session window: gap > 5 minuti = fine viaggio.
"""
GAP_TIMEOUT_SECONDS: int = 300 # 5 minuti
MIN_TRIP_DISTANCE_KM: float = 0.5 # Ignora microtrip
policy_id: str
current_trip: Optional[dict] = None
last_reading_time: Optional[datetime] = None
readings_buffer: list = field(default_factory=list)
def process_reading(self, reading: dict) -> Optional[dict]:
"""
Processa una lettura e restituisce il trip completato se rilevato.
"""
current_time = datetime.fromisoformat(reading["timestamp"])
completed_trip = None
if self.current_trip is None:
# Inizio nuovo viaggio
if reading["speed_kmh"] > 2.0:
self.current_trip = {
"trip_id": f"trip-{self.policy_id}-{current_time.strftime('%Y%m%d%H%M%S')}",
"policy_id": self.policy_id,
"start_time": current_time.isoformat(),
"start_lat": reading["latitude"],
"start_lng": reading["longitude"],
"readings_count": 0,
"total_distance_km": 0.0
}
self.readings_buffer = [reading]
else:
# Viaggio in corso
if self.last_reading_time:
gap = (current_time - self.last_reading_time).total_seconds()
if gap > self.GAP_TIMEOUT_SECONDS:
# Gap troppo grande: chiudi viaggio corrente
completed_trip = self._close_trip(reading)
# Inizia nuovo viaggio se in moto
if reading["speed_kmh"] > 2.0:
self.current_trip = self._start_new_trip(reading, current_time)
self.readings_buffer = [reading]
else:
self.current_trip = None
self.readings_buffer = []
else:
self.readings_buffer.append(reading)
self._update_trip_stats(reading)
self.last_reading_time = current_time
return completed_trip
def _close_trip(self, last_reading: dict) -> Optional[dict]:
if not self.current_trip:
return None
trip = {
**self.current_trip,
"end_time": last_reading["timestamp"],
"end_lat": last_reading["latitude"],
"end_lng": last_reading["longitude"],
"readings": self.readings_buffer,
"duration_seconds": (
datetime.fromisoformat(last_reading["timestamp"]) -
datetime.fromisoformat(self.current_trip["start_time"])
).total_seconds()
}
# Scarta microtrip
if trip["total_distance_km"] < self.MIN_TRIP_DISTANCE_KM:
return None
return trip
def _update_trip_stats(self, reading: dict) -> None:
"""Aggiorna statistiche del trip corrente."""
if self.readings_buffer and self.current_trip:
prev = self.readings_buffer[-1]
dist = haversine_distance(
prev["latitude"], prev["longitude"],
reading["latitude"], reading["longitude"]
)
self.current_trip["total_distance_km"] += dist
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
"""Distanza Haversine in km tra due coordinate GPS."""
R = 6371.0
import math
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
7. Confidențialitate și GDPR pentru datele de geolocalizare
Datele telematice includ geolocalizare precisă, ore, itinerarii: date personale sensibile conform GDPR. Sistemul trebuie să implementeze confidențialitatea prin proiectare de la zero.
Cerințe GDPR pentru telematică de asigurări
- Consimțământ explicit: Proprietarul trebuie să își dea în mod explicit consimțământul pentru colectarea datelor de conducere
- Minimizare: Colectați numai datele strict necesare pentru scorarea riscului
- Retenţie: Date GPS brute șterse după procesare (maxim 90 de zile); agregate stocate pe durata poliței
- Anonimizare: După calculul scorului, datele GPS pot fi agregate și anonimizate
- Dreptul de a fi uitat: API pentru ștergerea datelor la cerere
- Transferuri: Date prelucrate în UE (fără transfer în afara SEE fără garanții adecvate)
class TelematicsPrivacyService:
"""Gestisce data lifecycle nel rispetto GDPR."""
RAW_GPS_RETENTION_DAYS = 90
AGGREGATED_RETENTION_YEARS = 7 # Per audit assicurativo
async def anonymize_trip(self, trip: dict) -> dict:
"""
Anonimizza un trip rimuovendo coordinate GPS precise.
Mantiene solo dati aggregati per il risk score.
"""
return {
"trip_id": trip["trip_id"],
"policy_id": trip["policy_id"],
"date": trip["start_time"][:10],
"duration_hours": trip["duration_seconds"] / 3600,
"distance_km": trip["total_distance_km"],
# Features comportamentali aggregate (no GPS raw)
"hard_braking_count": trip.get("hard_braking_count", 0),
"speeding_pct": trip.get("speeding_pct", 0),
"night_ratio": trip.get("night_driving_ratio", 0),
# NO: coordinate GPS, itinerario, soste specifiche
}
async def delete_raw_data(self, policy_id: str) -> None:
"""Elimina tutti i dati GPS grezzi per una polizza (diritto all'oblio)."""
await self.db.execute(
"DELETE FROM raw_telemetry WHERE policy_id = $1",
policy_id
)
await self.db.execute(
"DELETE FROM gps_tracks WHERE policy_id = $1",
policy_id
)
# Mantieni solo aggregati anonimi per audit
async def purge_expired_raw_data(self) -> int:
"""Job giornaliero: elimina dati GPS grezzi scaduti."""
from datetime import date, timedelta
cutoff = date.today() - timedelta(days=self.RAW_GPS_RETENTION_DAYS)
result = await self.db.execute(
"DELETE FROM raw_telemetry WHERE created_at < $1",
cutoff
)
return result
8. Tabloul de bord și feedbackul șoferului
Angajamentul șoferului este fundamental pentru succesul UBI. Șoferii care primesc feedback-ul în timp real îmbunătățește comportamentul de conducere cu 15-20% (date AXA 2024). Tabloul de bord mobil trebuie să arate scoruri, tendințe și recomandări personalizate:
class DrivingFeedbackGenerator:
"""Genera feedback personalizzato per il guidatore."""
IMPROVEMENT_TIPS = {
"hard_braking": [
"Mantieni una distanza di sicurezza maggiore dal veicolo precedente",
"Anticipa le frenate osservando il traffico a distanza",
"Riduci la velocità in anticipo agli incroci"
],
"speeding": [
"Usa il cruise control in autostrada",
"Imposta un avviso velocità a 10 km/h sopra il limite",
"Considera che la multa media costa più dello sconto UBI"
],
"night_driving": [
"Pianifica viaggi lunghi durante il giorno quando possibile",
"Se guidi di notte, fai pause ogni 2 ore"
]
}
def generate_weekly_report(
self,
policy_id: str,
current_score: DrivingScore,
weekly_trips: list[dict]
) -> dict:
"""Genera report settimanale per il guidatore."""
worst_behavior = self._identify_worst_behavior(weekly_trips)
tips = [
self.IMPROVEMENT_TIPS[b][0]
for b in worst_behavior[:2]
if b in self.IMPROVEMENT_TIPS
]
return {
"policy_id": policy_id,
"week_score": current_score.score,
"score_category": current_score.category,
"premium_factor": current_score.premium_factor,
"estimated_annual_saving": self._estimate_saving(current_score),
"trips_this_week": len(weekly_trips),
"total_km_this_week": sum(t["distance_km"] for t in weekly_trips),
"improvement_tips": tips,
"percentile_rank": f"Guidi meglio del {current_score.percentile:.0f}% degli assicurati"
}
def _identify_worst_behavior(self, trips: list[dict]) -> list[str]:
behaviors = []
avg_hard_braking = sum(
t.get("hard_braking_per_100km", 0) for t in trips
) / max(len(trips), 1)
avg_speeding = sum(
t.get("speeding_events_per_100km", 0) for t in trips
) / max(len(trips), 1)
night_ratio = sum(
t.get("night_driving_ratio", 0) for t in trips
) / max(len(trips), 1)
if avg_hard_braking > 3:
behaviors.append("hard_braking")
if avg_speeding > 2:
behaviors.append("speeding")
if night_ratio > 0.3:
behaviors.append("night_driving")
return behaviors
def _estimate_saving(self, score: DrivingScore) -> float:
base_premium = 600.0 # Premio medio auto in Italia
return round(base_premium * (1 - score.premium_factor), 0)
9. Arhitectura sistemului complet
Arhitectura completă a sistemului telematic UBI este alcătuită din următoarele straturi:
| Straturi | Componente | Tehnologie | Latența |
|---|---|---|---|
| Margine | Dongle OBD, SDK pentru smartphone | C/ARM, SDK iOS/Android | 100 ms |
| Ingestie | Broker MQTT, API REST | AWS IoT Core / EMQX | <1s |
| Streaming | Procesarea evenimentelor, detectarea călătoriilor | Apache Kafka + Flink | <5s |
| Scor ML | Magazin de caracteristici, motor de punctare | Sărbătoare + MLflow | <100 ms |
| Depozitare | Serii temporale, agregate | InfluxDB + PostgreSQL | - |
| Servire | Score API, tabloul de bord pentru șofer | FastAPI + React Native | <200 ms |
Concluzii
Conducta telematică UBI este un sistem de calcul distribuit care atinge edge computing, procesare în flux, învățare automată și inginerie de confidențialitate. Piața de 62,6 miliarde de dolari (2025) cu CAGR 24,8% reflectă interesul industriei pentru această personalizare de risc.
Punctele critice sunt ingineria caracteristicilor pentru comportamentul de condus (frânare puternică, depășirea vitezei, conducerea pe timp de noapte), calculul scorului normalizat de conducere pe flotă și gestionarea responsabilă a datelor de geolocalizare în conformitate cu GDPR. Feedback-ul Pentru șofer nu este doar un plus: este cea mai eficientă pârghie pentru îmbunătățirea comportamentului și reducerea accidentelor din flotă.
Articole viitoare din seria InsurTech
- Subscriere AI: ingineria caracteristicilor și scorul de risc
- Automatizarea revendicărilor: Computer Vision și NLP
- Detectarea fraudelor: analiză grafică și semnal comportamental







