Sigorta Sahtekarlığı Tespiti: Grafik Analizi ve Davranışsal Sinyal
Sigorta dolandırıcılığı sistemik ve küresel bir sorundur. Sektör tahminlerine göre, arasında %10 ve %15 Her yıl ödenen taleplerin oranı dolandırıcılık unsurları içeriyor — bundan daha fazla olduğu tahmin edilen toplam maliyet için Yılda 80 milyar dolar küresel olarak. Insurance Europe, Avrupa'da dolandırıcılık kayıplarının yaklaşık Yılda 13 milyar euro. Bu maliyet kaçınılmaz olarak Dürüst müşteriler tarafından ödenen primler sayesinde dolandırıcılığın tespiti sadece bir mesele olmaktan çıkıyor. kurumsal kârlılığın yanı sıra sigorta sisteminin adilliği de önemlidir.
Sigorta dolandırıcılığı farklı şekillerde ortaya çıkar: fırsatçı dolandırıcılıktan (abartılı gerçek hasar), planlı dolandırıcılığa (organize kazalar, mevcut olmayan iddialar), doktorların, tamirhanelerin ve tamirhanelerin dahil olduğu suç ağları tarafından düzenlenen sistematik dolandırıcılığa, avukatlar ve sigortalı suç ortakları. Bu son tür olan organize dolandırıcılık en zararlı olanıdır. Ekonomik olarak ve geleneksel yöntemlerle tespit edilmesi en zor olanıdır.
Modern yapay zeka, özellikle de modeller grafik analizi tespit için sahte ağlar ve modelleri davranışsal sinyal analizi kimlik tespiti için anormal kalıpların ortaya çıkması, şirketlerin dolandırıcılığı tespit etme ve önleme becerisinde devrim yarattı. Sigorta dolandırıcılığının tespitine yönelik analitik pazarı hızla büyüyor CAGR'ı %33 (2025-2032), GNN (Grafik Sinir Ağları), topluluk ve akış modelleri tarafından yönlendirilir gerçek zamanlı işleme.
Ne Öğreneceksiniz
- Sigorta dolandırıcılığı sınıflandırması ve tespit kalıpları
- Dolandırıcılık puanlaması için davranışsal özellik mühendisliği
- Organize dolandırıcılık ağlarının tespiti için grafik analitiği
- Sigorta sahtekarlığının tespiti için Grafik Sinir Ağları (GNN)
- Topluluk modelleri: XGBoost + Random Forest + Isolation Forest
- Apache Kafka ve Faust ile gerçek zamanlı dolandırıcılık puanlaması
- SIU (Özel Soruşturma Birimi) iş akışı ve vaka yönetimi
Sigorta Dolandırıcılığının Taksonomisi
Dolandırıcılık türlerini ve etkili tespit sistemleri tasarlamanın ön koşullarını anlayın. Her türün farklı makine öğrenimi yaklaşımları gerektiren farklı modelleri vardır.
Dolandırıcılık Türleri ve Tespit Teknikleri
| Dolandırıcılık Türü | Örnek | Sıklık | Makine öğrenimi yaklaşımı |
|---|---|---|---|
| Fırsatçı | Gerçek bir kazanın zararlarını abartmak | Yüksek | Anormallik tespiti, AI hasar tahmini |
| Bireysel olarak planlanmış | Kendi aracının kundaklanması | Ortalama | Davranışsal sinyaller, bağlantı analizi |
| Organize/Zil | Birden fazla suç ortağının olduğu simüle edilmiş kaza ağları | Düşük (ancak yüksek etki) | Grafik analizi, GNN, topluluk tespiti |
| Dahili (İçeriden Öğrenen) | Sahte iddiaları onaylayan temsilci | Çok düşük | Kullanıcı davranışı analitiği, ağ analizi |
| Sentetik kimlik | Yanlış verilerle imzalanan politika | Büyüyor | Kimlik doğrulama ML, grafik bağlantıları |
Dolandırıcılık Puanlaması için Davranışsal Özellik Mühendisliği
Dolandırıcılık tespitine yönelik en öngörücü özellikler, temel demografik özellikler (yaş, cinsiyet, meslek), ancak bunları yakalayanlar davranış başvuranın: Raporun zamanlaması, anlatının tutarlılığı, şirketle olan etkileşimlerin geçmişi, Kazalarda diğer taraflarla ilişkiler.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, date, timedelta
@dataclass
class ClaimContext:
"""Contesto completo di un sinistro per fraud scoring."""
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
incident_date: date
report_date: date
incident_day_of_week: int
incident_hour: int
location_zip: str
reported_amount: float
third_party_ids: List[str]
repair_shop_id: Optional[str]
attorney_id: Optional[str]
medical_provider_id: Optional[str]
class FraudFeatureEngineer:
"""
Feature engineering per fraud detection assicurativo.
Costruisce feature comportamentali, temporali e relazionali
per catturare i pattern tipici della frode assicurativa.
"""
def __init__(
self,
historical_claims: pd.DataFrame,
policy_db: pd.DataFrame,
) -> None:
self.historical = historical_claims
self.policy_db = policy_db
def build_features(self, ctx: ClaimContext) -> Dict[str, float]:
features: Dict[str, float] = {}
features.update(self._temporal_features(ctx))
features.update(self._behavioral_features(ctx))
features.update(self._network_features(ctx))
features.update(self._policy_features(ctx))
features.update(self._claim_amount_features(ctx))
return features
def _temporal_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature temporali: quando e avvenuto l'incidente e quando e stato denunciato."""
report_delay_days = (ctx.report_date - ctx.incident_date).days
return {
# Pattern sospetto: denunciare tardi o molto in anticipo
"report_delay_days": float(report_delay_days),
"report_delay_over_30": float(report_delay_days > 30),
"report_delay_over_7": float(report_delay_days > 7),
"same_day_report": float(report_delay_days == 0),
# Pattern sospetto: incidenti nel weekend o di notte
"incident_weekend": float(ctx.incident_day_of_week >= 5),
"incident_night": float(ctx.incident_hour < 6 or ctx.incident_hour >= 22),
"incident_monday": float(ctx.incident_day_of_week == 0),
# Fine mese = pressione finanziaria?
"incident_end_month": float(ctx.incident_date.day >= 25),
}
def _behavioral_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature comportamentali basate sullo storico del richiedente."""
claimant_history = self.historical[
self.historical["claimant_id"] == ctx.claimant_id
]
n_prior_claims = len(claimant_history)
n_prior_fraudulent = claimant_history.get("is_fraud", pd.Series([0])).sum()
n_claims_12m = len(claimant_history[
claimant_history["incident_date"] >=
(ctx.incident_date - timedelta(days=365)).strftime("%Y-%m-%d")
]) if not claimant_history.empty else 0
return {
"prior_claims_total": float(n_prior_claims),
"prior_fraud_confirmed": float(n_prior_fraudulent),
"claims_last_12m": float(n_claims_12m),
"high_claim_frequency": float(n_claims_12m >= 2),
"repeat_claimant": float(n_prior_claims >= 3),
"fraud_history": float(n_prior_fraudulent > 0),
# Cambi polizza frequenti = comportamento anomalo?
"policy_age_days": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days
),
"new_policy_claim": float(
(ctx.incident_date - self._get_policy_start(ctx.policy_id)).days < 90
),
}
def _network_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""
Feature di rete: connessioni sospette con altri attori.
Identifica se il richiedente e connesso a terze parti, officine,
avvocati o medici che compaiono frequentemente in sinistri fraudolenti.
"""
# Conta occorrenze delle terze parti in sinistri storici
tp_fraud_rate = self._entity_fraud_rate("third_party_id", ctx.third_party_ids)
shop_fraud_rate = self._entity_fraud_rate(
"repair_shop_id", [ctx.repair_shop_id] if ctx.repair_shop_id else []
)
attorney_fraud_rate = self._entity_fraud_rate(
"attorney_id", [ctx.attorney_id] if ctx.attorney_id else []
)
medical_fraud_rate = self._entity_fraud_rate(
"medical_provider_id",
[ctx.medical_provider_id] if ctx.medical_provider_id else []
)
return {
"n_third_parties": float(len(ctx.third_party_ids)),
"many_third_parties": float(len(ctx.third_party_ids) >= 3),
"tp_avg_fraud_rate": tp_fraud_rate,
"has_high_fraud_tp": float(tp_fraud_rate > 0.3),
"repair_shop_fraud_rate": shop_fraud_rate,
"attorney_present": float(ctx.attorney_id is not None),
"attorney_fraud_rate": attorney_fraud_rate,
"medical_provider_fraud_rate": medical_fraud_rate,
# Combinazione attorney + medical provider in incidente auto = sospetto
"attorney_and_medical": float(
ctx.attorney_id is not None and ctx.medical_provider_id is not None
),
}
def _policy_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative alla polizza."""
policy_row = self.policy_db[self.policy_db["policy_id"] == ctx.policy_id]
if policy_row.empty:
return {"policy_found": 0.0}
policy = policy_row.iloc[0]
return {
"policy_found": 1.0,
"policy_premium": float(policy.get("annual_premium", 0)),
"coverage_amount": float(policy.get("coverage_amount", 0)),
# Rapporto alto tra copertura e premio = polizza sottostimata?
"coverage_premium_ratio": float(
policy.get("coverage_amount", 0) /
max(policy.get("annual_premium", 1), 1)
),
"recent_coverage_increase": float(
policy.get("coverage_increased_90d", False)
),
}
def _claim_amount_features(self, ctx: ClaimContext) -> Dict[str, float]:
"""Feature relative all'importo dichiarato."""
# Distribuzione storica degli importi per tipo sinistro
type_amounts = self.historical[
self.historical["claim_type"] == ctx.claim_type
]["reported_amount"]
if type_amounts.empty:
return {"amount_percentile": 0.5}
percentile = float(
(type_amounts < ctx.reported_amount).mean()
)
z_score = float(
(ctx.reported_amount - type_amounts.mean()) / max(type_amounts.std(), 1)
)
return {
"reported_amount": ctx.reported_amount,
"reported_amount_log": np.log1p(ctx.reported_amount),
"amount_percentile_for_type": percentile,
"amount_z_score": z_score,
"amount_outlier": float(abs(z_score) > 2.5),
"round_amount": float(ctx.reported_amount % 1000 == 0), # importi "tondi" = sospetto
}
def _get_policy_start(self, policy_id: str) -> date:
row = self.policy_db[self.policy_db["policy_id"] == policy_id]
if row.empty:
return date.today() - timedelta(days=365)
start_str = row.iloc[0].get("start_date", "")
try:
return datetime.strptime(str(start_str), "%Y-%m-%d").date()
except (ValueError, TypeError):
return date.today() - timedelta(days=365)
def _entity_fraud_rate(
self, column: str, entity_ids: List[str]
) -> float:
"""Calcola il tasso di frode storico associato a una lista di entità."""
if not entity_ids or column not in self.historical.columns:
return 0.0
mask = self.historical[column].isin(entity_ids)
subset = self.historical[mask]
if subset.empty:
return 0.0
fraud_col = "is_fraud" if "is_fraud" in subset.columns else None
if fraud_col is None:
return 0.0
return float(subset[fraud_col].mean())
Sahte Ağ Tespiti için Grafik Analizi
Organize dolandırıcılık (sigorta halkaları) geleneksel makine öğrenimi tekniklerine görünmez iddiaları tek tek değerlendiriyorlar. Yalıtılmış bir kaza tamamıyla meşru görünebilir, ama eğer olaya bir ilişkiler ağı içerisinde bakarsanız, aynı üçüncü taraflar, aynı Çalıştayda, düzinelerce iddiada yer alan aynı avukatın modeli açıkça ortaya çıkıyor.
Grafikler bu ilişkileri modellemenize olanak tanır: i deniz mili onlar konu (sigortalılar, üçüncü şahıslar, atölyeler, avukatlar, doktorlar), kemerler onlar bağlantılar (aynı kaza, aynı atölye, aynı sokak). algoritmaları topluluk tespiti şüpheli kümeleri otomatik olarak tanımlar.
import networkx as nx
import pandas as pd
import numpy as np
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from community import best_partition # python-louvain
@dataclass
class FraudRing:
"""Un cluster sospetto identificato dal grafo."""
ring_id: str
members: List[str] # nodi del cluster
claim_ids: List[str]
total_claimed: float
avg_fraud_score: float
ring_type: str # es. "medical_mill", "staged_accident", "repair_shop_ring"
evidence_summary: str
class InsuranceFraudGraphAnalyzer:
"""
Analisi grafo per rilevazione di insurance fraud rings.
Costruisce un grafo bipartito: sinistri <-> entità (persone, officine, medici)
e applica algoritmi di community detection per trovare cluster sospetti.
"""
# Soglie per classificare un nodo come sospetto
SUSPICION_THRESHOLDS = {
"claimant": {"min_claims": 3, "fraud_rate": 0.2},
"repair_shop": {"min_claims": 5, "fraud_rate": 0.15},
"attorney": {"min_claims": 10, "fraud_rate": 0.15},
"medical_provider": {"min_claims": 8, "fraud_rate": 0.20},
}
def __init__(self, claims_df: pd.DataFrame) -> None:
self.claims_df = claims_df
self.graph = self._build_graph()
def _build_graph(self) -> nx.Graph:
"""
Costruisce un grafo di co-occorrenza tra entità nei sinistri.
Due entità sono connesse se compaiono nello stesso sinistro.
"""
G = nx.Graph()
df = self.claims_df
# Aggiungi nodi per ogni tipo di entità
entity_columns = [
("claimant_id", "claimant"),
("third_party_id", "third_party"),
("repair_shop_id", "repair_shop"),
("attorney_id", "attorney"),
("medical_provider_id", "medical_provider"),
]
for _, row in df.iterrows():
entities_in_claim: List[Tuple[str, str]] = []
for col, entity_type in entity_columns:
if pd.notna(row.get(col)):
node_id = f"{entity_type}_{row[col]}"
G.add_node(node_id, entity_type=entity_type, entity_id=str(row[col]))
entities_in_claim.append((node_id, entity_type))
# Collega tutte le entità che compaiono nello stesso sinistro
for i, (node1, type1) in enumerate(entities_in_claim):
for node2, type2 in entities_in_claim[i+1:]:
if G.has_edge(node1, node2):
G[node1][node2]["weight"] += 1
G[node1][node2]["claims"].append(str(row.get("claim_id", "")))
else:
G.add_edge(
node1, node2,
weight=1,
claims=[str(row.get("claim_id", ""))],
)
return G
def detect_fraud_rings(self, min_ring_size: int = 3) -> List[FraudRing]:
"""
Rileva i fraud ring tramite community detection (Louvain algorithm).
Filtra le community per dimensione e score di sospetto.
"""
if self.graph.number_of_nodes() < min_ring_size:
return []
# Louvain community detection
partition: Dict[str, int] = best_partition(self.graph, weight="weight")
# Raggruppa nodi per community
communities: Dict[int, List[str]] = {}
for node, community_id in partition.items():
communities.setdefault(community_id, []).append(node)
fraud_rings: List[FraudRing] = []
for community_id, members in communities.items():
if len(members) < min_ring_size:
continue
ring = self._evaluate_community(community_id, members)
if ring is not None:
fraud_rings.append(ring)
return sorted(fraud_rings, key=lambda r: r.avg_fraud_score, reverse=True)
def _evaluate_community(
self, community_id: int, members: List[str]
) -> "Optional[FraudRing]":
"""Valuta se una community e sospetta e costruisce il FraudRing."""
# Raccogli tutti i sinistri associati ai membri della community
claim_ids: Set[str] = set()
for u, v, data in self.graph.edges(members, data=True):
claim_ids.update(data.get("claims", []))
if not claim_ids:
return None
# Statistiche sui sinistri della community
community_claims = self.claims_df[
self.claims_df["claim_id"].astype(str).isin(claim_ids)
]
if community_claims.empty:
return None
total_claimed = float(community_claims["reported_amount"].sum())
fraud_col = "is_fraud" if "is_fraud" in community_claims.columns else None
fraud_rate = float(community_claims[fraud_col].mean()) if fraud_col else 0.0
# Score di sospetto basato su: densita del grafo, fraud rate storico, importi
subgraph = self.graph.subgraph(members)
density = nx.density(subgraph)
avg_weight = float(np.mean([
d["weight"] for _, _, d in subgraph.edges(data=True)
])) if subgraph.number_of_edges() > 0 else 0.0
suspicion_score = (
density * 0.4 +
fraud_rate * 0.4 +
min(avg_weight / 10, 1.0) * 0.2
)
# Filtra community non sospette
if suspicion_score < 0.3 and fraud_rate < 0.1:
return None
ring_type = self._classify_ring_type(members)
evidence = self._build_evidence_summary(members, community_claims, fraud_rate, density)
return FraudRing(
ring_id=f"ring_{community_id}",
members=members,
claim_ids=list(claim_ids),
total_claimed=round(total_claimed, 2),
avg_fraud_score=round(suspicion_score, 3),
ring_type=ring_type,
evidence_summary=evidence,
)
def _classify_ring_type(self, members: List[str]) -> str:
"""Classifica il tipo di ring in base alle entità presenti."""
types = [self.graph.nodes[m].get("entity_type", "") for m in members]
type_counts: Dict[str, int] = {}
for t in types:
type_counts[t] = type_counts.get(t, 0) + 1
if type_counts.get("medical_provider", 0) >= 2:
return "medical_mill"
if type_counts.get("repair_shop", 0) >= 2:
return "repair_shop_ring"
if type_counts.get("attorney", 0) >= 1 and type_counts.get("medical_provider", 0) >= 1:
return "organized_injury_ring"
return "staged_accident_ring"
def _build_evidence_summary(
self,
members: List[str],
claims: pd.DataFrame,
fraud_rate: float,
density: float,
) -> str:
n_claims = len(claims)
total = claims["reported_amount"].sum() if not claims.empty else 0
return (
f"Community di {len(members)} soggetti, {n_claims} sinistri collegati, "
f"EUR {total:.0f} totale rivendicato. "
f"Fraud rate storico: {fraud_rate:.0%}. "
f"Densita grafo: {density:.2f}."
)
def get_node_centrality_scores(self) -> Dict[str, float]:
"""
Calcola la centralità di ogni nodo nel grafo (betweenness centrality).
Nodi ad alta centralità sono spesso i coordinatori del ring.
"""
centrality = nx.betweenness_centrality(
self.graph, weight="weight", normalized=True
)
return {node: round(score, 6) for node, score in centrality.items()}
Dolandırıcılık Puanlaması için Topluluk Modeli
Tek bir algoritma tüm dolandırıcılık türlerini yakalayamaz. Üretimde en sağlam yaklaşım birden fazla deseni tek bir grupta birleştirin: tablo desenleri için XGBoost, etiketlenmemiş verilerde anormallik tespiti ve dahil edilecek bir grafik özellikleri modeli grafikten çıkarılan ilişkisel sinyaller.
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
class InsuranceFraudEnsemble:
"""
Ensemble di modelli per fraud detection assicurativo.
Combina:
1. XGBoost classifier (classificazione supervisionata)
2. Isolation Forest (anomaly detection non supervisionata)
3. Score da graph centrality (segnale relazionale)
Il vantaggio dell'ensemble e la robustezza: se un modello
manca un tipo di frode, gli altri possono compensare.
"""
XGB_PARAMS: Dict = {
"objective": "binary:logistic",
"eval_metric": "aucpr", # area under precision-recall: meglio di AUC per dati sbilanciati
"max_depth": 6,
"learning_rate": 0.05,
"n_estimators": 400,
"min_child_weight": 10,
"subsample": 0.8,
"colsample_bytree": 0.8,
"scale_pos_weight": 20, # compensa lo sbilanciamento: tipicamente 1 frode ogni 20 claim
"reg_alpha": 0.1,
"reg_lambda": 1.0,
"tree_method": "hist",
"early_stopping_rounds": 30,
}
# Pesi ensemble (devono sommare a 1)
ENSEMBLE_WEIGHTS: Dict[str, float] = {
"xgboost": 0.55,
"isolation_forest": 0.20,
"graph_centrality": 0.25,
}
def __init__(self) -> None:
self.xgb_model: Optional[xgb.XGBClassifier] = None
self.iso_forest: Optional[IsolationForest] = None
self.scaler = StandardScaler()
self.feature_names: List[str] = []
self._is_fitted = False
def fit(
self,
X: pd.DataFrame,
y: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series,
) -> "InsuranceFraudEnsemble":
"""
Addestra l'ensemble.
Args:
X, y: training set (y=1 se frode, y=0 altrimenti)
X_val, y_val: validation set per early stopping
"""
self.feature_names = X.columns.tolist()
# 1. XGBoost supervisionato
print("Training XGBoost classifier...")
self.xgb_model = xgb.XGBClassifier(**self.XGB_PARAMS)
self.xgb_model.fit(
X, y,
eval_set=[(X_val, y_val)],
verbose=50,
)
# 2. Isolation Forest (non supervisionato, addestrato solo su non-frodi)
print("Training Isolation Forest on clean claims...")
X_clean = X[y == 0]
X_scaled = self.scaler.fit_transform(X_clean)
self.iso_forest = IsolationForest(
n_estimators=200,
contamination=0.05, # stima del 5% di anomalie nel set pulito
random_state=42,
n_jobs=-1,
)
self.iso_forest.fit(X_scaled)
self._is_fitted = True
return self
def predict_fraud_score(
self,
X: pd.DataFrame,
graph_centrality_scores: Optional[Dict[str, float]] = None,
) -> np.ndarray:
"""
Calcola lo score di frode per ogni claim.
Args:
X: feature matrix
graph_centrality_scores: score di centralità dal grafo (opzionale)
Returns:
Array di score [0,1] dove 1 = massima probabilità di frode
"""
if not self._is_fitted:
raise RuntimeError("Ensemble non addestrato. Chiamare fit() prima.")
# Score XGBoost
xgb_scores = self.xgb_model.predict_proba(X)[:, 1]
# Score Isolation Forest (normalizzato a [0,1])
X_scaled = self.scaler.transform(X)
iso_raw = self.iso_forest.decision_function(X_scaled)
# score negativo = anomalia; normalizziamo invertendo e scalando
iso_scores = 1 - (iso_raw - iso_raw.min()) / (iso_raw.max() - iso_raw.min() + 1e-10)
# Graph centrality scores
if graph_centrality_scores and "graph_centrality" in X.columns:
graph_scores = X["graph_centrality"].values
else:
graph_scores = np.zeros(len(X))
# Ensemble ponderato
ensemble_score = (
self.ENSEMBLE_WEIGHTS["xgboost"] * xgb_scores +
self.ENSEMBLE_WEIGHTS["isolation_forest"] * iso_scores +
self.ENSEMBLE_WEIGHTS["graph_centrality"] * graph_scores
)
return np.clip(ensemble_score, 0, 1)
def classify_risk_tier(
self, scores: np.ndarray
) -> List[str]:
"""Classifica gli score in tier di rischio per routing."""
tiers = []
for score in scores:
if score < 0.2:
tiers.append("GREEN") # auto-approve
elif score < 0.4:
tiers.append("YELLOW") # standard review
elif score < 0.7:
tiers.append("ORANGE") # enhanced review
else:
tiers.append("RED") # SIU referral
return tiers
def get_feature_importance(self) -> pd.DataFrame:
"""Feature importance dal modello XGBoost."""
if self.xgb_model is None:
raise RuntimeError("Modello non addestrato.")
importances = self.xgb_model.feature_importances_
return pd.DataFrame({
"feature": self.feature_names,
"importance": importances,
}).sort_values("importance", ascending=False)
Kafka ve Faust ile Gerçek Zamanlı Dolandırıcılık Puanlaması
Hızdan yararlanan dolandırıcılıklara (örneğin, zaten hasar görmüş bir aracın sigortalanması) yönelik veya aynı gün farklı şirketlere yönelik birden fazla talep açıldı; dolandırıcılık puanlaması mutlaka yapılmalıdır. içinde gerçekleşmek gerçek zamanlı, FNOL zamanında, gecelik toplu olarak değil. Apache Kafka ve Faust (Python akış işleme), gerçek zamanlı işlem hatları oluşturmanıza olanak tanır bir saniyenin altındaki gecikmeyle.
import faust
from typing import Optional
import json
# Schema del messaggio FNOL in entrata
class FNOLEvent(faust.Record, serializer="json"):
claim_id: str
policy_id: str
claimant_id: str
claim_type: str
reported_amount: float
incident_date: str
report_date: str
third_party_ids: list
repair_shop_id: Optional[str] = None
attorney_id: Optional[str] = None
class FraudScoreResult(faust.Record, serializer="json"):
claim_id: str
fraud_score: float
risk_tier: str
fraud_ring_detected: bool
ring_id: Optional[str]
routing_decision: str
processing_time_ms: float
# Configurazione app Faust
app = faust.App(
"insurance-fraud-detector",
broker="kafka://kafka-broker:9092",
value_serializer="json",
)
fnol_topic = app.topic("insurance.fnol.events", value_type=FNOLEvent)
fraud_scores_topic = app.topic("insurance.fraud.scores", value_type=FraudScoreResult)
# Lazy loading del modello ensemble (caricato una volta all'avvio)
_fraud_ensemble = None
_feature_engineer = None
_graph_analyzer = None
def get_fraud_ensemble():
"""Lazy loading del modello per evitare load al startup."""
global _fraud_ensemble
if _fraud_ensemble is None:
import mlflow
_fraud_ensemble = mlflow.sklearn.load_model(
"models:/insurance-fraud-ensemble/Production"
)
return _fraud_ensemble
@app.agent(fnol_topic)
async def process_fnol_events(events):
"""
Agent Faust che processa ogni FNOL in real-time.
Per ogni evento:
1. Estrae le feature comportamentali
2. Calcola il fraud score ensemble
3. Verifica connessioni con fraud rings noti
4. Pubblica il risultato nel topic di output
"""
import time
async for event in events:
start_time = time.monotonic()
try:
# Costruisci le feature (da cache Redis o DB)
features = await _build_features_async(event)
# Calcola fraud score
ensemble = get_fraud_ensemble()
fraud_score = float(ensemble.predict_fraud_score(features)[0])
risk_tier = ensemble.classify_risk_tier([fraud_score])[0]
# Verifica connessioni con ring noti
ring_id, ring_detected = await _check_ring_connections(event)
# Aggiusta score se ring rilevato
if ring_detected:
fraud_score = min(1.0, fraud_score * 1.4)
risk_tier = "RED"
# Determina routing
routing = _routing_decision(risk_tier, ring_detected, event)
elapsed_ms = (time.monotonic() - start_time) * 1000
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=round(fraud_score, 4),
risk_tier=risk_tier,
fraud_ring_detected=ring_detected,
ring_id=ring_id,
routing_decision=routing,
processing_time_ms=round(elapsed_ms, 2),
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
except Exception as exc:
# Non bloccare lo stream per errori singoli
print(f"[ERROR] Claim {event.claim_id}: {exc}")
# Invia con score conservativo per revisione manuale
result = FraudScoreResult(
claim_id=event.claim_id,
fraud_score=0.5,
risk_tier="YELLOW",
fraud_ring_detected=False,
ring_id=None,
routing_decision="MANUAL_REVIEW_ERROR",
processing_time_ms=-1.0,
)
await fraud_scores_topic.send(key=event.claim_id, value=result)
async def _build_features_async(event: FNOLEvent) -> "pd.DataFrame":
"""Costruisce le feature in modo asincrono da Redis/DB."""
import pandas as pd
# In produzione: lookup asincrono su Redis per dati real-time
# e su DB per storico sinistri e policy
features = {
"reported_amount_log": 0.0,
"report_delay_days": 0.0,
"prior_claims_total": 0.0,
"n_third_parties": float(len(event.third_party_ids)),
"attorney_present": float(event.attorney_id is not None),
"attorney_and_medical": 0.0,
"incident_weekend": 0.0,
"round_amount": float(event.reported_amount % 1000 == 0),
"amount_percentile_for_type": 0.5,
"graph_centrality": 0.0,
}
return pd.DataFrame([features])
async def _check_ring_connections(event: FNOLEvent):
"""Verifica se il richiedente e connesso a ring fraudolenti noti."""
# In produzione: query su graph DB (Neo4j) o Redis per ring attivi
return None, False
def _routing_decision(
risk_tier: str, ring_detected: bool, event: FNOLEvent
) -> str:
if ring_detected or risk_tier == "RED":
return "SIU_REFERRAL"
elif risk_tier == "ORANGE":
return "ENHANCED_REVIEW"
elif risk_tier == "YELLOW":
return "STANDARD_REVIEW"
return "AUTO_APPROVE"
En İyi Uygulamalar ve Anti-kalıplar
Sigorta Sahtekarlığının Tespiti için En İyi Uygulamalar
- Zorunlu topluluk: her zaman denetlenen bir modeli (XGBoost), denetlenmeyen bir modeli (İzolasyon Ormanı) ve ağ sinyallerini (grafik analizi) birleştirir; tek bir yaklaşım tüm dolandırıcılık türlerini kapsamaz
- Talep türüne göre eşiklerin kalibrasyonu: otomobil dolandırıcılığı için en uygun eşik, hayat veya kaza sigortasıyla aynı değildir; iş koluna göre kalibre edin
- Zorunlu geri bildirim döngüsü: SIU araştırmalarının sonuçları eğitim setinde etiket olarak yer almalıdır; Geri bildirim döngüleri olmadan model zamanla gelişmez
- Gerçek zamanlı bağlantılar için Graph DB: milisaniyelik bağlantı sorguları için Neo4j veya ArangoDB'yi kullanın; geleneksel grafikler (NetworkX) milyonlarca düğümün ötesine ölçeklenemez
- Her kararı belgeleyin: Her dolandırıcılık bayrağının, puanı belirleyen özelliklerin yer aldığı ayrıntılı bir denetim izi olmalıdır; olası yasal işlem için zorunludur
Kaçınılması Gereken Anti-örüntüler
- Yüksek yanlış pozitifler: %2'nin üzerindeki hatalı pozitif oran, dürüst müşterilerin güvenini aşındırır ve operasyonel maliyetlere neden olur; geri çağırmanın yanı sıra hassasiyeti de izler
- Yalnızca bilinen dolandırıcılıklara yönelik eğitilmiş model: dolandırıcılık gelişir; yalnızca tarihsel kalıpları bilen bir model yeni kalıpları tespit etmez; beklenmeyen anormallikleri yakalamak için İzolasyon Ormanı'nı kullanın
- Vekaleten ayrımcılık: Posta kodu, meslek veya uyruk gibi değişkenler ayrımcı temsiller olabilir; Dağıtımdan önce farklı etkiyi test edin
- Her şey için gecelik toplu puanlama: hızlı dolandırıcılıklar (aynı gün, çok şirketli) gerçek zamanlı puanlama gerektirir; bu durumlar için gece partisi yetersiz
Sonuçlar ve Sonraki Adımlar
Sigortada modern sahtekarlık tespiti çok seviyeli bir yaklaşım gerektirir: özellik bireysel dolandırıcılık puanlaması için davranışsal, dolandırıcılık ağları için grafik analizi hızlı dolandırıcılıklara karşı organize ve gerçek zamanlı işleme. Farklı modellerden oluşan topluluk Dolandırıcılık kalıplarının mümkün olan en geniş şekilde kapsanmasını sağlar.
Başarılı bir sistemin anahtarları şunlardır: SIU sonuçlarıyla sürekli geri bildirim döngüsü, Dürüst müşterileri cezalandırmamak için yanlış pozitif oranının izlenmesi ve Herhangi bir yasal işlemi desteklemek için değişmez denetim izi.
Serinin bir sonraki makalesi daha fazla ayrıntıya giriyor ACORD Standardı ve API Entegrasyonu Sigorta: farklı sigorta sistemleri arasında birlikte çalışabilirliğin nasıl uygulanacağı standart ACORD XML/JSON mesajlarını kullanarak.
InsurTech Mühendislik Serisi
- 01 - Geliştiriciler için Sigorta Alanı: Ürünler, Aktörler ve Veri Modelleri
- 02 - Bulutta Yerel Politika Yönetimi: API Öncelikli Mimari
- 03 - Telematik İşlem Hattı: Geniş Ölçekte UBI Veri İşleme
- 04 - Yapay Zeka Sigortacılığı: Özellik Mühendisliği ve Risk Puanlaması
- 05 - Talep Otomasyonu: Bilgisayarla Görme ve NLP
- 06 - Dolandırıcılık Tespiti: Grafik Analizi ve Davranışsal Sinyal (bu makale)
- 07 - ACORD Standardı ve Sigorta API Entegrasyonu
- 08 - Uyumluluk Mühendisliği: Solvency II ve UFRS 17







