Telematik İşlem Hattı: Geniş Ölçekte UBI Veri İşleme
2025 yılına kadar 21 milyondan fazla Amerikalı sürücü telematik verilerini kendileriyle paylaşacak sigorta. AXA, Avrupa'da Ocak 2025'te pazarlar için yapay zeka odaklı bir UBI çözümü başlattı OBD-II, akıllı telefon tabanlı telematik ve risk tahmin modellerini entegre eden Avrupalılar. UBI (Kullanım Bazlı Sigorta) pazarı 2025'te 62,6 milyar dolar değerinde ve 2025'te büyüyecek 2035'e kadar yıllık bileşik büyüme oranı %24,8.
Sigorta telematiği yalnızca GPS verilerinin toplanması değildir: Sigorta telematiği, OBD-II cihazlarından, akıllı telefonlardan ve IoT sensörlerinden gelen ham sinyalleri bireysel risk puanları her bir sigortalının primini belirler. Kötü sürücü daha çok öder, iyi sürücü tasarruf eder. Prensip basittir; teknik uygulama ve diğer her şey.
Bu makalede eksiksiz bir telematik hattı oluşturuyoruz: veri alımından itibaren OBD-II, gerçek zamanlı risk puanlamasından sürüş davranışı mühendisliğine sahip olacak ölçeklenebilirlik, gecikme ve gizlilik hususlarını içeren dinamik fiyatlandırma.
Ne Öğreneceksiniz
- UBI (Pay-How-You-Drive) için telematik ardışık düzen mimarisi
- OBD-II protokolü: parametreler, örnekleme frekansları, kenar işleme
- Sürüş davranışına yönelik özellik mühendisliği (sert frenleme, viraj alma, hızlanma)
- Apache Kafka ve Apache Flink ile akış işleme
- Gerçek zamanlı risk puanlaması ve zamansal toplamalar
- Sürüş puanına dayalı dinamik fiyatlandırma
- Coğrafi konum verileriyle ilgili GDPR hususları
1. Sigorta Telematiği Türleri
Telematik veri toplamanın üç ana modeli vardır ve bunlar arasında farklı değiş-tokuşlar vardır. doğruluk, dağıtım maliyeti ve müşteri benimsemesi:
| Tip | Cihaz | Toplanan Veriler | Kesinlik | Maliyet |
|---|---|---|---|---|
| OBD-II Dongle'ı | OBD bağlantı noktası eklentisi | ECU, hız, RPM, hızlanma | Yüksek | €20-50 Donanım |
| Akıllı Telefon Uygulaması | iOS/Android uygulamaları | GPS, ivmeölçer, jiroskop | Ortalama | Sıfır donanım |
| Kara Kutu OEM | OEM cihazı kuruldu | ECU + GPS + ivmeölçer | Çok yüksek | €100-300 |
| Bağlantılı Araç API'si | Araç yerel API'si | Tüm yerel telematik | Maksimum | OEM Anlaşması |
2. OBD-II Veri Yapısı
OBD-II (Yerleşik Teşhis II) protokolü, PID aracılığıyla düzinelerce ECU parametresini açığa çıkarır (Parametre kimlikleri). Sigorta riski puanlaması için en alakalı PID'ler şunlardır:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class OBDReading:
"""Singola lettura OBD-II dal veicolo."""
device_id: str # ID univoco del dispositivo
policy_id: str # Polizza associata
timestamp: datetime # UTC timestamp
latitude: float # Gradi decimali (WGS84)
longitude: float # Gradi decimali (WGS84)
speed_kmh: float # Velocita in km/h (PID 0x0D)
rpm: int # Giri per minuto (PID 0x0C)
throttle_pct: float # Posizione acceleratore % (PID 0x11)
engine_load_pct: float # Carico motore % (PID 0x04)
coolant_temp_c: int # Temperatura liquido (PID 0x05)
# Calcolati lato edge (non nativi OBD)
acceleration_ms2: Optional[float] = None # m/s^2 (+/-)
heading_deg: Optional[float] = None # Direzione 0-360
road_type: Optional[str] = None # "URBAN", "RURAL", "HIGHWAY"
@dataclass
class TripSummary:
"""Riepilogo di un singolo viaggio."""
trip_id: str
policy_id: str
device_id: str
start_time: datetime
end_time: datetime
distance_km: float
duration_minutes: float
# Metrics comportamentali (calcolati dalla pipeline)
max_speed_kmh: float
avg_speed_kmh: float
hard_braking_count: int # Decelerazioni > 0.3g
hard_acceleration_count: int # Accelerazioni > 0.3g
hard_cornering_count: int # Laterale > 0.3g
speeding_pct: float # % tempo sopra limite
night_driving_pct: float # % km notturni (22:00-06:00)
highway_pct: float # % km autostrada
# Score aggregati
safety_score: float # 0-100 (100 = ottimo)
fuel_efficiency_score: float # 0-100
3. Cihazda Kenar İşleme
Ham OBD-II verilerinin buluta gönderilmeden önce uçta önceden işlenmesi gerekir. Bu, gerekli bant genişliğini azaltır ve ayrı olayları (sert frenleme, hızlanma) gerçek zamanlı olarak hesaplar. Edge işlemcisi genellikle OBD cihazındaki bir ARM mikrokontrolcüsü üzerinde çalışır:
from collections import deque
from dataclasses import dataclass
import math
class EdgeEventDetector:
"""
Algoritmo di detection eventi di guida.
Gira sul dispositivo embedded (ARM Cortex-M).
Sampling rate: 10 Hz (una lettura ogni 100ms)
"""
HARD_BRAKING_THRESHOLD = -3.0 # m/s^2 (0.3g)
HARD_ACCEL_THRESHOLD = 3.0 # m/s^2 (0.3g)
HARD_CORNERING_THRESHOLD = 3.0 # m/s^2 laterale
def __init__(self, window_size: int = 5):
self.speed_history = deque(maxlen=window_size)
self.events: list = []
self.prev_timestamp = None
def process_reading(self, reading: dict) -> list:
"""
Processa una singola lettura e rileva eventi.
Restituisce lista di eventi rilevati (vuota se nessuno).
"""
detected = []
current_speed = reading["speed_kmh"]
current_ts = reading["timestamp"]
# Calcola accelerazione longitudinale
if self.speed_history and self.prev_timestamp:
prev_speed = self.speed_history[-1]
dt = (current_ts - self.prev_timestamp).total_seconds()
if dt > 0:
# Conversione km/h -> m/s
delta_v = (current_speed - prev_speed) / 3.6
acceleration = delta_v / dt
if acceleration <= self.HARD_BRAKING_THRESHOLD:
detected.append({
"type": "HARD_BRAKING",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
elif acceleration >= self.HARD_ACCEL_THRESHOLD:
detected.append({
"type": "HARD_ACCELERATION",
"timestamp": current_ts.isoformat(),
"value": round(acceleration, 2),
"speed_at_event": current_speed
})
self.speed_history.append(current_speed)
self.prev_timestamp = current_ts
return detected
def detect_speeding(
self,
speed_kmh: float,
speed_limit_kmh: float,
tolerance_pct: float = 0.10
) -> bool:
"""Rileva superamento limite con tolleranza del 10%."""
threshold = speed_limit_kmh * (1 + tolerance_pct)
return speed_kmh > threshold
class EdgeBatchManager:
"""
Gestisce il buffering e l'invio batch al cloud.
Strategia: invia ogni 30 secondi o al termine del viaggio.
"""
BATCH_INTERVAL_SECONDS = 30
MAX_BATCH_SIZE = 300 # max readings per batch
def __init__(self, uploader):
self.buffer: list = []
self.events: list = []
self.last_send = None
self.uploader = uploader
def add_reading(self, reading: dict, events: list) -> None:
self.buffer.append(reading)
self.events.extend(events)
if len(self.buffer) >= self.MAX_BATCH_SIZE:
self._flush_async()
def _flush_async(self) -> None:
if not self.buffer:
return
payload = {
"readings": list(self.buffer),
"events": list(self.events),
"compressed": True
}
# Invia compresso (gzip) via MQTT o HTTPS
self.uploader.send(payload)
self.buffer.clear()
self.events.clear()
4. Apache Kafka ve Apache Flink ile Akış Ardışık Düzeni
Bulut işlem hattı partileri uçtan alır ve bunları hesaplamaya aktarır gezi özetleri, zaman aralıklarındaki olayları toplu hale getirme ve yakın gelecekte risk puanlarını güncelleme gerçek zamanlı. Mimari, mesaj veriyolu olarak Apache Kafka'yı ve mesaj veri yolu olarak Apache Flink'i temel alır. akış işleme:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common import Time, Types
import json
from datetime import datetime
def build_telematics_pipeline():
"""
Pipeline Flink per processing dati telematici.
Topic Kafka:
- telemetry.raw : letture OBD grezze
- telemetry.events : eventi discreti (hard braking, etc.)
- telemetry.trips : trip summaries
- risk.scores : driving scores aggiornati
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# 1. Source: letture OBD-II raw
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("telemetry.raw")
.set_group_id("telematics-processor")
.set_starting_offsets(KafkaOffsetsInitializer.latest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
raw_stream = env.from_source(
kafka_source,
WatermarkStrategy.for_monotonous_timestamps(),
"OBD Raw Source"
)
# 2. Parse e validazione
parsed_stream = raw_stream.map(lambda s: json.loads(s)) \
.filter(lambda r: validate_reading(r))
# 3. Key by policy_id per partitioning
keyed_stream = parsed_stream.key_by(lambda r: r["policy_id"])
# 4. Trip detection tramite session windows
# Una sessione si chiude dopo 5 minuti di inattivita
trip_stream = keyed_stream \
.window(SessionWindows.with_gap(Time.minutes(5))) \
.apply(TripAggregator())
# 5. Feature engineering per ogni trip
features_stream = trip_stream.map(lambda t: extract_features(t))
# 6. Aggiorna driving score (aggregazione 90 giorni rolling)
score_stream = features_stream \
.key_by(lambda f: f["policy_id"]) \
.window(TumblingEventTimeWindows.of(Time.days(1))) \
.reduce(RollingScoreReducer())
# 7. Sink: aggiorna Redis e pubblica evento
score_stream.add_sink(RedisSink())
score_stream.add_sink(KafkaSink("risk.scores"))
env.execute("Telematics Processing Pipeline")
def validate_reading(reading: dict) -> bool:
"""Validazione basica di una lettura OBD."""
required_fields = ["device_id", "policy_id", "timestamp", "speed_kmh"]
if not all(f in reading for f in required_fields):
return False
if not (0 <= reading["speed_kmh"] <= 300):
return False
if "latitude" in reading and not (-90 <= reading["latitude"] <= 90):
return False
return True
def extract_features(trip: dict) -> dict:
"""
Estrae feature per risk scoring da un trip summary.
Output: feature vector per il modello ML.
"""
readings = trip["readings"]
events = trip["events"]
duration_sec = trip["duration_seconds"]
hard_braking = [e for e in events if e["type"] == "HARD_BRAKING"]
hard_accel = [e for e in events if e["type"] == "HARD_ACCELERATION"]
speeding = [e for e in events if e["type"] == "SPEEDING"]
return {
"policy_id": trip["policy_id"],
"trip_id": trip["trip_id"],
"trip_date": trip["start_time"][:10],
# Feature comportamentali (normalizzate per 100km)
"hard_braking_per_100km": len(hard_braking) / max(trip["distance_km"], 0.1) * 100,
"hard_accel_per_100km": len(hard_accel) / max(trip["distance_km"], 0.1) * 100,
"speeding_events_per_100km": len(speeding) / max(trip["distance_km"], 0.1) * 100,
# Feature temporali
"night_driving_ratio": compute_night_ratio(readings),
"rush_hour_ratio": compute_rush_hour_ratio(readings),
"weekend_ratio": compute_weekend_ratio(readings),
# Feature cinematiche
"avg_speed_kmh": trip.get("avg_speed_kmh", 0),
"max_speed_kmh": trip.get("max_speed_kmh", 0),
"avg_acceleration": compute_avg_acceleration(readings),
# Feature contesto
"highway_ratio": trip.get("highway_pct", 0) / 100,
"urban_ratio": trip.get("urban_pct", 0) / 100,
"distance_km": trip["distance_km"],
"duration_hours": duration_sec / 3600
}
def compute_night_ratio(readings: list) -> float:
"""Calcola frazione dei km percorsi di notte (22:00-06:00)."""
if not readings:
return 0.0
night_readings = [
r for r in readings
if int(r["timestamp"][11:13]) >= 22 or int(r["timestamp"][11:13]) < 6
]
return len(night_readings) / len(readings)
5. Risk Puanlama Modeli: Sürüş Davranışından Ödüllere
UBI sisteminin kalbi, davranış özelliklerini sürüş puanına dönüştüren modeldir. ve daha sonra prim ayarlama faktörüne dönüşür. Sektörde en çok kullanılan model XGBoost topluluğu + iş kuralları:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
import xgboost as xgb
from typing import NamedTuple
class DrivingScore(NamedTuple):
"""Driving score aggregato su finestra rolling 90 giorni."""
policy_id: str
score: float # 0-100 (100 = guida eccellente)
percentile: float # Percentile rispetto alla flotta
category: str # "EXCELLENT", "GOOD", "FAIR", "POOR", "DANGEROUS"
premium_factor: float # Moltiplicatore premio (es. 0.85 = -15%)
class DrivingScoreCalculator:
"""
Calcola il driving score aggregando features su finestra 90 giorni.
Architettura: features -> normalization -> weighted scoring -> premium mapping
"""
WEIGHTS = {
"hard_braking_per_100km": -0.30, # Impatto negativo: 30%
"hard_accel_per_100km": -0.15, # Impatto negativo: 15%
"speeding_events_per_100km": -0.25, # Impatto negativo: 25%
"night_driving_ratio": -0.15, # Impatto negativo: 15%
"avg_speed_kmh": -0.10, # Impatto negativo: 10%
"highway_ratio": 0.05, # Bonus autostrada: +5%
}
# Soglie per conversione score -> category
CATEGORY_THRESHOLDS = [
(90, "EXCELLENT"),
(75, "GOOD"),
(55, "FAIR"),
(35, "POOR"),
(0, "DANGEROUS"),
]
# Mappa category -> premium discount/surcharge
PREMIUM_FACTORS = {
"EXCELLENT": 0.75, # -25% sconto
"GOOD": 0.90, # -10% sconto
"FAIR": 1.00, # Premio base
"POOR": 1.15, # +15% sovrapprezzo
"DANGEROUS": 1.35, # +35% sovrapprezzo
}
def __init__(self, fleet_stats: dict):
"""
fleet_stats: statistiche aggregate della flotta per normalizzazione
Es: {"hard_braking_per_100km": {"mean": 2.5, "std": 1.2}, ...}
"""
self.fleet_stats = fleet_stats
def calculate(self, features_90d: dict) -> DrivingScore:
"""
Calcola il driving score aggregato su 90 giorni.
features_90d: dizionario con medie delle feature sul periodo
"""
# Normalizza features rispetto alla flotta
normalized = self._normalize_features(features_90d)
# Calcola score pesato (base 100)
raw_score = 100.0
for feature, weight in self.WEIGHTS.items():
if feature in normalized:
z_score = normalized[feature]
# z-score positivo su feature negative peggiora lo score
raw_score += weight * z_score * 10
# Clamp tra 0 e 100
score = float(np.clip(raw_score, 0, 100))
# Determina categoria
category = self._get_category(score)
# Calcola percentile rispetto alla flotta
percentile = self._get_percentile(score)
return DrivingScore(
policy_id=features_90d["policy_id"],
score=round(score, 1),
percentile=round(percentile, 1),
category=category,
premium_factor=self.PREMIUM_FACTORS[category]
)
def _normalize_features(self, features: dict) -> dict:
normalized = {}
for key, value in features.items():
if key in self.fleet_stats:
stats = self.fleet_stats[key]
std = stats.get("std", 1.0)
if std > 0:
normalized[key] = (value - stats["mean"]) / std
else:
normalized[key] = 0.0
return normalized
def _get_category(self, score: float) -> str:
for threshold, category in self.CATEGORY_THRESHOLDS:
if score >= threshold:
return category
return "DANGEROUS"
def _get_percentile(self, score: float) -> float:
# Approssimazione: usa distribuzione normale con media=65, std=15
from scipy.stats import norm
return float(norm.cdf(score, loc=65, scale=15) * 100)
class UBIPremiumCalculator:
"""Calcola il premio UBI finale applicando il driving score."""
def calculate_ubi_premium(
self,
base_premium: float,
driving_score: DrivingScore,
distance_km: float,
base_distance_km: float = 15000.0
) -> dict:
"""
Premio UBI = Base * FactoreGuida * FattoreDistanza
- Fattore guida: basato su driving score (0.75-1.35)
- Fattore distanza: aggiustamento per km effettivi vs. dichiarati
"""
# Fattore distanza (max ±20%)
distance_ratio = distance_km / base_distance_km
distance_factor = np.clip(distance_ratio, 0.8, 1.2)
# Premio finale
ubi_premium = base_premium * driving_score.premium_factor * distance_factor
return {
"base_premium": round(base_premium, 2),
"driving_score": driving_score.score,
"driving_factor": driving_score.premium_factor,
"distance_km": distance_km,
"distance_factor": round(distance_factor, 3),
"final_premium": round(ubi_premium, 2),
"saving_vs_base": round(base_premium - ubi_premium, 2),
"category": driving_score.category
}
6. Hata Tespiti: Oturum Pencereleri ve Hassas Açma Sinyalleri
Bir yolculuğun başlangıcını ve sonunu tespit etmek göründüğünden daha karmaşıktır. Araç şunları yapabilir: Trafik ışıklarında durmak, kısa bir süre durmak veya yer altı garajına girip fırsatı kaçırmak GPS sinyali. Açma tespit mantığı, yapılandırılabilir boşluk zaman aşımına sahip oturum pencerelerini kullanır:
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class TripDetector:
"""
Stato macchina per rilevamento trip.
Usa session window: gap > 5 minuti = fine viaggio.
"""
GAP_TIMEOUT_SECONDS: int = 300 # 5 minuti
MIN_TRIP_DISTANCE_KM: float = 0.5 # Ignora microtrip
policy_id: str
current_trip: Optional[dict] = None
last_reading_time: Optional[datetime] = None
readings_buffer: list = field(default_factory=list)
def process_reading(self, reading: dict) -> Optional[dict]:
"""
Processa una lettura e restituisce il trip completato se rilevato.
"""
current_time = datetime.fromisoformat(reading["timestamp"])
completed_trip = None
if self.current_trip is None:
# Inizio nuovo viaggio
if reading["speed_kmh"] > 2.0:
self.current_trip = {
"trip_id": f"trip-{self.policy_id}-{current_time.strftime('%Y%m%d%H%M%S')}",
"policy_id": self.policy_id,
"start_time": current_time.isoformat(),
"start_lat": reading["latitude"],
"start_lng": reading["longitude"],
"readings_count": 0,
"total_distance_km": 0.0
}
self.readings_buffer = [reading]
else:
# Viaggio in corso
if self.last_reading_time:
gap = (current_time - self.last_reading_time).total_seconds()
if gap > self.GAP_TIMEOUT_SECONDS:
# Gap troppo grande: chiudi viaggio corrente
completed_trip = self._close_trip(reading)
# Inizia nuovo viaggio se in moto
if reading["speed_kmh"] > 2.0:
self.current_trip = self._start_new_trip(reading, current_time)
self.readings_buffer = [reading]
else:
self.current_trip = None
self.readings_buffer = []
else:
self.readings_buffer.append(reading)
self._update_trip_stats(reading)
self.last_reading_time = current_time
return completed_trip
def _close_trip(self, last_reading: dict) -> Optional[dict]:
if not self.current_trip:
return None
trip = {
**self.current_trip,
"end_time": last_reading["timestamp"],
"end_lat": last_reading["latitude"],
"end_lng": last_reading["longitude"],
"readings": self.readings_buffer,
"duration_seconds": (
datetime.fromisoformat(last_reading["timestamp"]) -
datetime.fromisoformat(self.current_trip["start_time"])
).total_seconds()
}
# Scarta microtrip
if trip["total_distance_km"] < self.MIN_TRIP_DISTANCE_KM:
return None
return trip
def _update_trip_stats(self, reading: dict) -> None:
"""Aggiorna statistiche del trip corrente."""
if self.readings_buffer and self.current_trip:
prev = self.readings_buffer[-1]
dist = haversine_distance(
prev["latitude"], prev["longitude"],
reading["latitude"], reading["longitude"]
)
self.current_trip["total_distance_km"] += dist
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
"""Distanza Haversine in km tra due coordinate GPS."""
R = 6371.0
import math
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
7. Coğrafi Konum Verilerine İlişkin Gizlilik ve GDPR
Telematik veriler kesin coğrafi konumu, saatleri, seyahat planlarını içerir: kişisel veriler GDPR kapsamında hassas. Sistem, tasarım gereği gizliliği baştan sona uygulamalıdır.
Sigorta Telematiği için GDPR Gereksinimleri
- Açık rıza: Araç sahibi, sürüş verilerinin toplanmasına açıkça izin vermelidir
- Minimizasyon: Yalnızca risk puanlaması için kesinlikle gerekli olan verileri toplayın
- Tutulma: Ham GPS verileri işlendikten sonra silinir (maks. 90 gün); Politika süresince saklanan toplamlar
- Anonimleştirme: Puan hesaplamasının ardından GPS verileri toplanabilir ve anonimleştirilebilir
- Unutulma hakkı: İsteğe bağlı veri silmeye yönelik API
- Transferler: AB'de işlenen veriler (yeterli garantiler olmadan AEA dışına aktarım yapılamaz)
class TelematicsPrivacyService:
"""Gestisce data lifecycle nel rispetto GDPR."""
RAW_GPS_RETENTION_DAYS = 90
AGGREGATED_RETENTION_YEARS = 7 # Per audit assicurativo
async def anonymize_trip(self, trip: dict) -> dict:
"""
Anonimizza un trip rimuovendo coordinate GPS precise.
Mantiene solo dati aggregati per il risk score.
"""
return {
"trip_id": trip["trip_id"],
"policy_id": trip["policy_id"],
"date": trip["start_time"][:10],
"duration_hours": trip["duration_seconds"] / 3600,
"distance_km": trip["total_distance_km"],
# Features comportamentali aggregate (no GPS raw)
"hard_braking_count": trip.get("hard_braking_count", 0),
"speeding_pct": trip.get("speeding_pct", 0),
"night_ratio": trip.get("night_driving_ratio", 0),
# NO: coordinate GPS, itinerario, soste specifiche
}
async def delete_raw_data(self, policy_id: str) -> None:
"""Elimina tutti i dati GPS grezzi per una polizza (diritto all'oblio)."""
await self.db.execute(
"DELETE FROM raw_telemetry WHERE policy_id = $1",
policy_id
)
await self.db.execute(
"DELETE FROM gps_tracks WHERE policy_id = $1",
policy_id
)
# Mantieni solo aggregati anonimi per audit
async def purge_expired_raw_data(self) -> int:
"""Job giornaliero: elimina dati GPS grezzi scaduti."""
from datetime import date, timedelta
cutoff = date.today() - timedelta(days=self.RAW_GPS_RETENTION_DAYS)
result = await self.db.execute(
"DELETE FROM raw_telemetry WHERE created_at < $1",
cutoff
)
return result
8. Kontrol Paneli ve Sürücü Geri Bildirimi
Sürücü katılımı UBI başarısının temelidir. Alınan sürücüler gerçek zamanlı geri bildirim, sürüş davranışını %15-20 oranında iyileştirir (AXA verileri 2024). Mobil kontrol paneli puanları, trendleri ve kişiselleştirilmiş önerileri göstermelidir:
class DrivingFeedbackGenerator:
"""Genera feedback personalizzato per il guidatore."""
IMPROVEMENT_TIPS = {
"hard_braking": [
"Mantieni una distanza di sicurezza maggiore dal veicolo precedente",
"Anticipa le frenate osservando il traffico a distanza",
"Riduci la velocità in anticipo agli incroci"
],
"speeding": [
"Usa il cruise control in autostrada",
"Imposta un avviso velocità a 10 km/h sopra il limite",
"Considera che la multa media costa più dello sconto UBI"
],
"night_driving": [
"Pianifica viaggi lunghi durante il giorno quando possibile",
"Se guidi di notte, fai pause ogni 2 ore"
]
}
def generate_weekly_report(
self,
policy_id: str,
current_score: DrivingScore,
weekly_trips: list[dict]
) -> dict:
"""Genera report settimanale per il guidatore."""
worst_behavior = self._identify_worst_behavior(weekly_trips)
tips = [
self.IMPROVEMENT_TIPS[b][0]
for b in worst_behavior[:2]
if b in self.IMPROVEMENT_TIPS
]
return {
"policy_id": policy_id,
"week_score": current_score.score,
"score_category": current_score.category,
"premium_factor": current_score.premium_factor,
"estimated_annual_saving": self._estimate_saving(current_score),
"trips_this_week": len(weekly_trips),
"total_km_this_week": sum(t["distance_km"] for t in weekly_trips),
"improvement_tips": tips,
"percentile_rank": f"Guidi meglio del {current_score.percentile:.0f}% degli assicurati"
}
def _identify_worst_behavior(self, trips: list[dict]) -> list[str]:
behaviors = []
avg_hard_braking = sum(
t.get("hard_braking_per_100km", 0) for t in trips
) / max(len(trips), 1)
avg_speeding = sum(
t.get("speeding_events_per_100km", 0) for t in trips
) / max(len(trips), 1)
night_ratio = sum(
t.get("night_driving_ratio", 0) for t in trips
) / max(len(trips), 1)
if avg_hard_braking > 3:
behaviors.append("hard_braking")
if avg_speeding > 2:
behaviors.append("speeding")
if night_ratio > 0.3:
behaviors.append("night_driving")
return behaviors
def _estimate_saving(self, score: DrivingScore) -> float:
base_premium = 600.0 # Premio medio auto in Italia
return round(base_premium * (1 - score.premium_factor), 0)
9. Komple Sistem Mimarisi
UBI telematik sisteminin tüm mimarisi aşağıdaki katmanlardan oluşur:
| Katmanlar | Bileşenler | Teknoloji | Gecikme |
|---|---|---|---|
| Kenar | OBD donanım kilidi, akıllı telefon SDK'sı | C/ARM, iOS/Android SDK'sı | 100ms |
| Yutma | MQTT komisyoncusu, REST API | AWS IoT Core / EMQX | <1sn |
| Akış | Olay işleme, açma tespiti | Apache Kafka + Flink | <5sn |
| ML Puanlaması | Özellik deposu, puanlama motoru | Bayram + MLflow | <100ms |
| Depolamak | Zaman serileri, toplamlar | InfluxDB + PostgreSQL | - |
| Servis | Puan API'si, sürücü kontrol paneli | FastAPI + Yerel Tepki | <200ms |
Sonuçlar
UBI telematik boru hattı, uç bilişime dokunan dağıtılmış bir bilgi işlem sistemidir. akış işleme, makine öğrenimi ve gizlilik mühendisliği. 62,6 milyarlık pazar %24,8 CAGR ile dolar (2025) sektörün bu özelleştirmeye olan ilgisini yansıtıyor risk.
Kritik noktalar, sürüş davranışına yönelik özellik mühendisliğidir (sert frenleme, hız yapma, gece sürüşü), filodaki normalleştirilmiş sürüş puanının hesaplanması ve GDPR'ye uygun olarak coğrafi konum verilerinin sorumlu yönetimi. Geri bildirim Sürücü için bu sadece bir artı değildir: Davranışı iyileştirmenin en etkili aracıdır. ve filo kazalarını azaltın.
InsurTech Serisinde Gelecek Makaleler
- Yapay Zeka Sigortacılığı: Özellik Mühendisliği ve Risk Puanlaması
- Talep Otomasyonu: Bilgisayarla Görme ve NLP
- Dolandırıcılık Tespiti: Grafik Analizi ve Davranışsal Sinyal







