Claims Automation: Computer Vision și NLP pentru Claims Management
Gestionarea reclamațiilor este, în mod tradițional, cel mai costisitor și mai intens proces pentru clienți a industriei asigurărilor. Un accident de mașină durează în medie 8-15 zile pentru a se rezolva, cu 4-7 puncte de contact între client și companie, documente care urmează să fie colectate în format hârtie, evaluări fizice asupra vehiculului și sărituri între diferite departamente. Rata de nemulțumire a clienții în faza de reclamații și din punct de vedere istoric cea mai mare dintre orice interacțiune cu compania.
Inteligența artificială transformă acest proces de la zero. Rezultatele industriei în 2025 sunt extraordinare: Amiralul Seguros a ajuns la 90% din estimări automate complet fără atingere, cu 98% din evaluări finalizate în mai puțin de 15 minute. Unele companii raportați ratele de automatizare end-to-end până la 57%, cu un timp mediu de Achitarea a fost redusă de la săptămâni la ore pentru revendicările auto standard. Precizie în extracție datele din documente ajung la 96%, comparativ cu 65% pentru operatorul uman.
Acest ghid construiește un sistem complet de automatizare a reclamațiilor: din managementul digital de FNOL (Primul Notice de Pierdere), la evaluarea daunelor cu viziune computerizată, la extracție informații din documente cu NLP, până la orchestrarea fluxului de lucru de decontare.
Ce vei învăța
- Arhitectura unui sistem de automatizare end-to-end a daunelor
- FNOL digital: recepție și triaj automat de raportare
- Viziune computerizată pentru estimarea daunelor vehiculelor: modele CNN și transformatoare vizuale
- PNL pentru extragerea datelor din documentele de asigurare: rapoarte medicale, rapoarte de politie
- OCR avansat pentru digitizarea documentelor vechi
- Orchestrarea fluxului de lucru pentru procesul de decontare
- Măsuri de monitorizare pentru sistemele de automatizare a daunelor
Arhitectura sistemului de automatizare a reclamațiilor
Un sistem modern de automatizare a revendicărilor constă din straturi distincte cu responsabilități distincte definit. Arhitectura de microservicii permite componentelor să se scaleze independent și pentru a actualiza modele individuale fără a afecta întreaga conductă.
Straturi arhitecturale
| Straturi | Componente | Tehnologii |
|---|---|---|
| Ingestie | Admitere FNOL, încărcare document, gateway API | FastAPI, S3/GCS, Kafka |
| Prelucrare | OCR, extracție NLP, Computer Vision | Tesseract, spaCy, PyTorch, Hugging Face |
| Inteligența | Estimarea daunelor, scorul fraudelor, calculul rezervelor | YOLOv8, Detectron2, XGBoost |
| Orchestrație | Motor de flux de lucru, management SLA, escaladare | Temporal.io, Airflow, mașini de stat |
| Ieșiri | Oferte de decontare, comunicații cu clienții, jurnale de audit | Twilio, SendGrid, EventStore |
FNOL Digital: Prima notificare de pierdere inteligentă
FNOL (First Notice of Loss) și raportarea inițială a reclamației de către client. În mod tradițional, acest lucru se făcea prin telefon sau personal; în sistemele moderne se întâmplă prin intermediul aplicațiilor mobil, chatbot sau portal web. AI intervine din primul moment pentru a clasifica tipul de revendicare, evaluați-i complexitatea și direcționați-o către fluxul de lucru corect.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import uuid
class ClaimType(str, Enum):
AUTO_COLLISION = "auto_collision"
AUTO_THEFT = "auto_theft"
AUTO_WINDSHIELD = "auto_windshield"
PROPERTY_WATER = "property_water"
PROPERTY_FIRE = "property_fire"
LIABILITY = "liability"
HEALTH = "health"
UNKNOWN = "unknown"
class ClaimComplexity(str, Enum):
SIMPLE = "simple" # liquidazione automatica
STANDARD = "standard" # processo guidato
COMPLEX = "complex" # intervento umano
LITIGIOUS = "litigious" # legal team
@dataclass
class FNOLSubmission:
"""Rappresenta una segnalazione FNOL ricevuta dal cliente."""
policy_number: str
incident_date: datetime
incident_description: str
location: str
photos: List[str] = field(default_factory=list) # URL foto caricate
documents: List[str] = field(default_factory=list)
contact_phone: str = ""
contact_email: str = ""
third_parties_involved: bool = False
injuries_reported: bool = False
police_report_available: bool = False
claim_id: str = field(default_factory=lambda: str(uuid.uuid4()))
received_at: datetime = field(default_factory=datetime.now)
@dataclass
class FNOLAssessment:
"""Risultato dell'analisi AI della FNOL."""
claim_id: str
claim_type: ClaimType
complexity: ClaimComplexity
estimated_severity: str # low/medium/high
auto_settlement_eligible: bool
required_documents: List[str]
assigned_workflow: str
fraud_risk_score: float # 0-1
priority: int # 1=urgente, 5=normale
routing_notes: str = ""
class FNOLTriageService:
"""
Servizio di triaging automatico per segnalazioni FNOL.
Combina regole di business e modelli ML per classificare
ogni sinistro e assegnarlo al workflow appropriato.
"""
# Keyword per classificazione tipo sinistro (semplificata)
CLAIM_TYPE_KEYWORDS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"incidente", "scontro", "tamponamento", "urto",
"collision", "crash", "accident"
],
ClaimType.AUTO_THEFT: [
"furto", "rubato", "scomparso", "theft", "stolen"
],
ClaimType.AUTO_WINDSHIELD: [
"parabrezza", "vetro", "windshield", "cristallo"
],
ClaimType.PROPERTY_WATER: [
"allagamento", "perdita", "infiltrazione", "acqua",
"flood", "water", "leak"
],
ClaimType.PROPERTY_FIRE: [
"incendio", "fuoco", "fiamme", "fire", "burn"
],
}
REQUIRED_DOCS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"CID/CAI o rapporto polizia",
"foto veicolo (4+ lati)",
"documento identità",
"patente di guida",
"carta di circolazione",
],
ClaimType.AUTO_THEFT: [
"denuncia polizia (entro 48h)",
"documento identità",
"carte del veicolo",
"chiavi originali",
],
ClaimType.PROPERTY_WATER: [
"foto danni",
"intervento idraulico (se disponibile)",
"preventivo riparazione",
],
}
def triage(
self, fnol: FNOLSubmission, fraud_score: float = 0.0
) -> FNOLAssessment:
"""Esegue il triaging automatico della segnalazione FNOL."""
claim_type = self._classify_type(fnol.incident_description)
complexity = self._assess_complexity(fnol, fraud_score)
severity = self._estimate_severity(fnol, claim_type)
auto_eligible = self._is_auto_settlement_eligible(fnol, complexity, fraud_score)
workflow = self._assign_workflow(claim_type, complexity)
priority = self._calculate_priority(fnol, complexity, fraud_score)
return FNOLAssessment(
claim_id=fnol.claim_id,
claim_type=claim_type,
complexity=complexity,
estimated_severity=severity,
auto_settlement_eligible=auto_eligible,
required_documents=self.REQUIRED_DOCS.get(claim_type, [
"documento identità",
"foto danni",
"preventivo riparazione",
]),
assigned_workflow=workflow,
fraud_risk_score=fraud_score,
priority=priority,
routing_notes=self._build_routing_notes(
fnol, complexity, fraud_score
),
)
def _classify_type(self, description: str) -> ClaimType:
"""Classifica il tipo di sinistro dalla descrizione testuale."""
desc_lower = description.lower()
scores: Dict[ClaimType, int] = {}
for claim_type, keywords in self.CLAIM_TYPE_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in desc_lower)
if score > 0:
scores[claim_type] = score
if not scores:
return ClaimType.UNKNOWN
return max(scores, key=lambda k: scores[k])
def _assess_complexity(
self, fnol: FNOLSubmission, fraud_score: float
) -> ClaimComplexity:
"""Determina la complessità del sinistro."""
if fnol.injuries_reported:
return ClaimComplexity.LITIGIOUS
if fraud_score > 0.7:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved and not fnol.police_report_available:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved or fraud_score > 0.4:
return ClaimComplexity.STANDARD
return ClaimComplexity.SIMPLE
def _estimate_severity(
self, fnol: FNOLSubmission, claim_type: ClaimType
) -> str:
"""Stima la severita economica (low/medium/high)."""
if fnol.injuries_reported:
return "high"
if claim_type == ClaimType.AUTO_THEFT:
return "high"
if fnol.third_parties_involved:
return "medium"
return "low"
def _is_auto_settlement_eligible(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> bool:
"""Verifica l'eleggibilita per liquidazione automatica."""
if complexity not in [ClaimComplexity.SIMPLE, ClaimComplexity.STANDARD]:
return False
if fraud_score > 0.3:
return False
if fnol.injuries_reported or fnol.third_parties_involved:
return False
if len(fnol.photos) < 2:
return False
return True
def _assign_workflow(
self, claim_type: ClaimType, complexity: ClaimComplexity
) -> str:
workflow_map = {
(ClaimType.AUTO_COLLISION, ClaimComplexity.SIMPLE): "auto_collision_fast_track",
(ClaimType.AUTO_COLLISION, ClaimComplexity.STANDARD): "auto_collision_standard",
(ClaimType.AUTO_COLLISION, ClaimComplexity.COMPLEX): "auto_collision_manual",
(ClaimType.AUTO_THEFT, ClaimComplexity.SIMPLE): "auto_theft_standard",
(ClaimType.AUTO_WINDSHIELD, ClaimComplexity.SIMPLE): "windshield_auto",
}
return workflow_map.get(
(claim_type, complexity),
f"generic_{complexity.value}_workflow"
)
def _calculate_priority(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> int:
if fnol.injuries_reported:
return 1 # massima priorità
if complexity == ClaimComplexity.LITIGIOUS:
return 1
if fraud_score > 0.7:
return 2 # priorità alta per indagine fraud
if complexity == ClaimComplexity.COMPLEX:
return 3
return 5 # normale
def _build_routing_notes(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> str:
notes = []
if fnol.injuries_reported:
notes.append("ATTENZIONE: lesioni personali dichiarate - escalation legal/medical obbligatoria")
if fraud_score > 0.5:
notes.append(f"Fraud score elevato ({fraud_score:.2f}) - revisione SIU raccomandata")
if not fnol.photos:
notes.append("Nessuna foto allegata - richiedere al cliente prima di procedere")
return "; ".join(notes) if notes else "Nessuna nota particolare"
Viziune computerizată pentru estimarea daunelor auto
Estimarea automată a daunelor este cea mai de impact componentă AI în automatizarea daunelor. Clientul fotografiază vehiculul avariat cu smartphone-ul său, iar sistemul analizează imaginile pentru a identifica piesele deteriorate, estimați tipul de intervenție necesar (reparație vs înlocuire) și calculați o estimare a costurilor pe baza bazelor de date de preț actualizate.
Cele mai folosite modele din industrie se combină detectarea obiectelor (identificați piese deteriorate) cu clasificarea severității daunei (clasificați entitatea de daune minore până la totale). Abordări precum Tractable, liderul pieței, au dovedit acest lucru că aceste sisteme pot egala sau depăși acuratețea unui expert evaluator uman.
import torch
import torchvision.transforms as T
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple
from dataclasses import dataclass
import io
@dataclass
class DamageRegion:
"""Regione di danno identificata nell'immagine."""
part_name: str # es. "bumper_front", "door_left"
damage_type: str # scratch, dent, crack, broken
severity: str # minor, moderate, severe, total_loss
confidence: float # 0-1
repair_vs_replace: str # "repair" o "replace"
estimated_cost_eur: float
@dataclass
class VehicleDamageAssessment:
"""Risultato completo dell'analisi danni veicolo."""
claim_id: str
images_analyzed: int
damage_regions: List[DamageRegion]
total_estimated_cost: float
total_loss_likelihood: float # 0-1
settlement_recommendation: str
confidence_overall: float
requires_physical_inspection: bool
assessment_notes: str
class VehicleDamageClassifier:
"""
Classificatore di danni veicolo basato su transfer learning.
Architettura: ResNet-50 fine-tuned su dataset proprietario di danni auto
Output: classificazione per parte + severita + tipo danno
In produzione: usare modelli specializzati come quelli di Tractable,
Mitchell, o modelli addestrati su dataset interni.
"""
# Parti veicolo monitorabili
VEHICLE_PARTS = [
"bumper_front", "bumper_rear",
"hood", "trunk",
"door_front_left", "door_front_right",
"door_rear_left", "door_rear_right",
"fender_front_left", "fender_front_right",
"windshield_front", "windshield_rear",
"headlight_left", "headlight_right",
"mirror_left", "mirror_right",
"roof", "pillar",
]
# Tabella costi di riferimento (EUR, valori indicativi 2025)
REPAIR_COST_TABLE: Dict[str, Dict[str, float]] = {
"bumper_front": {"minor": 200, "moderate": 600, "severe": 1200, "replace": 900},
"bumper_rear": {"minor": 200, "moderate": 550, "severe": 1100, "replace": 850},
"hood": {"minor": 300, "moderate": 800, "severe": 1500, "replace": 1200},
"door_front_left": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"door_front_right": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"windshield_front": {"minor": 0, "moderate": 350, "severe": 600, "replace": 450},
"headlight_left": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
"headlight_right": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
}
DEFAULT_COST = {"minor": 150, "moderate": 450, "severe": 900, "replace": 700}
def __init__(self, model_path: str = "damage_classifier.pt") -> None:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self._load_model(model_path)
self.transform = T.Compose([
T.Resize((224, 224)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _load_model(self, model_path: str) -> torch.nn.Module:
"""Carica il modello fine-tuned da file o usa il pretrained come fallback."""
try:
model = resnet50(weights=None)
# Adatta la testa di classificazione al numero di parti*severita
num_classes = len(self.VEHICLE_PARTS) * 4 # 4 livelli severita
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
state = torch.load(model_path, map_location=self.device)
model.load_state_dict(state)
print(f"Modello caricato da {model_path}")
except FileNotFoundError:
print("Modello fine-tuned non trovato, uso pretrained ResNet50 (demo only)")
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.eval()
return model.to(self.device)
def analyze_image(self, image_bytes: bytes) -> List[Tuple[str, str, float]]:
"""
Analizza una singola immagine.
Returns: lista di (part_name, severity, confidence)
"""
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
tensor = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
# Soglia: riporta solo danni con confidence > 30%
results = []
threshold = 0.30
for i, part in enumerate(self.VEHICLE_PARTS):
severities = ["minor", "moderate", "severe", "total_loss"]
for j, sev in enumerate(severities):
idx = i * 4 + j
if idx < len(probs) and probs[idx] > threshold:
results.append((part, sev, float(probs[idx])))
return sorted(results, key=lambda x: x[2], reverse=True)
def estimate_repair_cost(self, part: str, severity: str) -> Tuple[float, str]:
"""
Stima il costo di riparazione e determina repair vs replace.
Returns: (cost_eur, repair_or_replace)
"""
cost_table = self.REPAIR_COST_TABLE.get(part, self.DEFAULT_COST)
if severity in ["severe", "total_loss"]:
replace_cost = cost_table.get("replace", 700)
repair_cost = cost_table.get("severe", 900)
if replace_cost < repair_cost * 0.8:
return replace_cost, "replace"
return repair_cost, "repair"
repair_cost = cost_table.get(severity, 150)
return repair_cost, "repair"
def assess_multiple_images(
self, claim_id: str, image_bytes_list: List[bytes]
) -> VehicleDamageAssessment:
"""
Valutazione completa su più immagini del veicolo.
Aggrega i risultati di tutte le foto per una stima robusta.
"""
# Raccoglie rilevazioni da tutte le immagini
all_detections: Dict[str, List[Tuple[str, float]]] = {}
for img_bytes in image_bytes_list:
detections = self.analyze_image(img_bytes)
for part, severity, confidence in detections:
if part not in all_detections:
all_detections[part] = []
all_detections[part].append((severity, confidence))
# Consolida: per ogni parte prende la severita più alta con confidence media
damage_regions: List[DamageRegion] = []
total_cost = 0.0
severity_order = {"minor": 0, "moderate": 1, "severe": 2, "total_loss": 3}
for part, severity_confidences in all_detections.items():
# Prendi la severita più alta rilevata
max_sev = max(severity_confidences, key=lambda x: severity_order.get(x[0], 0))
severity, _ = max_sev
avg_confidence = np.mean([c for _, c in severity_confidences])
cost, repair_replace = self.estimate_repair_cost(part, severity)
total_cost += cost
damage_regions.append(DamageRegion(
part_name=part,
damage_type="structural" if severity in ["severe", "total_loss"] else "cosmetic",
severity=severity,
confidence=round(float(avg_confidence), 3),
repair_vs_replace=repair_replace,
estimated_cost_eur=cost,
))
total_loss_likelihood = self._estimate_total_loss(damage_regions, total_cost)
requires_inspection = (
total_loss_likelihood > 0.5 or
total_cost > 8000 or
any(d.severity == "severe" and "windshield" in d.part_name
for d in damage_regions)
)
return VehicleDamageAssessment(
claim_id=claim_id,
images_analyzed=len(image_bytes_list),
damage_regions=damage_regions,
total_estimated_cost=round(total_cost, 2),
total_loss_likelihood=round(total_loss_likelihood, 3),
settlement_recommendation=self._settlement_recommendation(
total_cost, total_loss_likelihood
),
confidence_overall=round(
float(np.mean([d.confidence for d in damage_regions])) if damage_regions else 0.0,
3
),
requires_physical_inspection=requires_inspection,
assessment_notes=self._build_notes(damage_regions, total_loss_likelihood),
)
def _estimate_total_loss(
self, regions: List[DamageRegion], total_cost: float
) -> float:
"""Stima la probabilità di perdita totale."""
if total_cost > 15000:
return 0.95
if total_cost > 10000:
return 0.70
severe_count = sum(1 for r in regions if r.severity in ["severe", "total_loss"])
if severe_count >= 4:
return 0.80
if severe_count >= 2:
return 0.40
return max(0.0, (total_cost - 5000) / 10000) if total_cost > 5000 else 0.05
def _settlement_recommendation(self, cost: float, total_loss_prob: float) -> str:
if total_loss_prob > 0.7:
return "TOTAL_LOSS_SETTLEMENT"
if cost > 8000:
return "HIGH_VALUE_REPAIR_AUTHORIZATION"
if cost > 3000:
return "STANDARD_REPAIR_AUTHORIZATION"
return "FAST_TRACK_SETTLEMENT"
def _build_notes(
self, regions: List[DamageRegion], total_loss_prob: float
) -> str:
notes = []
if total_loss_prob > 0.5:
notes.append("Elevata probabilità di perdita totale - verificare valore veicolo")
severe = [r.part_name for r in regions if r.severity == "severe"]
if severe:
notes.append(f"Danni severi rilevati su: {', '.join(severe)}")
return "; ".join(notes) if notes else "Assessment completato senza anomalie"
NLP pentru extragerea de informații din documente
Fiecare accident generează zeci de documente: rapoarte de poliție, rapoarte medicale, deviz atelier, declarații de martori, rapoarte CID/CAI. Prelucrarea manuală a acestor documente și lent (1-3 zile) și predispus la erori. Sisteme moderne NLP, combinate cu OCR avansat, extrag automat informații structurate cu o acuratețe de 90-96%.
import spacy
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum
class DocumentType(str, Enum):
POLICE_REPORT = "police_report"
MEDICAL_REPORT = "medical_report"
REPAIR_ESTIMATE = "repair_estimate"
CID_CAI = "cid_cai" # Constatazione Amichevole di Incidente
WITNESS_STATEMENT = "witness_statement"
INVOICE = "invoice"
UNKNOWN = "unknown"
@dataclass
class ExtractedEntity:
"""Entità estratta da un documento."""
entity_type: str
value: str
confidence: float
source_text: str # testo originale da cui e stata estratta
position: Tuple[int, int] # start, end nel documento
@dataclass
class DocumentExtraction:
"""Risultato dell'estrazione NLP da un documento assicurativo."""
document_type: DocumentType
document_date: Optional[date]
entities: List[ExtractedEntity]
structured_data: Dict
extraction_confidence: float
raw_text: str
class InsuranceDocumentExtractor:
"""
Estrattore NLP per documenti assicurativi.
Combina:
- spaCy per NER (Named Entity Recognition)
- Regex per pattern specifici del dominio assicurativo
- Regole di business per estrazione strutturata
"""
# Pattern regex per entità assicurative italiane
PATTERNS: Dict[str, str] = {
"targa": r"\b[A-Z]{2}\d{3}[A-Z]{2}\b|\b[A-Z]{2}\d{5}\b",
"polizza": r"(?:polizza|n\.\s*pol\.?)\s*[:\.]?\s*([A-Z0-9/-]{6,20})",
"codice_fiscale": r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b",
"partita_iva": r"\b(?:IT)?\d{11}\b",
"euro_amount": r"(?:EUR?|€)\s*[\d.,]{1,10}|[\d.,]{1,10}\s*(?:EUR?|€)",
"data": r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
"phone": r"(?:\+39|0039)?\s*(?:\d{2,4}[\s\-]?){2,4}\d{4}",
"email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,6}",
}
def __init__(self, spacy_model: str = "it_core_news_lg") -> None:
try:
self.nlp = spacy.load(spacy_model)
except OSError:
print(f"Modello spaCy '{spacy_model}' non trovato - installa con:")
print(f"python -m spacy download {spacy_model}")
self.nlp = None
def classify_document(self, text: str) -> DocumentType:
"""Classifica il tipo di documento basandosi sul contenuto."""
text_lower = text.lower()
classification_rules = [
(DocumentType.POLICE_REPORT, ["verbale", "polizia stradale", "carabinieri", "codice della strada", "accertamento"]),
(DocumentType.MEDICAL_REPORT, ["diagnosi", "prognosi", "lesioni", "ospedale", "pronto soccorso", "medico"]),
(DocumentType.CID_CAI, ["constatazione amichevole", "cid", "cai", "modulo blu"]),
(DocumentType.REPAIR_ESTIMATE, ["preventivo", "officina", "carrozzeria", "ricambi", "manodopera"]),
(DocumentType.INVOICE, ["fattura", "ricevuta", "importo totale", "iva", "imponibile"]),
]
scores: Dict[DocumentType, int] = {}
for doc_type, keywords in classification_rules:
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[doc_type] = score
return max(scores, key=lambda k: scores[k]) if scores else DocumentType.UNKNOWN
def extract(self, raw_text: str) -> DocumentExtraction:
"""Estrae tutte le informazioni rilevanti dal documento."""
doc_type = self.classify_document(raw_text)
entities = self._extract_entities(raw_text)
structured = self._build_structured_data(entities, doc_type, raw_text)
confidence = self._calculate_confidence(entities, doc_type)
doc_date = self._extract_document_date(entities)
return DocumentExtraction(
document_type=doc_type,
document_date=doc_date,
entities=entities,
structured_data=structured,
extraction_confidence=confidence,
raw_text=raw_text,
)
def _extract_entities(self, text: str) -> List[ExtractedEntity]:
"""Estrae entità con regex e NER spaCy."""
entities: List[ExtractedEntity] = []
# Estrazione con regex
for entity_type, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
entities.append(ExtractedEntity(
entity_type=entity_type,
value=match.group().strip(),
confidence=0.85, # alta confidence per regex su formato noto
source_text=text[max(0, match.start()-20):match.end()+20],
position=(match.start(), match.end()),
))
# Estrazione NER con spaCy
if self.nlp:
doc = self.nlp(text[:100000]) # limit per performance
for ent in doc.ents:
if ent.label_ in ["PER", "ORG", "LOC", "DATE", "MONEY"]:
entities.append(ExtractedEntity(
entity_type=f"spacy_{ent.label_.lower()}",
value=ent.text.strip(),
confidence=0.75,
source_text=text[max(0, ent.start_char-20):ent.end_char+20],
position=(ent.start_char, ent.end_char),
))
return entities
def _build_structured_data(
self,
entities: List[ExtractedEntity],
doc_type: DocumentType,
text: str,
) -> Dict:
"""Costruisce un dizionario strutturato dal documento."""
structured: Dict = {"document_type": doc_type.value}
# Raggruppa entità per tipo
by_type: Dict[str, List[str]] = {}
for ent in entities:
by_type.setdefault(ent.entity_type, []).append(ent.value)
# Mappa entità in campi strutturati
if "targa" in by_type:
structured["vehicle_plates"] = list(set(by_type["targa"]))
if "polizza" in by_type:
structured["policy_numbers"] = list(set(by_type["polizza"]))
if "euro_amount" in by_type:
amounts = []
for amt_str in by_type["euro_amount"]:
clean = re.sub(r"[^\d,.]", "", amt_str).replace(",", ".")
try:
amounts.append(float(clean))
except ValueError:
pass
structured["amounts_eur"] = sorted(amounts)
structured["max_amount_eur"] = max(amounts) if amounts else 0
if "spacy_per" in by_type:
structured["persons_mentioned"] = list(set(by_type["spacy_per"]))
if "spacy_loc" in by_type:
structured["locations_mentioned"] = list(set(by_type["spacy_loc"]))
return structured
def _calculate_confidence(
self, entities: List[ExtractedEntity], doc_type: DocumentType
) -> float:
if not entities:
return 0.1
avg = float(sum(e.confidence for e in entities) / len(entities))
# Penalizza se il tipo e UNKNOWN
if doc_type == DocumentType.UNKNOWN:
avg *= 0.7
return round(min(avg, 1.0), 3)
def _extract_document_date(
self, entities: List[ExtractedEntity]
) -> Optional[date]:
date_entities = [e for e in entities if e.entity_type == "data"]
for de in date_entities:
for fmt in ["%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%d/%m/%y"]:
try:
return datetime.strptime(de.value, fmt).date()
except ValueError:
continue
return None
Orchestrarea fluxului de lucru de decontare
Orchestrația este creierul sistemului: coordonează toate componentele (triajul FNOL, deteriorarea evaluare, extragerea documentelor, scorarea fraudelor) și gestionează tranzițiile de statut ale revendicării în timp, respectând SLA-urile contractuale și regulile de escaladare a afacerii.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
import asyncio
class ClaimStatus(str, Enum):
RECEIVED = "received"
TRIAGED = "triaged"
DOCUMENTS_PENDING = "documents_pending"
ASSESSMENT_IN_PROGRESS = "assessment_in_progress"
FRAUD_REVIEW = "fraud_review"
SETTLEMENT_OFFERED = "settlement_offered"
SETTLEMENT_ACCEPTED = "settlement_accepted"
SETTLEMENT_DISPUTED = "settlement_disputed"
MANUAL_REVIEW = "manual_review"
CLOSED = "closed"
REJECTED = "rejected"
@dataclass
class ClaimWorkflowState:
"""Stato corrente del workflow di un sinistro."""
claim_id: str
status: ClaimStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None # handler umano o "auto"
sla_deadline: Optional[datetime] = None
settlement_amount: Optional[float] = None
status_history: List[Dict] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
def transition_to(self, new_status: ClaimStatus, note: str = "") -> None:
"""Transizione di stato con audit trail."""
self.status_history.append({
"from": self.status.value,
"to": new_status.value,
"timestamp": datetime.now().isoformat(),
"note": note,
})
self.status = new_status
self.updated_at = datetime.now()
if note:
self.notes.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}")
class ClaimsWorkflowEngine:
"""
Engine di orchestrazione per il workflow sinistri.
Gestisce le transizioni di stato, gli SLA e le escalation.
In produzione: usare Temporal.io o AWS Step Functions
per la durabilita e il recovery dei workflow.
"""
SLA_HOURS: Dict[str, int] = {
"simple": 4, # 4 ore per sinistri semplici
"standard": 24, # 24 ore per sinistri standard
"complex": 72, # 72 ore per sinistri complessi
"litigious": 168, # 7 giorni per sinistri con contenziosi
}
async def process_claim(
self,
claim_state: ClaimWorkflowState,
fnol: "FNOLSubmission",
triage: "FNOLAssessment",
damage_assessment: Optional["VehicleDamageAssessment"] = None,
doc_extractions: Optional[List["DocumentExtraction"]] = None,
) -> ClaimWorkflowState:
"""Processa un sinistro attraverso il workflow completo."""
# Step 1: Triage e routing
claim_state.transition_to(
ClaimStatus.TRIAGED,
f"Tipo: {triage.claim_type.value}, Complessità: {triage.complexity.value}"
)
sla_hours = self.SLA_HOURS.get(triage.complexity.value, 72)
claim_state.sla_deadline = datetime.now() + timedelta(hours=sla_hours)
# Step 2: Check documenti
if not fnol.documents and not fnol.photos:
claim_state.transition_to(
ClaimStatus.DOCUMENTS_PENDING,
f"Documenti mancanti: {', '.join(triage.required_documents)}"
)
await self._notify_customer_documents_needed(claim_state, triage)
return claim_state
# Step 3: Fraud check
if triage.fraud_risk_score > 0.5:
claim_state.transition_to(
ClaimStatus.FRAUD_REVIEW,
f"Fraud score: {triage.fraud_risk_score:.2f} - invio a SIU"
)
await self._escalate_to_siu(claim_state, triage)
return claim_state
# Step 4: Settlement automatico se eleggibile
if triage.auto_settlement_eligible and damage_assessment:
settlement = damage_assessment.total_estimated_cost
claim_state.settlement_amount = settlement
claim_state.transition_to(
ClaimStatus.SETTLEMENT_OFFERED,
f"Offerta automatica: EUR {settlement:.2f}"
)
await self._send_settlement_offer(claim_state, settlement)
return claim_state
# Step 5: Review manuale
claim_state.transition_to(
ClaimStatus.MANUAL_REVIEW,
"Invio a handler umano per valutazione"
)
await self._assign_human_handler(claim_state, triage)
return claim_state
async def _notify_customer_documents_needed(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Notifica il cliente dei documenti mancanti."""
# In produzione: integrazione con Twilio SMS/WhatsApp o SendGrid
print(f"[NOTIFY] Sinistro {state.claim_id}: richiedere documenti al cliente")
print(f" Documenti necessari: {triage.required_documents}")
async def _escalate_to_siu(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Escalation all'unita investigativa speciale (SIU)."""
print(f"[SIU] Sinistro {state.claim_id} flaggato per fraud review")
print(f" Fraud score: {triage.fraud_risk_score:.2f}")
state.assigned_to = "siu_team"
async def _send_settlement_offer(
self, state: ClaimWorkflowState, amount: float
) -> None:
"""Invia offerta di liquidazione automatica al cliente."""
print(f"[SETTLEMENT] Sinistro {state.claim_id}: offerta EUR {amount:.2f}")
print(f" Deadline accettazione: 10 giorni")
async def _assign_human_handler(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Assegna il sinistro a un handler umano in base alla complessità."""
handler_map = {
"litigious": "legal_team",
"complex": "senior_adjuster",
"standard": "adjuster_pool",
}
state.assigned_to = handler_map.get(triage.complexity.value, "adjuster_pool")
print(f"[ASSIGN] Sinistro {state.claim_id} assegnato a: {state.assigned_to}")
Valori de monitorizare pentru automatizarea reclamațiilor
Monitorizarea performanței unui sistem de automatizare a reclamațiilor necesită valori care să meargă dincolo de valorile simple ML. Valorile de afaceri și operaționale sunt la fel de esențiale pentru a se asigura că sistemul îmbunătățește efectiv experiența clienților și rentabilitatea operațiunii.
KPI al sistemului de automatizare a reclamațiilor
| Metric | Definiţie | Ţintă |
|---|---|---|
| Rata de automatizare | % din cererile închise fără intervenție umană | > 50% |
| Rata de revendicări fără atingere | % accidente de mașină fără evaluare fizică | > 70% (mașină simplă) |
| Timp mediu de decontare | Ora FNOL - Decontare accidente auto | < 24 ore (fast track) |
| Precizia extragerii documentelor | % câmpuri extrase corect | > 90% |
| Deviația estimată a daunelor | % abatere între estimarea AI și costul final | < 15% |
| Rata de fraudă fals pozitivă | % revendicări legitime semnalate ca fraudă | < 2% |
| Satisfacția clienților (NPS) | NPS post-pierdere | > +30 (față de +10 moștenire) |
Cele mai bune practici și anti-modele
Cele mai bune practici pentru automatizarea reclamațiilor
- Începeți cu parbrizul: și cel mai simplu caz de utilizare pentru automatizare completă — daune vizibile clar, costuri standardizate, nicio parte terță implicată
- Obligatoriu uman-in-the-loop pentru vătămare corporală: nu automatizați niciodată complet cererile de vătămare corporală; escaladarea la un handler uman este obligatorie
- Cere întotdeauna cel puțin 4 fotografii: față, spate, partea stângă, partea dreaptă; plus contorul de parcurs și plăcuța de înmatriculare; fotografii insuficiente scad drastic precizia
- Integrați un model de scorare a fraudei în amonte: verificarea fraudei trebuie sa aiba loc inainte de estimarea daunei, nu dupa, pentru a evita procesarea cererilor frauduloase
- Pista de audit imuabilă: fiecare decizie de sistem (automată sau manuală) trebuie să fie înregistrată cu marca temporală, versiunea modelului și valorile de intrare pentru orice dispută
Anti-modele de evitat
- Automatizare fără plasă de siguranță: implementează întotdeauna un prag minim de încredere; sub acest prag revendicarea trebuie să treacă automat la evaluarea umană
- Estimări fără validare geografică: prețurile reparațiilor variază foarte mult în funcție de zona geografică; o cotație bazată pe tarifele medii naționale poate fi foarte îndepărtată de realitatea locală
- Ignorați calitatea fotografiei: Fotografiile neclare, slab iluminate sau parțiale degradează grav acuratețea; implementați un control al calității înainte de procesare
- Oferta de decontare prea rapidă: Oferirea unui acord înainte ca clientul să aibă șansa de a evalua integralitatea daunelor poate duce la dispute ulterioare costisitoare
Concluzii și pașii următori
Automatizarea revendicărilor cu Computer Vision și NLP este una dintre aplicațiile AI cu ROI cel mai ridicat din sectorul asigurărilor: reduce costurile de operare cu 40-60%, se îmbunătățește satisfacția clienților și accelerează timpii de soluționare de la săptămâni la ore pentru cazurile standard.
Cheia succesului este o abordare graduală: începeți cu cele mai simple cazuri (parbriz, daune cosmetice mici), măsoară cu precizie acuratețea sistemului în comparație cu evaluatorii umani, și extinde progresiv automatizarea la cazuri mai complexe, menținând întotdeauna a mecanism robust de escaladare umană.
Următorul articol din serie analizează Detectarea fraudei în asigurări: cum să combinați analizele grafice pentru a identifica rețelele frauduloase și semnalele comportamentale pentru detectează modele anormale în revendicări.
Seria InsurTech Engineering
- 01 - Domeniul de asigurare pentru Dezvoltatori: Produse, Actori și Modele de Date
- 02 - Cloud-Native Policy Management: API-First Architecture
- 03 - Telematics Pipeline: UBI Data Processing la scară
- 04 - Subscriere AI: Ingineria caracteristicilor și scorul de risc
- 05 - Automatizarea revendicărilor: viziune computerizată și NLP (acest articol)
- 06 - Detectarea fraudelor: analiză grafică și semnal comportamental
- 07 - Integrare ACORD Standard și Insurance API
- 08 - Inginerie de conformitate: Solvabilitate II și IFRS 17







