Detectie van verzekeringsfraude: grafiekanalyse en gedragssignaal
Verzekeringsfraude is een systemisch en mondiaal probleem. Volgens schattingen van de sector is tussen 10% en 15% van de jaarlijks uitbetaalde claims bevat elementen van fraude — voor een totale kostprijs die op meer dan dat wordt geschat 80 miljard dollar per jaar wereldwijd. In Europa schat Insurance Europe verliezen door fraude op ca 13 miljard euro per jaar. Deze kosten hebben onvermijdelijk gevolgen voor premies betaald door eerlijke klanten, waardoor fraudedetectie niet alleen een kwestie van doen is de winstgevendheid van bedrijven, maar ook de eerlijkheid van het verzekeringsstelsel.
Verzekeringsfraude kent verschillende vormen: van opportunistische fraude (overdrijven echte schade), tot geplande fraude (georganiseerde ongelukken, niet-bestaande claims), tot tot systematische fraude, georkestreerd door criminele netwerken waarbij artsen, carrosseriebedrijven, advocaten en verzekerde medeplichtigen. Deze laatste soort – georganiseerde fraude – is het schadelijkst economisch gezien en het moeilijkst te detecteren met traditionele methoden.
Moderne AI, vooral modellen van grafiekanalyse voor detectie van frauduleuze netwerken en de modellen daarvan gedragssignaalanalyse voor identificatie van afwijkende patronen heeft een revolutie teweeggebracht in het vermogen van bedrijven om fraude op te sporen en te voorkomen. De analysemarkt voor de detectie van verzekeringsfraude groeit gestaag CAGR van 33% (2025-2032), aangestuurd door GNN (Graph Neural Networks), ensemble- en streammodellen realtime verwerking.
Wat je gaat leren
- Taxonomie en detectiepatronen van verzekeringsfraude
- Gedragsfunctie-engineering voor het scoren van fraude
- Grafiekanalyse voor de detectie van georganiseerde frauduleuze netwerken
- Graph Neural Networks (GNN) voor detectie van verzekeringsfraude
- Ensemblemodellen: XGBoost + Random Forest + Isolation Forest
- Realtime fraudescores met Apache Kafka en Faust
- SIU (Special Investigation Unit) workflow en casemanagement
Taxonomie van verzekeringsfraude
Begrijp de soorten fraude en de voorwaarde voor het ontwerpen van effectieve detectiesystemen. Elk type heeft verschillende patronen die verschillende ML-benaderingen vereisen.
Soorten fraude en detectietechnieken
| Soort fraude | Voorbeeld | Frequentie | ML-aanpak |
|---|---|---|---|
| Opportunistisch | De schade van een echt ongeval overdrijven | Hoog | Anomaliedetectie, AI-schadeschatting |
| Individueel gepland | Brandstichting van uw eigen voertuig | Gemiddeld | Gedragssignalen, linkanalyse |
| Georganiseerd/bel | Netwerken van gesimuleerde ongelukken met meerdere medeplichtigen | Lage (maar hoge impact) | Grafiekanalyse, GNN, communitydetectie |
| Intern (Insider) | Agent die valse claims goedkeurt | Zeer laag | Analyse van gebruikersgedrag, netwerkanalyse |
| Synthetische identiteit | Beleid ondertekend met valse gegevens | Groeien | Identiteitsverificatie ML, grafieklinks |
Gedragsfunctie-engineering voor fraudescores
De meest voorspellende kenmerken voor fraudedetectie zijn niet de demografische basiskenmerken (leeftijd, geslacht, beroep), maar degenen die de gedrag van de aanvrager: timing van het rapport, samenhang van het verhaal, geschiedenis van interacties met het bedrijf, relaties met andere partijen bij ongevallen.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta
@dataclass
class ClaimContext:
"""Contesto completo di un sinistro per fraud scoring."""
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
incident_date: date
report_date: date
incident_day_of_week: int
incident_hour: int
location_zip: str
reported_amount: float
third_party_ids: List[str]
repair_shop_id: Optional[str]
attorney_id: Optional[str]
medical_provider_id: Optional[str]
class FraudFeatureEngineer:
"""
Feature engineering per fraud detection assicurativo.
Costruisce feature comportamentali, temporali e relazionali
per catturare i pattern tipici della frode assicurativa.
"""
def __init__(
self,
historical_claims: pd.DataFrame,
policy_db: pd.DataFrame,
) -> None:
self.historical = historical_claims
self.policy_db = policy_db
def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
features: Dict[str, float] = {}
features.update(self._temporal_features(ctx))
features.update(self._behavioral_features(ctx))
features.update(self._network_features(ctx))
features.update(self._policy_features(ctx))
features.update(self._claim_amount_features(ctx))
return features
def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature temporali: quando e avvenuto l'incidente e quando e stato denunciato."""
report_delay_days = (ctx.report_date - ctx.incident_date).days
return {
# Pattern sospetto: denunciare tardi o molto in anticipo
"report_delay_days": float(report_delay_days),
"report_delay_over_30": float(report_delay_days > 30),
"report_delay_over_7": float(report_delay_days > 7),
"same_day_report": float(report_delay_days == 0),
# Pattern sospetto: incidenti nel weekend o di notte
"incident_weekend": float(ctx.incident_day_of_week >= 5),
"incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
"incident_monday": float(ctx.incident_day_of_week == 0),
# Fine mese = pressione finanziaria?
"incident_end_month": float(ctx.incident_date.day >= 25),
}
def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature comportamentali basate sullo storico del richiedente."""
claimant_history = self.historical[
self.historical["claimant_id"] == ctx.claimant_id
]
n_prior_claims = len(claimant_history)
n_prior_fraudulent = claimant_history.get("is_fraud", pd.Series([0])).sum()
n_claims_12m = len(claimant_history[
claimant_history["incident_date"] >=
(ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
]) if not claimant_history.empty else 0
return {
"prior_claims_total": float(n_prior_claims),
"prior_fraud_confirmed": float(n_prior_fraudulent),
"claims_last_12m": float(n_claims_12m),
"high_claim_frequency": float(n_claims_12m >= 2),
"repeat_claimant": float(n_prior_claims >= 3),
"fraud_history": float(n_prior_fraudulent > 0),
# Cambi polizza frequenti = comportamento anomalo?
"policy_age_days": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days
),
"new_policy_claim": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days < 90
),
}
def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""
Feature di rete: connessioni sospette con altri attori.
Identifica se il richiedente e connesso a terze parti, officine,
avvocati o medici che compaiono frequentemente in sinistri fraudolenti.
"""
# Conta occorrenze delle terze parti in sinistri storici
tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
shop_fraud_rate = self._entity_fraud_rate(
"repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
)
attorney_fraud_rate = self._entity_fraud_rate(
"attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
)
medical_fraud_rate = self._entity_fraud_rate(
"medical_provider_id",
[ctx.medical_provider_id] if ctx.medical_provider_id else []
)
return {
"n_third_parties": float(len(ctx.third_party_ids)),
"many_third_parties": float(len(ctx.third_party_ids) >= 3),
"tp_avg_fraud_rate": tp_fraud_rate,
"has_high_fraud_tp": float(tp_fraud_rate > 0.3),
"repair_shop_fraud_rate": shop_fraud_rate,
"attorney_present": float(ctx.attorney_id is not None),
"attorney_fraud_rate": attorney_fraud_rate,
"medical_provider_fraud_rate": medical_fraud_rate,
# Combinazione attorney + medical provider in incidente auto = sospetto
"attorney_and_medical": float(
ctx.attorney_id is not None and ctx.medical_provider_id is not None
),
}
def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative alla polizza."""
policy_row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
if policy_row.empty:
return {"policy_found": 0.0}
policy = policy_row.iloc[0]
return {
"policy_found": 1.0,
"policy_premium": float(policy.get("annual_premium", 0)),
"coverage_amount": float(policy.get("coverage_amount", 0)),
# Rapporto alto tra copertura e premio = polizza sottostimata?
"coverage_premium_ratio": float(
policy.get("coverage_amount", 0) /
max(policy.get("annual_premium", 1), 1)
),
"recent_coverage_increase": float(
policy.get("coverage_increased_90d", False)
),
}
def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative all'importo dichiarato."""
# Distribuzione storica degli importi per tipo sinistro
type_amounts = self.historical[
self.historical["claim_type"] == ctx.claim_type
]["reported_amount"]
if type_amounts.empty:
return {"amount_percentile": 0.5}
percentile = float(
(type_amounts < ctx.reported_amount).mean()
)
z_score = float(
(ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1)
)
return {
"reported_amount": ctx.reported_amount,
"reported_amount_log": np.log1p(ctx.reported_amount),
"amount_percentile_for_type": percentile,
"amount_z_score": z_score,
"amount_outlier": float(abs(z_score) > 2.5),
"round_amount": float(ctx.reported_amount % 1000 == 0), # importi "tondi" = sospetto
}
def _get_policy_start(self, policy_id: str) -> date:
row = self.policy_db[self.policy_db["policy_id"] == policy_id]
if row.empty:
return date.today() - timedelta(days=365)
start_str = row.iloc[0].get("start_date", "")
try:
return datetime.strptime(str(start_str), "%Y-%m-%d").date()
except (ValueError, TypeError):
return date.today() - timedelta(days=365)
def _entity_fraud_rate(
self, column: str, entity_ids: List[str]
) -> float:
"""Calcola il tasso di frode storico associato a una lista di entità."""
if not entity_ids or column not in self.historical.columns:
return 0.0
mask = self.historical[column].isin(entity_ids)
subset = self.historical[mask]
if subset.empty:
return 0.0
fraud_col = "is_fraud" if "is_fraud" in subset.columns else None
if fraud_col is None:
return 0.0
return float(subset[fraud_col].mean())
Grafiekanalyse voor detectie van frauduleuze netwerken
Georganiseerde fraude (verzekeringsorganisaties) is onzichtbaar voor traditionele ML-technieken zij beoordelen claims individueel. Een op zichzelf staand ongeval lijkt misschien volkomen legitiem, maar als je het bekijkt binnen een netwerk van relaties – dezelfde derde partijen, hetzelfde workshop, dezelfde advocaat die in tientallen claims voorkomt – het patroon komt duidelijk naar voren.
Met grafieken kunt u deze relaties modelleren: i knopen zij zijn de onderwerpen (verzekerden, derden, werkplaatsen, advocaten, artsen), de bogen zij zijn de verbindingen (hetzelfde ongeval, dezelfde werkplaats, dezelfde straat). De algoritmen van gemeenschap detectie automatisch verdachte clusters identificeren.
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from community import best_partition # python-louvain
@dataclass
class FraudRing:
"""Un cluster sospetto identificato dal grafo."""
ring_id: str
members: List[str] # nodi del cluster
claim_ids: List[str]
total_claimed: float
avg_fraud_score: float
ring_type: str # es. "medical_mill", "staged_accident", "repair_shop_ring"
evidence_summary: str
class InsuranceFraudGraphAnalyzer:
"""
Analisi grafo per rilevazione di insurance fraud rings.
Costruisce un grafo bipartito: sinistri <-> entità (persone, officine, medici)
e applica algoritmi di community detection per trovare cluster sospetti.
"""
# Soglie per classificare un nodo come sospetto
SUSPICION_THRESHOLDS = {
"claimant": {"min_claims": 3, "fraud_rate": 0.2},
"repair_shop": {"min_claims": 5, "fraud_rate": 0.15},
"attorney": {"min_claims": 10, "fraud_rate": 0.15},
"medical_provider": {"min_claims": 8, "fraud_rate": 0.20},
}
def __init__(self, claims_df: pd.DataFrame) -> None:
self.claims_df = claims_df
self.graph = self._build_graph()
def _build_graph(self) -> nx.Graph:
"""
Costruisce un grafo di co-occorrenza tra entità nei sinistri.
Due entità sono connesse se compaiono nello stesso sinistro.
"""
G = nx.Graph()
df = self.claims_df
# Aggiungi nodi per ogni tipo di entità
entity_columns = [
("claimant_id", "claimant"),
("third_party_id", "third_party"),
("repair_shop_id", "repair_shop"),
("attorney_id", "attorney"),
("medical_provider_id", "medical_provider"),
]
for _, row in df.iterrows():
entities_in_claim: List[Tuple[str, str]] = []
for col, entity_type in entity_columns:
if pd.notna(row.get(col)):
node_id = f"{entity_type}_{row[col]}"
G.add_node(node_id, entity_type=entity_type, entity_id=str(row[col]))
entities_in_claim.append((node_id, entity_type))
# Collega tutte le entità che compaiono nello stesso sinistro
for i, (node1, type1) in enumerate(entities_in_claim):
for node2, type2 in entities_in_claim[i+1:]:
if G.has_edge(node1, node2):
G[node1][node2]["weight"] += 1
G[node1][node2]["claims"].append(str(row.get("claim_id", "")))
else:
G.add_edge(
node1, node2,
weight=1,
claims=[str(row.get("claim_id", ""))],
)
return G
def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
"""
Rileva i fraud ring tramite community detection (Louvain algorithm).
Filtra le community per dimensione e score di sospetto.
"""
if self.graph.number_of_nodes() < min_ring_size:
return []
# Louvain community detection
partition: Dict[str, int] = best_partition(self.graph, weight="weight")
# Raggruppa nodi per community
communities: Dict[int, List[str]] = {}
for node, community_id in partition.items():
communities.setdefault(community_id, []).append(node)
fraud_rings: List[FraudRing] = []
for community_id, members in communities.items():
if len(members) < min_ring_size:
continue
ring = self._evaluate_community(community_id, members)
if ring is not None:
fraud_rings.append(ring)
return sorted(fraud_rings, key=lambda r: r.avg_fraud_score, reverse=True)
def _evaluate_community(
self, community_id: int, members: List[str]
) -> "Optional[FraudRing]":
"""Valuta se una community e sospetta e costruisce il FraudRing."""
# Raccogli tutti i sinistri associati ai membri della community
claim_ids: Set[str] = set()
for u, v, data in self.graph.edges(members, data=True):
claim_ids.update(data.get("claims", []))
if not claim_ids:
return None
# Statistiche sui sinistri della community
community_claims = self.claims_df[
self.claims_df["claim_id"].astype(str).isin(claim_ids)
]
if community_claims.empty:
return None
total_claimed = float(community_claims["reported_amount"].sum())
fraud_col = "is_fraud" if "is_fraud" in community_claims.columns else None
fraud_rate = float(community_claims[fraud_col].mean()) if fraud_col else 0.0
# Score di sospetto basato su: densita del grafo, fraud rate storico, importi
subgraph = self.graph.subgraph(members)
density = nx.density(subgraph)
avg_weight = float(np.mean([
d["weight"] for _, _, d in subgraph.edges(data=True)
])) if subgraph.number_of_edges() > 0 else 0.0
suspicion_score = (
density * 0.4 +
fraud_rate * 0.4 +
min(avg_weight / 10, 1.0) * 0.2
)
# Filtra community non sospette
if suspicion_score < 0.3 and fraud_rate < 0.1:
return None
ring_type = self._classify_ring_type(members)
evidence = self._build_evidence_summary(members, community_claims, fraud_rate, density)
return FraudRing(
ring_id=f"ring_{community_id}",
members=members,
claim_ids=list(claim_ids),
total_claimed=round(total_claimed, 2),
avg_fraud_score=round(suspicion_score, 3),
ring_type=ring_type,
evidence_summary=evidence,
)
def _classify_ring_type(self, members: List[str]) -> str:
"""Classifica il tipo di ring in base alle entità presenti."""
types = [self.graph.nodes[m].get("entity_type", "") for m in members]
type_counts: Dict[str, int] = {}
for t in types:
type_counts[t] = type_counts.get(t, 0) + 1
if type_counts.get("medical_provider", 0) >= 2:
return "medical_mill"
if type_counts.get("repair_shop", 0) >= 2:
return "repair_shop_ring"
if type_counts.get("attorney", 0) >= 1 and type_counts.get("medical_provider", 0) >= 1:
return "organized_injury_ring"
return "staged_accident_ring"
def _build_evidence_summary(
self,
members: List[str],
claims: pd.DataFrame,
fraud_rate: float,
density: float,
) -> str:
n_claims = len(claims)
total = claims["reported_amount"].sum() if not claims.empty else 0
return (
f"Community di {len(members)} soggetti, {n_claims} sinistri collegati, "
f"EUR {total:.0f} totale rivendicato. "
f"Fraud rate storico: {fraud_rate:.0%}. "
f"Densita grafo: {density:.2f}."
)
def get_node_centrality_scores(self) -> Dict[str, float]:
"""
Calcola la centralità di ogni nodo nel grafo (betweenness centrality).
Nodi ad alta centralità sono spesso i coordinatori del ring.
"""
centrality = nx.betweenness_centrality(
self.graph, weight="weight", normalized=True
)
return {node: round(score, 6) for node, score in centrality.items()}
Ensemblemodel voor fraudescores
Geen enkel algoritme vangt alle vormen van fraude op. De meest robuuste aanpak in de productie combineer meerdere patronen in één ensemble: XGBoost voor patronen in tabelvorm, Isolation Forest voor detectie van afwijkingen op niet-gelabelde gegevens, en een model met grafiekfuncties om op te nemen de relationele signalen die uit de grafiek worden gehaald.
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
class InsuranceFraudEnsemble:
"""
Ensemble di modelli per fraud detection assicurativo.
Combina:
1. XGBoost classifier (classificazione supervisionata)
2. Isolation Forest (anomaly detection non supervisionata)
3. Score da graph centrality (segnale relazionale)
Il vantaggio dell'ensemble e la robustezza: se un modello
manca un tipo di frode, gli altri possono compensare.
"""
XGB_PARAMS: Dict = {
"objective": "binary:logistic",
"eval_metric": "aucpr", # area under precision-recall: meglio di AUC per dati sbilanciati
"max_depth": 6,
"learning_rate": 0.05,
"n_estimators": 400,
"min_child_weight": 10,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": 20, # compensa lo sbilanciamento: tipicamente 1 frode ogni 20 claim
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"tree_method": "hist",
"early_stopping_rounds": 30,
}
# Pesi ensemble (devono sommare a 1)
ENSEMBLE_WEIGHTS: Dict[str, float] = {
"xgboost": 0.55,
"isolation_forest": 0.20,
"graph_centrality": 0.25,
}
def __init__(self) -> None:
self.xgb_model: Optional[xgb.XGBClassifier] = None
self.iso_forest: Optional[IsolationForest] = None
self.scaler = StandardScaler()
self.feature_names: List[str] = []
self._is_fitted = False
def fit(
self,
X: pd.DataFrame,
y: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
) -> "InsuranceFraudEnsemble":
"""
Addestra l'ensemble.
Args:
X, y: training set (y=1 se frode, y=0 altrimenti)
X_val, y_val: validation set per early stopping
"""
self.feature_names = X.columns.tolist()
# 1. XGBoost supervisionato
print("Training XGBoost classifier...")
self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
self.xgb_model.fit(
X, y,
eval_set=[(X_val, y_val)],
verbose=50,
)
# 2. Isolation Forest (non supervisionato, addestrato solo su non-frodi)
print("Training Isolation Forest on clean claims...")
X_clean = X[y == 0]
X_scaled = self.scaler.fit_transform(X_clean)
self.iso_forest = IsolationForest(
n_estimators=200,
contamination=0.05, # stima del 5% di anomalie nel set pulito
random_state=42,
n_jobs=-1,
)
self.iso_forest.fit(X_scaled)
self._is_fitted = True
return self
def predict_fraud_score(
self,
X: pd.DataFrame,
graph_centrality_scores: Optional[Dict[str, float]] = None,
) -> np.ndarray:
"""
Calcola lo score di frode per ogni claim.
Args:
X: feature matrix
graph_centrality_scores: score di centralità dal grafo (opzionale)
Returns:
Array di score [0,1] dove 1 = massima probabilità di frode
"""
if not self._is_fitted:
raise RuntimeError("Ensemble non addestrato. Chiamare fit() prima.")
# Score XGBoost
xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
# Score Isolation Forest (normalizzato a [0,1])
X_scaled = self.scaler.transform(X)
iso_raw = self.iso_forest.decision_function(X_scaled)
# score negativo = anomalia; normalizziamo invertendo e scalando
iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
# Graph centrality scores
if graph_centrality_scores and "graph_centrality" in X.columns:
graph_scores = X["graph_centrality"].values
else:
graph_scores = np.zeros(len(X))
# Ensemble ponderato
ensemble_score = (
self.ENSEMBLE_WEIGHTS["xgboost"] * xgb_scores +
self.ENSEMBLE_WEIGHTS["isolation_forest"] * iso_scores +
self.ENSEMBLE_WEIGHTS["graph_centrality"] * graph_scores
)
return np.clip(ensemble_score, 0, 1)
def classify_risk_tier(
self, scores: np.ndarray
) -> List[str]:
"""Classifica gli score in tier di rischio per routing."""
tiers = []
for score in scores:
if score < 0.2:
tiers.append("GREEN") # auto-approve
elif score < 0.4:
tiers.append("YELLOW") # standard review
elif score < 0.7:
tiers.append("ORANGE") # enhanced review
else:
tiers.append("RED") # SIU referral
return tiers
def get_feature_importance(self) -> pd.DataFrame:
"""Feature importance dal modello XGBoost."""
if self.xgb_model is None:
raise RuntimeError("Modello non addestrato.")
importances = self.xgb_model.feature_importances_
return pd.DataFrame({
"feature": self.feature_names,
"importance": importances,
}).sort_values("importance", ascending=False)
Realtime fraudescores met Kafka en Faust
Voor fraudeurs waarbij misbruik wordt gemaakt van snelheid, bijvoorbeeld door een reeds beschadigd voertuig te verzekeren of meerdere claims die op dezelfde dag bij verschillende bedrijven zijn geopend – fraudescoring moet plaatsvinden realtime, ten tijde van FNOL, niet in nachtelijke batch. Met Apache Kafka en Faust (Python-streamverwerking) kunt u realtime pijplijnen bouwen met een latentie van minder dan één seconde.
import faust
from typing import Optional
import json
# Schema del messaggio FNOL in entrata
class FNOLEvent(faust.Record, serializer="json"):
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
reported_amount: float
incident_date: str
report_date: str
third_party_ids: list
repair_shop_id: Optional[str] = None
attorney_id: Optional[str] = None
class FraudScoreResult(faust.Record, serializer="json"):
claim_id: str
fraud_score: float
risk_tier: str
fraud_ring_detected: bool
ring_id: Optional[str]
routing_decision: str
processing_time_ms: float
# Configurazione app Faust
app = faust.App(
"insurance-fraud-detector",
broker="kafka://kafka-broker:9092",
value_serializer="json",
)
fnol_topic = app.topic("insurance.fnol.events", value_type=FNOLEvent)
fraud_scores_topic = app.topic("insurance.fraud.scores", value_type=FraudScoreResult)
# Lazy loading del modello ensemble (caricato una volta all'avvio)
_fraud_ensemble = None
_feature_engineer = None
_graph_analyzer = None
def get_fraud_ensemble():
"""Lazy loading del modello per evitare load al startup."""
global _fraud_ensemble
if _fraud_ensemble is None:
import mlflow
_fraud_ensemble = mlflow.sklearn.load_model(
"models:/insurance-fraud-ensemble/Production"
)
return _fraud_ensemble
@app.agent(fnol_topic)
async def process_fnol_events(events):
"""
Agent Faust che processa ogni FNOL in real-time.
Per ogni evento:
1. Estrae le feature comportamentali
2. Calcola il fraud score ensemble
3. Verifica connessioni con fraud rings noti
4. Pubblica il risultato nel topic di output
"""
import time
async for event in events:
start_time = time.monotonic()
try:
# Costruisci le feature (da cache Redis o DB)
features = await _build_features_async(event)
# Calcola fraud score
ensemble = get_fraud_ensemble()
fraud_score = float(ensemble.predict_fraud_score(features)[0])
risk_tier = ensemble.classify_risk_tier([fraud_score])[0]
# Verifica connessioni con ring noti
ring_id, ring_detected = await _check_ring_connections(event)
# Aggiusta score se ring rilevato
if ring_detected:
fraud_score = min(1.0, fraud_score * 1.4)
risk_tier = "RED"
# Determina routing
routing = _routing_decision(risk_tier, ring_detected, event)
elapsed_ms = (time.monotonic() - start_time) * 1000
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=round(fraud_score, 4),
risk_tier=risk_tier,
fraud_ring_detected=ring_detected,
ring_id=ring_id,
routing_decision=routing,
processing_time_ms=round(elapsed_ms, 2),
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
except Exception as exc:
# Non bloccare lo stream per errori singoli
print(f"[ERROR] Claim {event.claim_id}: {exc}")
# Invia con score conservativo per revisione manuale
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=0.5,
risk_tier="YELLOW",
fraud_ring_detected=False,
ring_id=None,
routing_decision="MANUAL_REVIEW_ERROR",
processing_time_ms=-1.0,
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
async def _build_features_async(event: FNOLEvent) -> "pd.DataFrame":
"""Costruisce le feature in modo asincrono da Redis/DB."""
import pandas as pd
# In produzione: lookup asincrono su Redis per dati real-time
# e su DB per storico sinistri e policy
features = {
"reported_amount_log": 0.0,
"report_delay_days": 0.0,
"prior_claims_total": 0.0,
"n_third_parties": float(len(event.third_party_ids)),
"attorney_present": float(event.attorney_id is not None),
"attorney_and_medical": 0.0,
"incident_weekend": 0.0,
"round_amount": float(event.reported_amount % 1000 == 0),
"amount_percentile_for_type": 0.5,
"graph_centrality": 0.0,
}
return pd.DataFrame([features])
async def _check_ring_connections(event: FNOLEvent):
"""Verifica se il richiedente e connesso a ring fraudolenti noti."""
# In produzione: query su graph DB (Neo4j) o Redis per ring attivi
return None, False
def _routing_decision(
risk_tier: str, ring_detected: bool, event: FNOLEvent
) -> str:
if ring_detected or risk_tier == "RED":
return "SIU_REFERRAL"
elif risk_tier == "ORANGE":
return "ENHANCED_REVIEW"
elif risk_tier == "YELLOW":
return "STANDARD_REVIEW"
return "AUTO_APPROVE"
Beste praktijken en antipatronen
Best practices voor detectie van verzekeringsfraude
- Verplicht ensemble: combineert altijd een bewaakt model (XGBoost), een niet-gecontroleerd model (Isolation Forest) en netwerksignalen (grafiekanalyse); Geen enkele aanpak omvat alle soorten fraude
- Kalibratie van drempels per soort claim: de optimale drempel voor autofraude is niet dezelfde als voor levens- of ongevallenverzekeringen; kalibreren per branche
- Verplichte feedbacklus: de resultaten van de SIU-onderzoeken moeten als labels in de trainingsset worden opgenomen; zonder feedbacklussen verbetert het model in de loop van de tijd niet
- Grafiek DB voor realtime verbindingen: gebruik Neo4j of ArangoDB voor milliseconde verbindingsquery's; Traditionele grafieken (NetworkX) schalen niet verder dan miljoenen knooppunten
- Documenteer elke beslissing: Elke fraudevlag moet een gedetailleerd audittraject hebben met de kenmerken die de score hebben bepaald – verplicht voor mogelijke juridische stappen
Antipatronen die u moet vermijden
- Hoge valse positieven: een fout-positief percentage van meer dan 2% ondermijnt het vertrouwen van eerlijke klanten en genereert operationele kosten; bewaakt zowel de precisie als de terugroepactie
- Model alleen getraind op bekende fraude: fraude evolueert; een model dat alleen historische patronen kent, detecteert geen nieuwe patronen – gebruik Isolation Forest om onverwachte afwijkingen vast te leggen
- Discriminatie bij volmacht: variabelen zoals postcode, beroep of nationaliteit kunnen discriminerende proxy’s zijn; Test de uiteenlopende impact voordat u deze implementeert
- Nachtelijke batchscores voor alles: snelle fraudes (op dezelfde dag, bij meerdere bedrijven) vereisen realtime scores; de nachtbatch is in deze gevallen onvoldoende
Conclusies en volgende stappen
Moderne fraudedetectie in verzekeringen vereist een aanpak op meerdere niveaus: feature gedragsmatig voor individuele fraudescores, grafiekanalyses voor frauduleuze netwerken georganiseerde en realtime verwerking voor snelle fraude. Het ensemble van verschillende modellen zorgt voor een zo breed mogelijke dekking van fraudepatronen.
De sleutels tot een succesvol systeem zijn: continue feedbackloop met SIU-resultaten, het monitoren van het percentage fout-positieven om eerlijke klanten niet te bestraffen, en a onveranderlijk audittraject ter ondersteuning van eventuele juridische stappen.
Het volgende artikel in de serie gaat hier dieper op in ACORD-standaard- en API-integratie Verzekering: hoe interoperabiliteit tussen verschillende verzekeringssystemen te implementeren met behulp van standaard ACORD XML/JSON-berichten.
InsurTech Engineering-serie
- 01 - Verzekeringsdomein voor ontwikkelaars: producten, actoren en datamodellen
- 02 - Cloud-native beleidsbeheer: API-First-architectuur
- 03 - Telematicapijplijn: UBI-gegevensverwerking op schaal
- 04 - AI-acceptatie: feature-engineering en risicoscores
- 05 - Claimautomatisering: Computer Vision en NLP
- 06 - Fraudedetectie: grafiekanalyse en gedragssignaal (dit artikel)
- 07 - ACORD Standard en Insurance API-integratie
- 08 - Compliance Engineering: Solvency II en IFRS 17







