Detectarea fraudei în asigurări: analiză grafică și semnal comportamental
Frauda în asigurări este o problemă sistemică și globală. Conform estimărilor industriei, intre 10% și 15% de daune plătite în fiecare an conține elemente de fraudă — pentru un cost total estimat la mai mult de atât 80 de miliarde de dolari pe an la nivel global. În Europa, Insurance Europe estimează pierderi de fraudă de cca 13 miliarde de euro pe an. Acest cost are un impact inevitabil asupra prime plătite de clienți onești, ceea ce face ca detectarea fraudei să nu fie doar o chestiune de profitabilitatea corporativă, dar și corectitudinea sistemului de asigurări.
Frauda în asigurări vine sub diferite forme: de la fraudă oportunistă (exagerează daune reale), la fraudă planificată (accidente organizate, daune inexistente), până la fraudelor sistematice orchestrate de rețele criminale care implică medici, ateliere de caroserie, avocaţi şi complici asiguraţi. Acest ultim tip – fraudă organizată – este cel mai dăunător economic şi cel mai greu de detectat prin metode tradiţionale.
AI modern, în special modelele de analiză grafică pentru detectare a rețelelor frauduloase și modelele de analiza comportamentului semnalului pentru identificare de tipare anormale, a revoluționat capacitatea companiilor de a detecta și de a preveni frauda. Piața de analiză pentru detectarea fraudelor în asigurări este în creștere CAGR de 33% (2025-2032), condus de GNN (Graph Neural Networks), modele de ansamblu și flux procesare în timp real.
Ce vei învăța
- Taxonomie și modele de detectare a fraudei în asigurări
- Inginerie de caracteristici comportamentale pentru scorarea fraudelor
- Analize grafice pentru detectarea rețelelor frauduloase organizate
- Rețele neuronale grafice (GNN) pentru detectarea fraudelor în asigurări
- Modele de ansamblu: XGBoost + Random Forest + Isolation Forest
- Scoruri de fraudă în timp real cu Apache Kafka și Faust
- Fluxul de lucru și managementul cazurilor SIU (Unitatea de Investigații Speciale).
Taxonomia fraudei în asigurări
Înțelegeți tipurile de fraudă și condiția prealabilă pentru proiectarea sistemelor de detectare eficiente. Fiecare tip are modele distincte care necesită abordări ML diferite.
Tipuri de fraudă și tehnici de detectare
| Tipul de fraudă | Exemplu | Frecvenţă | abordarea ML |
|---|---|---|---|
| Oportunistă | Exagerând daunele unui accident real | Ridicat | Detectarea anomaliilor, estimarea daunelor AI |
| Planificat individual | Incendierea propriului vehicul | Medie | Semnale comportamentale, analiza legăturii |
| Organizat/Inel | Rețele de accidente simulate cu complici multipli | Impact scăzut (dar mare) | Analize grafice, GNN, detectarea comunității |
| Intern (Insider) | Agent care aprobă pretenții false | Foarte scăzut | Analiza comportamentului utilizatorilor, analiza rețelei |
| Identitate sintetică | Politica semnată cu date false | Creştere | Verificarea identității ML, link-uri grafice |
Ingineria caracteristicilor comportamentale pentru scorarea fraudelor
Cele mai predictive caracteristici pentru detectarea fraudelor nu sunt cele demografice de bază (vârsta, sex, profesie), dar cele care surprind comportament a solicitantului: momentul raportării, coerența narațiunii, istoricul interacțiunilor cu compania, relațiile cu alte părți în accidente.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta
@dataclass
class ClaimContext:
"""Contesto completo di un sinistro per fraud scoring."""
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
incident_date: date
report_date: date
incident_day_of_week: int
incident_hour: int
location_zip: str
reported_amount: float
third_party_ids: List[str]
repair_shop_id: Optional[str]
attorney_id: Optional[str]
medical_provider_id: Optional[str]
class FraudFeatureEngineer:
"""
Feature engineering per fraud detection assicurativo.
Costruisce feature comportamentali, temporali e relazionali
per catturare i pattern tipici della frode assicurativa.
"""
def __init__(
self,
historical_claims: pd.DataFrame,
policy_db: pd.DataFrame,
) -> None:
self.historical = historical_claims
self.policy_db = policy_db
def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
features: Dict[str, float] = {}
features.update(self._temporal_features(ctx))
features.update(self._behavioral_features(ctx))
features.update(self._network_features(ctx))
features.update(self._policy_features(ctx))
features.update(self._claim_amount_features(ctx))
return features
def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature temporali: quando e avvenuto l'incidente e quando e stato denunciato."""
report_delay_days = (ctx.report_date - ctx.incident_date).days
return {
# Pattern sospetto: denunciare tardi o molto in anticipo
"report_delay_days": float(report_delay_days),
"report_delay_over_30": float(report_delay_days > 30),
"report_delay_over_7": float(report_delay_days > 7),
"same_day_report": float(report_delay_days == 0),
# Pattern sospetto: incidenti nel weekend o di notte
"incident_weekend": float(ctx.incident_day_of_week >= 5),
"incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
"incident_monday": float(ctx.incident_day_of_week == 0),
# Fine mese = pressione finanziaria?
"incident_end_month": float(ctx.incident_date.day >= 25),
}
def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature comportamentali basate sullo storico del richiedente."""
claimant_history = self.historical[
self.historical["claimant_id"] == ctx.claimant_id
]
n_prior_claims = len(claimant_history)
n_prior_fraudulent = claimant_history.get("is_fraud", pd.Series([0])).sum()
n_claims_12m = len(claimant_history[
claimant_history["incident_date"] >=
(ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
]) if not claimant_history.empty else 0
return {
"prior_claims_total": float(n_prior_claims),
"prior_fraud_confirmed": float(n_prior_fraudulent),
"claims_last_12m": float(n_claims_12m),
"high_claim_frequency": float(n_claims_12m >= 2),
"repeat_claimant": float(n_prior_claims >= 3),
"fraud_history": float(n_prior_fraudulent > 0),
# Cambi polizza frequenti = comportamento anomalo?
"policy_age_days": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days
),
"new_policy_claim": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days < 90
),
}
def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""
Feature di rete: connessioni sospette con altri attori.
Identifica se il richiedente e connesso a terze parti, officine,
avvocati o medici che compaiono frequentemente in sinistri fraudolenti.
"""
# Conta occorrenze delle terze parti in sinistri storici
tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
shop_fraud_rate = self._entity_fraud_rate(
"repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
)
attorney_fraud_rate = self._entity_fraud_rate(
"attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
)
medical_fraud_rate = self._entity_fraud_rate(
"medical_provider_id",
[ctx.medical_provider_id] if ctx.medical_provider_id else []
)
return {
"n_third_parties": float(len(ctx.third_party_ids)),
"many_third_parties": float(len(ctx.third_party_ids) >= 3),
"tp_avg_fraud_rate": tp_fraud_rate,
"has_high_fraud_tp": float(tp_fraud_rate > 0.3),
"repair_shop_fraud_rate": shop_fraud_rate,
"attorney_present": float(ctx.attorney_id is not None),
"attorney_fraud_rate": attorney_fraud_rate,
"medical_provider_fraud_rate": medical_fraud_rate,
# Combinazione attorney + medical provider in incidente auto = sospetto
"attorney_and_medical": float(
ctx.attorney_id is not None and ctx.medical_provider_id is not None
),
}
def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative alla polizza."""
policy_row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
if policy_row.empty:
return {"policy_found": 0.0}
policy = policy_row.iloc[0]
return {
"policy_found": 1.0,
"policy_premium": float(policy.get("annual_premium", 0)),
"coverage_amount": float(policy.get("coverage_amount", 0)),
# Rapporto alto tra copertura e premio = polizza sottostimata?
"coverage_premium_ratio": float(
policy.get("coverage_amount", 0) /
max(policy.get("annual_premium", 1), 1)
),
"recent_coverage_increase": float(
policy.get("coverage_increased_90d", False)
),
}
def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative all'importo dichiarato."""
# Distribuzione storica degli importi per tipo sinistro
type_amounts = self.historical[
self.historical["claim_type"] == ctx.claim_type
]["reported_amount"]
if type_amounts.empty:
return {"amount_percentile": 0.5}
percentile = float(
(type_amounts < ctx.reported_amount).mean()
)
z_score = float(
(ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1)
)
return {
"reported_amount": ctx.reported_amount,
"reported_amount_log": np.log1p(ctx.reported_amount),
"amount_percentile_for_type": percentile,
"amount_z_score": z_score,
"amount_outlier": float(abs(z_score) > 2.5),
"round_amount": float(ctx.reported_amount % 1000 == 0), # importi "tondi" = sospetto
}
def _get_policy_start(self, policy_id: str) -> date:
row = self.policy_db[self.policy_db["policy_id"] == policy_id]
if row.empty:
return date.today() - timedelta(days=365)
start_str = row.iloc[0].get("start_date", "")
try:
return datetime.strptime(str(start_str), "%Y-%m-%d").date()
except (ValueError, TypeError):
return date.today() - timedelta(days=365)
def _entity_fraud_rate(
self, column: str, entity_ids: List[str]
) -> float:
"""Calcola il tasso di frode storico associato a una lista di entità."""
if not entity_ids or column not in self.historical.columns:
return 0.0
mask = self.historical[column].isin(entity_ids)
subset = self.historical[mask]
if subset.empty:
return 0.0
fraud_col = "is_fraud" if "is_fraud" in subset.columns else None
if fraud_col is None:
return 0.0
return float(subset[fraud_col].mean())
Analize grafice pentru detectarea rețelelor frauduloase
Frauda organizată (cercuri de asigurări) este invizibilă tehnicilor tradiționale de ML ei evaluează individual cererile. Un accident izolat poate părea perfect legitim, dar dacă te uiți la asta într-o rețea de relații — aceleași terți, la fel atelier, același avocat care apare în zeci de revendicări — tiparul reiese clar.
Graficele vă permit să modelați aceste relații: i noduri ei sunt subiectii (asigurati, terti, ateliere, avocati, medici), the arcade ei sunt cei racorduri (acelasi accident, acelasi atelier, aceeasi strada). The algorithms of detectarea comunitatii identifică automat clusterele suspecte.
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from community import best_partition # python-louvain
@dataclass
class FraudRing:
"""Un cluster sospetto identificato dal grafo."""
ring_id: str
members: List[str] # nodi del cluster
claim_ids: List[str]
total_claimed: float
avg_fraud_score: float
ring_type: str # es. "medical_mill", "staged_accident", "repair_shop_ring"
evidence_summary: str
class InsuranceFraudGraphAnalyzer:
"""
Analisi grafo per rilevazione di insurance fraud rings.
Costruisce un grafo bipartito: sinistri <-> entità (persone, officine, medici)
e applica algoritmi di community detection per trovare cluster sospetti.
"""
# Soglie per classificare un nodo come sospetto
SUSPICION_THRESHOLDS = {
"claimant": {"min_claims": 3, "fraud_rate": 0.2},
"repair_shop": {"min_claims": 5, "fraud_rate": 0.15},
"attorney": {"min_claims": 10, "fraud_rate": 0.15},
"medical_provider": {"min_claims": 8, "fraud_rate": 0.20},
}
def __init__(self, claims_df: pd.DataFrame) -> None:
self.claims_df = claims_df
self.graph = self._build_graph()
def _build_graph(self) -> nx.Graph:
"""
Costruisce un grafo di co-occorrenza tra entità nei sinistri.
Due entità sono connesse se compaiono nello stesso sinistro.
"""
G = nx.Graph()
df = self.claims_df
# Aggiungi nodi per ogni tipo di entità
entity_columns = [
("claimant_id", "claimant"),
("third_party_id", "third_party"),
("repair_shop_id", "repair_shop"),
("attorney_id", "attorney"),
("medical_provider_id", "medical_provider"),
]
for _, row in df.iterrows():
entities_in_claim: List[Tuple[str, str]] = []
for col, entity_type in entity_columns:
if pd.notna(row.get(col)):
node_id = f"{entity_type}_{row[col]}"
G.add_node(node_id, entity_type=entity_type, entity_id=str(row[col]))
entities_in_claim.append((node_id, entity_type))
# Collega tutte le entità che compaiono nello stesso sinistro
for i, (node1, type1) in enumerate(entities_in_claim):
for node2, type2 in entities_in_claim[i+1:]:
if G.has_edge(node1, node2):
G[node1][node2]["weight"] += 1
G[node1][node2]["claims"].append(str(row.get("claim_id", "")))
else:
G.add_edge(
node1, node2,
weight=1,
claims=[str(row.get("claim_id", ""))],
)
return G
def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
"""
Rileva i fraud ring tramite community detection (Louvain algorithm).
Filtra le community per dimensione e score di sospetto.
"""
if self.graph.number_of_nodes() < min_ring_size:
return []
# Louvain community detection
partition: Dict[str, int] = best_partition(self.graph, weight="weight")
# Raggruppa nodi per community
communities: Dict[int, List[str]] = {}
for node, community_id in partition.items():
communities.setdefault(community_id, []).append(node)
fraud_rings: List[FraudRing] = []
for community_id, members in communities.items():
if len(members) < min_ring_size:
continue
ring = self._evaluate_community(community_id, members)
if ring is not None:
fraud_rings.append(ring)
return sorted(fraud_rings, key=lambda r: r.avg_fraud_score, reverse=True)
def _evaluate_community(
self, community_id: int, members: List[str]
) -> "Optional[FraudRing]":
"""Valuta se una community e sospetta e costruisce il FraudRing."""
# Raccogli tutti i sinistri associati ai membri della community
claim_ids: Set[str] = set()
for u, v, data in self.graph.edges(members, data=True):
claim_ids.update(data.get("claims", []))
if not claim_ids:
return None
# Statistiche sui sinistri della community
community_claims = self.claims_df[
self.claims_df["claim_id"].astype(str).isin(claim_ids)
]
if community_claims.empty:
return None
total_claimed = float(community_claims["reported_amount"].sum())
fraud_col = "is_fraud" if "is_fraud" in community_claims.columns else None
fraud_rate = float(community_claims[fraud_col].mean()) if fraud_col else 0.0
# Score di sospetto basato su: densita del grafo, fraud rate storico, importi
subgraph = self.graph.subgraph(members)
density = nx.density(subgraph)
avg_weight = float(np.mean([
d["weight"] for _, _, d in subgraph.edges(data=True)
])) if subgraph.number_of_edges() > 0 else 0.0
suspicion_score = (
density * 0.4 +
fraud_rate * 0.4 +
min(avg_weight / 10, 1.0) * 0.2
)
# Filtra community non sospette
if suspicion_score < 0.3 and fraud_rate < 0.1:
return None
ring_type = self._classify_ring_type(members)
evidence = self._build_evidence_summary(members, community_claims, fraud_rate, density)
return FraudRing(
ring_id=f"ring_{community_id}",
members=members,
claim_ids=list(claim_ids),
total_claimed=round(total_claimed, 2),
avg_fraud_score=round(suspicion_score, 3),
ring_type=ring_type,
evidence_summary=evidence,
)
def _classify_ring_type(self, members: List[str]) -> str:
"""Classifica il tipo di ring in base alle entità presenti."""
types = [self.graph.nodes[m].get("entity_type", "") for m in members]
type_counts: Dict[str, int] = {}
for t in types:
type_counts[t] = type_counts.get(t, 0) + 1
if type_counts.get("medical_provider", 0) >= 2:
return "medical_mill"
if type_counts.get("repair_shop", 0) >= 2:
return "repair_shop_ring"
if type_counts.get("attorney", 0) >= 1 and type_counts.get("medical_provider", 0) >= 1:
return "organized_injury_ring"
return "staged_accident_ring"
def _build_evidence_summary(
self,
members: List[str],
claims: pd.DataFrame,
fraud_rate: float,
density: float,
) -> str:
n_claims = len(claims)
total = claims["reported_amount"].sum() if not claims.empty else 0
return (
f"Community di {len(members)} soggetti, {n_claims} sinistri collegati, "
f"EUR {total:.0f} totale rivendicato. "
f"Fraud rate storico: {fraud_rate:.0%}. "
f"Densita grafo: {density:.2f}."
)
def get_node_centrality_scores(self) -> Dict[str, float]:
"""
Calcola la centralità di ogni nodo nel grafo (betweenness centrality).
Nodi ad alta centralità sono spesso i coordinatori del ring.
"""
centrality = nx.betweenness_centrality(
self.graph, weight="weight", normalized=True
)
return {node: round(score, 6) for node, score in centrality.items()}
Model de ansamblu pentru scorarea fraudelor
Niciun algoritm unic nu prinde toate tipurile de fraudă. Cea mai robustă abordare în producție combinați mai multe modele într-un singur ansamblu: XGBoost pentru modele tabelare, Isolation Forest pentru detectarea anomaliilor pe date neetichetate și un model de caracteristici grafice de încorporat semnalele relaţionale extrase din grafic.
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
class InsuranceFraudEnsemble:
"""
Ensemble di modelli per fraud detection assicurativo.
Combina:
1. XGBoost classifier (classificazione supervisionata)
2. Isolation Forest (anomaly detection non supervisionata)
3. Score da graph centrality (segnale relazionale)
Il vantaggio dell'ensemble e la robustezza: se un modello
manca un tipo di frode, gli altri possono compensare.
"""
XGB_PARAMS: Dict = {
"objective": "binary:logistic",
"eval_metric": "aucpr", # area under precision-recall: meglio di AUC per dati sbilanciati
"max_depth": 6,
"learning_rate": 0.05,
"n_estimators": 400,
"min_child_weight": 10,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": 20, # compensa lo sbilanciamento: tipicamente 1 frode ogni 20 claim
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"tree_method": "hist",
"early_stopping_rounds": 30,
}
# Pesi ensemble (devono sommare a 1)
ENSEMBLE_WEIGHTS: Dict[str, float] = {
"xgboost": 0.55,
"isolation_forest": 0.20,
"graph_centrality": 0.25,
}
def __init__(self) -> None:
self.xgb_model: Optional[xgb.XGBClassifier] = None
self.iso_forest: Optional[IsolationForest] = None
self.scaler = StandardScaler()
self.feature_names: List[str] = []
self._is_fitted = False
def fit(
self,
X: pd.DataFrame,
y: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
) -> "InsuranceFraudEnsemble":
"""
Addestra l'ensemble.
Args:
X, y: training set (y=1 se frode, y=0 altrimenti)
X_val, y_val: validation set per early stopping
"""
self.feature_names = X.columns.tolist()
# 1. XGBoost supervisionato
print("Training XGBoost classifier...")
self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
self.xgb_model.fit(
X, y,
eval_set=[(X_val, y_val)],
verbose=50,
)
# 2. Isolation Forest (non supervisionato, addestrato solo su non-frodi)
print("Training Isolation Forest on clean claims...")
X_clean = X[y == 0]
X_scaled = self.scaler.fit_transform(X_clean)
self.iso_forest = IsolationForest(
n_estimators=200,
contamination=0.05, # stima del 5% di anomalie nel set pulito
random_state=42,
n_jobs=-1,
)
self.iso_forest.fit(X_scaled)
self._is_fitted = True
return self
def predict_fraud_score(
self,
X: pd.DataFrame,
graph_centrality_scores: Optional[Dict[str, float]] = None,
) -> np.ndarray:
"""
Calcola lo score di frode per ogni claim.
Args:
X: feature matrix
graph_centrality_scores: score di centralità dal grafo (opzionale)
Returns:
Array di score [0,1] dove 1 = massima probabilità di frode
"""
if not self._is_fitted:
raise RuntimeError("Ensemble non addestrato. Chiamare fit() prima.")
# Score XGBoost
xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
# Score Isolation Forest (normalizzato a [0,1])
X_scaled = self.scaler.transform(X)
iso_raw = self.iso_forest.decision_function(X_scaled)
# score negativo = anomalia; normalizziamo invertendo e scalando
iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
# Graph centrality scores
if graph_centrality_scores and "graph_centrality" in X.columns:
graph_scores = X["graph_centrality"].values
else:
graph_scores = np.zeros(len(X))
# Ensemble ponderato
ensemble_score = (
self.ENSEMBLE_WEIGHTS["xgboost"] * xgb_scores +
self.ENSEMBLE_WEIGHTS["isolation_forest"] * iso_scores +
self.ENSEMBLE_WEIGHTS["graph_centrality"] * graph_scores
)
return np.clip(ensemble_score, 0, 1)
def classify_risk_tier(
self, scores: np.ndarray
) -> List[str]:
"""Classifica gli score in tier di rischio per routing."""
tiers = []
for score in scores:
if score < 0.2:
tiers.append("GREEN") # auto-approve
elif score < 0.4:
tiers.append("YELLOW") # standard review
elif score < 0.7:
tiers.append("ORANGE") # enhanced review
else:
tiers.append("RED") # SIU referral
return tiers
def get_feature_importance(self) -> pd.DataFrame:
"""Feature importance dal modello XGBoost."""
if self.xgb_model is None:
raise RuntimeError("Modello non addestrato.")
importances = self.xgb_model.feature_importances_
return pd.DataFrame({
"feature": self.feature_names,
"importance": importances,
}).sort_values("importance", ascending=False)
Punctajul fraudelor în timp real cu Kafka și Faust
Pentru fraudele care exploatează viteza — de exemplu, asigurarea unui vehicul deja avariat sau mai multe revendicări deschise în aceeași zi pe diferite companii — trebuie să se pună în considerare frauda are loc în în timp real, la momentul FNOL, nu în lot de noapte. Apache Kafka și Faust (procesarea fluxului Python) vă permit să construiți conducte în timp real cu latență sub o secundă.
import faust
from typing import Optional
import json
# Schema del messaggio FNOL in entrata
class FNOLEvent(faust.Record, serializer="json"):
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
reported_amount: float
incident_date: str
report_date: str
third_party_ids: list
repair_shop_id: Optional[str] = None
attorney_id: Optional[str] = None
class FraudScoreResult(faust.Record, serializer="json"):
claim_id: str
fraud_score: float
risk_tier: str
fraud_ring_detected: bool
ring_id: Optional[str]
routing_decision: str
processing_time_ms: float
# Configurazione app Faust
app = faust.App(
"insurance-fraud-detector",
broker="kafka://kafka-broker:9092",
value_serializer="json",
)
fnol_topic = app.topic("insurance.fnol.events", value_type=FNOLEvent)
fraud_scores_topic = app.topic("insurance.fraud.scores", value_type=FraudScoreResult)
# Lazy loading del modello ensemble (caricato una volta all'avvio)
_fraud_ensemble = None
_feature_engineer = None
_graph_analyzer = None
def get_fraud_ensemble():
"""Lazy loading del modello per evitare load al startup."""
global _fraud_ensemble
if _fraud_ensemble is None:
import mlflow
_fraud_ensemble = mlflow.sklearn.load_model(
"models:/insurance-fraud-ensemble/Production"
)
return _fraud_ensemble
@app.agent(fnol_topic)
async def process_fnol_events(events):
"""
Agent Faust che processa ogni FNOL in real-time.
Per ogni evento:
1. Estrae le feature comportamentali
2. Calcola il fraud score ensemble
3. Verifica connessioni con fraud rings noti
4. Pubblica il risultato nel topic di output
"""
import time
async for event in events:
start_time = time.monotonic()
try:
# Costruisci le feature (da cache Redis o DB)
features = await _build_features_async(event)
# Calcola fraud score
ensemble = get_fraud_ensemble()
fraud_score = float(ensemble.predict_fraud_score(features)[0])
risk_tier = ensemble.classify_risk_tier([fraud_score])[0]
# Verifica connessioni con ring noti
ring_id, ring_detected = await _check_ring_connections(event)
# Aggiusta score se ring rilevato
if ring_detected:
fraud_score = min(1.0, fraud_score * 1.4)
risk_tier = "RED"
# Determina routing
routing = _routing_decision(risk_tier, ring_detected, event)
elapsed_ms = (time.monotonic() - start_time) * 1000
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=round(fraud_score, 4),
risk_tier=risk_tier,
fraud_ring_detected=ring_detected,
ring_id=ring_id,
routing_decision=routing,
processing_time_ms=round(elapsed_ms, 2),
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
except Exception as exc:
# Non bloccare lo stream per errori singoli
print(f"[ERROR] Claim {event.claim_id}: {exc}")
# Invia con score conservativo per revisione manuale
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=0.5,
risk_tier="YELLOW",
fraud_ring_detected=False,
ring_id=None,
routing_decision="MANUAL_REVIEW_ERROR",
processing_time_ms=-1.0,
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
async def _build_features_async(event: FNOLEvent) -> "pd.DataFrame":
"""Costruisce le feature in modo asincrono da Redis/DB."""
import pandas as pd
# In produzione: lookup asincrono su Redis per dati real-time
# e su DB per storico sinistri e policy
features = {
"reported_amount_log": 0.0,
"report_delay_days": 0.0,
"prior_claims_total": 0.0,
"n_third_parties": float(len(event.third_party_ids)),
"attorney_present": float(event.attorney_id is not None),
"attorney_and_medical": 0.0,
"incident_weekend": 0.0,
"round_amount": float(event.reported_amount % 1000 == 0),
"amount_percentile_for_type": 0.5,
"graph_centrality": 0.0,
}
return pd.DataFrame([features])
async def _check_ring_connections(event: FNOLEvent):
"""Verifica se il richiedente e connesso a ring fraudolenti noti."""
# In produzione: query su graph DB (Neo4j) o Redis per ring attivi
return None, False
def _routing_decision(
risk_tier: str, ring_detected: bool, event: FNOLEvent
) -> str:
if ring_detected or risk_tier == "RED":
return "SIU_REFERRAL"
elif risk_tier == "ORANGE":
return "ENHANCED_REVIEW"
elif risk_tier == "YELLOW":
return "STANDARD_REVIEW"
return "AUTO_APPROVE"
Cele mai bune practici și anti-modele
Cele mai bune practici pentru detectarea fraudelor în asigurări
- Ansamblu obligatoriu: combină întotdeauna un model supravegheat (XGBoost), un model nesupravegheat (Isolation Forest) și semnale de rețea (analitică grafică); nicio abordare nu acoperă toate tipurile de fraudă
- Calibrarea pragurilor în funcție de tipul de revendicare: pragul optim pentru frauda auto nu este același ca pentru asigurarea de viață sau de accidente; calibrați după linia de activitate
- Bucla de feedback obligatorie: rezultatele investigațiilor SIU trebuie incluse ca etichete în setul de instruire; fără bucle de feedback, modelul nu se îmbunătățește în timp
- DB grafic pentru conexiuni în timp real: utilizați Neo4j sau ArangoDB pentru interogări de conexiune în milisecunde; graficele tradiționale (NetworkX) nu se scalează dincolo de milioane de noduri
- Documentați fiecare decizie: fiecare semnalizare a fraudei trebuie să aibă o pistă de audit detaliată cu caracteristicile care au determinat scorul — obligatoriu pentru posibile acțiuni în justiție
Anti-modele de evitat
- Fals pozitive ridicate: o rată fals pozitivă peste 2% erodează încrederea clienților onești și generează costuri operaționale; monitorizează precizia, precum și rechemarea
- Model instruit numai pentru fraudele cunoscute: frauda evoluează; un model care cunoaște doar modele istorice nu detectează modele noi - utilizați Isolation Forest pentru a captura anomalii neașteptate
- Discriminare prin împuternicire: variabile precum codul poștal, profesia sau naționalitatea pot fi proxy discriminatori; Testați impactul disparat înainte de implementare
- Scorul pe lot de noapte pentru tot: fraudele rapide (aceeași zi, multi-companie) necesită punctare în timp real; lotul de noapte este insuficient pentru aceste cazuri
Concluzii și pașii următori
Detectarea modernă a fraudelor în asigurări necesită o abordare pe mai multe niveluri: caracteristică comportamentale pentru scorarea individuală a fraudelor, analize grafice pentru rețele frauduloase procesare organizată și în timp real pentru fraude rapide. Ansamblul diferitelor modele asigură cea mai largă acoperire posibilă a tiparelor de fraudă.
Cheile unui sistem de succes sunt: buclă de feedback continuă cu rezultate SIU, monitorizarea ratei fals pozitive pentru a nu penaliza clienții onești și a pistă de audit imuabilă pentru a sprijini orice acțiune în justiție.
Următorul articol din serie intră în mai multe detalii ACORD Standard și Integrare API Asigurare: cum se implementează interoperabilitatea între diferite sisteme de asigurare folosind mesaje standard ACORD XML/JSON.
Seria InsurTech Engineering
- 01 - Domeniul de asigurare pentru Dezvoltatori: Produse, Actori și Modele de Date
- 02 - Cloud-Native Policy Management: API-First Architecture
- 03 - Telematics Pipeline: UBI Data Processing la scară
- 04 - Subscriere AI: Ingineria caracteristicilor și scorul de risc
- 05 - Automatizarea revendicărilor: Computer Vision și NLP
- 06 - Detectarea fraudei: analiză grafică și semnal comportamental (acest articol)
- 07 - Integrare ACORD Standard și Insurance API
- 08 - Inginerie de conformitate: Solvabilitate II și IFRS 17







