Automatizace pohledávek: Počítačové vidění a NLP pro správu pohledávek
Řízení pojistných událostí je tradičně nejdražší a na zákazníky nejnáročnější proces pojišťovnictví. Průměrná doba vyřízení dopravní nehody trvá 8–15 dní, se 4-7 kontaktními body mezi zákazníkem a společností, dokumenty ke shromažďování v papírové podobě, fyzické hodnocení na vozidle a odskoky mezi různými odděleními. Míra nespokojenosti zákazníků ve fázi reklamací a historicky nejvyšší ze všech interakcí se společností.
Umělá inteligence tento proces od základů přetváří. Průmyslové výsledky v roce 2025 jsou mimořádné: Admirál Seguros dosáhl 90 % automatických odhadů úplně bezdotykový, přičemž 98 % hodnocení bylo dokončeno za méně než 15 minut. Některé společnosti vykazovat sazby end-to-end automatizace až 57 %, s průměrnou dobou Vypořádání zkráceno z týdnů na hodiny u standardních automobilových reklamací. Přesnost při extrakci data z dokumentů dosáhnou 96 %ve srovnání s 65 % u lidského operátora.
Tato příručka vytváří kompletní systém automatizace nároků: od digitální správy FNOL (First Notice of Loss), k posouzení poškození počítačovým viděním, k extrakci informace z dokumentů s NLP, až po orchestraci workflow vypořádání.
Co se naučíte
- Architektura komplexního systému automatizace reklamací
- Digitální FNOL: automatický příjem hlášení a třídění
- Počítačové vidění pro odhad poškození vozidla: CNN modely a vizuální transformátory
- NLP pro extrakci dat z dokumentů pojištění: lékařské zprávy, policejní zprávy
- Pokročilé OCR pro digitalizaci starších dokumentů
- Orchestrace pracovního postupu pro proces vypořádání
- Monitorování metrik pro systémy automatizace reklamací
Architektura systému automatizace pohledávek
Moderní systém automatizace pojistných událostí se skládá z odlišných vrstev s odlišnými odpovědnostmi definované. Architektura mikroslužeb umožňuje komponentám škálovat nezávisle a aktualizovat jednotlivé modely bez dopadu na celé potrubí.
Architektonické vrstvy
| Vrstvy | Komponenty | Technologie |
|---|---|---|
| Požití | Příjem FNOL, nahrávání dokumentů, brána API | FastAPI, S3/GCS, Kafka |
| Zpracování | OCR, NLP extrakce, počítačové vidění | Tesseract, spaCy, PyTorch, Hugging Face |
| Inteligence | Odhad škod, skórování podvodů, výpočet rezerv | YOLOv8, Detectron2, XGBoost |
| Orchestr | Workflow engine, SLA management, eskalace | Temporal.io, Airflow, státní automaty |
| Výstupy | Nabídky vyrovnání, komunikace se zákazníky, protokoly auditu | Twilio, SendGrid, EventStore |
FNOL Digital: První upozornění na inteligentní ztrátu
FNOL (First Notice of Loss) a prvotní nahlášení reklamace zákazníkem. Tradičně se tak dělo telefonicky nebo osobně; v moderních systémech se to děje prostřednictvím aplikací mobil, chatbot nebo webový portál. AI zasahuje od prvního okamžiku do klasifikace typ reklamace, vyhodnotit její složitost a nasměrovat ji do správného pracovního postupu.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import uuid
class ClaimType(str, Enum):
AUTO_COLLISION = "auto_collision"
AUTO_THEFT = "auto_theft"
AUTO_WINDSHIELD = "auto_windshield"
PROPERTY_WATER = "property_water"
PROPERTY_FIRE = "property_fire"
LIABILITY = "liability"
HEALTH = "health"
UNKNOWN = "unknown"
class ClaimComplexity(str, Enum):
SIMPLE = "simple" # liquidazione automatica
STANDARD = "standard" # processo guidato
COMPLEX = "complex" # intervento umano
LITIGIOUS = "litigious" # legal team
@dataclass
class FNOLSubmission:
"""Rappresenta una segnalazione FNOL ricevuta dal cliente."""
policy_number: str
incident_date: datetime
incident_description: str
location: str
photos: List[str] = field(default_factory=list) # URL foto caricate
documents: List[str] = field(default_factory=list)
contact_phone: str = ""
contact_email: str = ""
third_parties_involved: bool = False
injuries_reported: bool = False
police_report_available: bool = False
claim_id: str = field(default_factory=lambda: str(uuid.uuid4()))
received_at: datetime = field(default_factory=datetime.now)
@dataclass
class FNOLAssessment:
"""Risultato dell'analisi AI della FNOL."""
claim_id: str
claim_type: ClaimType
complexity: ClaimComplexity
estimated_severity: str # low/medium/high
auto_settlement_eligible: bool
required_documents: List[str]
assigned_workflow: str
fraud_risk_score: float # 0-1
priority: int # 1=urgente, 5=normale
routing_notes: str = ""
class FNOLTriageService:
"""
Servizio di triaging automatico per segnalazioni FNOL.
Combina regole di business e modelli ML per classificare
ogni sinistro e assegnarlo al workflow appropriato.
"""
# Keyword per classificazione tipo sinistro (semplificata)
CLAIM_TYPE_KEYWORDS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"incidente", "scontro", "tamponamento", "urto",
"collision", "crash", "accident"
],
ClaimType.AUTO_THEFT: [
"furto", "rubato", "scomparso", "theft", "stolen"
],
ClaimType.AUTO_WINDSHIELD: [
"parabrezza", "vetro", "windshield", "cristallo"
],
ClaimType.PROPERTY_WATER: [
"allagamento", "perdita", "infiltrazione", "acqua",
"flood", "water", "leak"
],
ClaimType.PROPERTY_FIRE: [
"incendio", "fuoco", "fiamme", "fire", "burn"
],
}
REQUIRED_DOCS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"CID/CAI o rapporto polizia",
"foto veicolo (4+ lati)",
"documento identità",
"patente di guida",
"carta di circolazione",
],
ClaimType.AUTO_THEFT: [
"denuncia polizia (entro 48h)",
"documento identità",
"carte del veicolo",
"chiavi originali",
],
ClaimType.PROPERTY_WATER: [
"foto danni",
"intervento idraulico (se disponibile)",
"preventivo riparazione",
],
}
def triage(
self, fnol: FNOLSubmission, fraud_score: float = 0.0
) -> FNOLAssessment:
"""Esegue il triaging automatico della segnalazione FNOL."""
claim_type = self._classify_type(fnol.incident_description)
complexity = self._assess_complexity(fnol, fraud_score)
severity = self._estimate_severity(fnol, claim_type)
auto_eligible = self._is_auto_settlement_eligible(fnol, complexity, fraud_score)
workflow = self._assign_workflow(claim_type, complexity)
priority = self._calculate_priority(fnol, complexity, fraud_score)
return FNOLAssessment(
claim_id=fnol.claim_id,
claim_type=claim_type,
complexity=complexity,
estimated_severity=severity,
auto_settlement_eligible=auto_eligible,
required_documents=self.REQUIRED_DOCS.get(claim_type, [
"documento identità",
"foto danni",
"preventivo riparazione",
]),
assigned_workflow=workflow,
fraud_risk_score=fraud_score,
priority=priority,
routing_notes=self._build_routing_notes(
fnol, complexity, fraud_score
),
)
def _classify_type(self, description: str) -> ClaimType:
"""Classifica il tipo di sinistro dalla descrizione testuale."""
desc_lower = description.lower()
scores: Dict[ClaimType, int] = {}
for claim_type, keywords in self.CLAIM_TYPE_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in desc_lower)
if score > 0:
scores[claim_type] = score
if not scores:
return ClaimType.UNKNOWN
return max(scores, key=lambda k: scores[k])
def _assess_complexity(
self, fnol: FNOLSubmission, fraud_score: float
) -> ClaimComplexity:
"""Determina la complessità del sinistro."""
if fnol.injuries_reported:
return ClaimComplexity.LITIGIOUS
if fraud_score > 0.7:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved and not fnol.police_report_available:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved or fraud_score > 0.4:
return ClaimComplexity.STANDARD
return ClaimComplexity.SIMPLE
def _estimate_severity(
self, fnol: FNOLSubmission, claim_type: ClaimType
) -> str:
"""Stima la severita economica (low/medium/high)."""
if fnol.injuries_reported:
return "high"
if claim_type == ClaimType.AUTO_THEFT:
return "high"
if fnol.third_parties_involved:
return "medium"
return "low"
def _is_auto_settlement_eligible(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> bool:
"""Verifica l'eleggibilita per liquidazione automatica."""
if complexity not in [ClaimComplexity.SIMPLE, ClaimComplexity.STANDARD]:
return False
if fraud_score > 0.3:
return False
if fnol.injuries_reported or fnol.third_parties_involved:
return False
if len(fnol.photos) < 2:
return False
return True
def _assign_workflow(
self, claim_type: ClaimType, complexity: ClaimComplexity
) -> str:
workflow_map = {
(ClaimType.AUTO_COLLISION, ClaimComplexity.SIMPLE): "auto_collision_fast_track",
(ClaimType.AUTO_COLLISION, ClaimComplexity.STANDARD): "auto_collision_standard",
(ClaimType.AUTO_COLLISION, ClaimComplexity.COMPLEX): "auto_collision_manual",
(ClaimType.AUTO_THEFT, ClaimComplexity.SIMPLE): "auto_theft_standard",
(ClaimType.AUTO_WINDSHIELD, ClaimComplexity.SIMPLE): "windshield_auto",
}
return workflow_map.get(
(claim_type, complexity),
f"generic_{complexity.value}_workflow"
)
def _calculate_priority(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> int:
if fnol.injuries_reported:
return 1 # massima priorità
if complexity == ClaimComplexity.LITIGIOUS:
return 1
if fraud_score > 0.7:
return 2 # priorità alta per indagine fraud
if complexity == ClaimComplexity.COMPLEX:
return 3
return 5 # normale
def _build_routing_notes(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> str:
notes = []
if fnol.injuries_reported:
notes.append("ATTENZIONE: lesioni personali dichiarate - escalation legal/medical obbligatoria")
if fraud_score > 0.5:
notes.append(f"Fraud score elevato ({fraud_score:.2f}) - revisione SIU raccomandata")
if not fnol.photos:
notes.append("Nessuna foto allegata - richiedere al cliente prima di procedere")
return "; ".join(notes) if notes else "Nessuna nota particolare"
Počítačové vidění pro odhad poškození auta
Automatický odhad škod je nejpůsobivější komponentou AI v automatizaci škod. Zákazník vyfotografuje poškozené vozidlo chytrým telefonem a systém snímky analyzuje k identifikaci poškozených dílů, odhadnutí typu nutného zásahu (oprava vs výměna) a vypočítat odhad nákladů na základě aktualizovaných databází cen.
Nejpoužívanější modely v oboru kombinovat detekce objektu (identifikovat poškozené části) s klasifikace závažnosti poškození (classify the entity malých až totálních škod). Přístupy jako Tractable, lídr na trhu, to prokázaly že tyto systémy mohou odpovídat nebo překračovat přesnost odborného lidského odhadce.
import torch
import torchvision.transforms as T
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple
from dataclasses import dataclass
import io
@dataclass
class DamageRegion:
"""Regione di danno identificata nell'immagine."""
part_name: str # es. "bumper_front", "door_left"
damage_type: str # scratch, dent, crack, broken
severity: str # minor, moderate, severe, total_loss
confidence: float # 0-1
repair_vs_replace: str # "repair" o "replace"
estimated_cost_eur: float
@dataclass
class VehicleDamageAssessment:
"""Risultato completo dell'analisi danni veicolo."""
claim_id: str
images_analyzed: int
damage_regions: List[DamageRegion]
total_estimated_cost: float
total_loss_likelihood: float # 0-1
settlement_recommendation: str
confidence_overall: float
requires_physical_inspection: bool
assessment_notes: str
class VehicleDamageClassifier:
"""
Classificatore di danni veicolo basato su transfer learning.
Architettura: ResNet-50 fine-tuned su dataset proprietario di danni auto
Output: classificazione per parte + severita + tipo danno
In produzione: usare modelli specializzati come quelli di Tractable,
Mitchell, o modelli addestrati su dataset interni.
"""
# Parti veicolo monitorabili
VEHICLE_PARTS = [
"bumper_front", "bumper_rear",
"hood", "trunk",
"door_front_left", "door_front_right",
"door_rear_left", "door_rear_right",
"fender_front_left", "fender_front_right",
"windshield_front", "windshield_rear",
"headlight_left", "headlight_right",
"mirror_left", "mirror_right",
"roof", "pillar",
]
# Tabella costi di riferimento (EUR, valori indicativi 2025)
REPAIR_COST_TABLE: Dict[str, Dict[str, float]] = {
"bumper_front": {"minor": 200, "moderate": 600, "severe": 1200, "replace": 900},
"bumper_rear": {"minor": 200, "moderate": 550, "severe": 1100, "replace": 850},
"hood": {"minor": 300, "moderate": 800, "severe": 1500, "replace": 1200},
"door_front_left": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"door_front_right": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"windshield_front": {"minor": 0, "moderate": 350, "severe": 600, "replace": 450},
"headlight_left": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
"headlight_right": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
}
DEFAULT_COST = {"minor": 150, "moderate": 450, "severe": 900, "replace": 700}
def __init__(self, model_path: str = "damage_classifier.pt") -> None:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self._load_model(model_path)
self.transform = T.Compose([
T.Resize((224, 224)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _load_model(self, model_path: str) -> torch.nn.Module:
"""Carica il modello fine-tuned da file o usa il pretrained come fallback."""
try:
model = resnet50(weights=None)
# Adatta la testa di classificazione al numero di parti*severita
num_classes = len(self.VEHICLE_PARTS) * 4 # 4 livelli severita
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
state = torch.load(model_path, map_location=self.device)
model.load_state_dict(state)
print(f"Modello caricato da {model_path}")
except FileNotFoundError:
print("Modello fine-tuned non trovato, uso pretrained ResNet50 (demo only)")
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.eval()
return model.to(self.device)
def analyze_image(self, image_bytes: bytes) -> List[Tuple[str, str, float]]:
"""
Analizza una singola immagine.
Returns: lista di (part_name, severity, confidence)
"""
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
tensor = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
# Soglia: riporta solo danni con confidence > 30%
results = []
threshold = 0.30
for i, part in enumerate(self.VEHICLE_PARTS):
severities = ["minor", "moderate", "severe", "total_loss"]
for j, sev in enumerate(severities):
idx = i * 4 + j
if idx < len(probs) and probs[idx] > threshold:
results.append((part, sev, float(probs[idx])))
return sorted(results, key=lambda x: x[2], reverse=True)
def estimate_repair_cost(self, part: str, severity: str) -> Tuple[float, str]:
"""
Stima il costo di riparazione e determina repair vs replace.
Returns: (cost_eur, repair_or_replace)
"""
cost_table = self.REPAIR_COST_TABLE.get(part, self.DEFAULT_COST)
if severity in ["severe", "total_loss"]:
replace_cost = cost_table.get("replace", 700)
repair_cost = cost_table.get("severe", 900)
if replace_cost < repair_cost * 0.8:
return replace_cost, "replace"
return repair_cost, "repair"
repair_cost = cost_table.get(severity, 150)
return repair_cost, "repair"
def assess_multiple_images(
self, claim_id: str, image_bytes_list: List[bytes]
) -> VehicleDamageAssessment:
"""
Valutazione completa su più immagini del veicolo.
Aggrega i risultati di tutte le foto per una stima robusta.
"""
# Raccoglie rilevazioni da tutte le immagini
all_detections: Dict[str, List[Tuple[str, float]]] = {}
for img_bytes in image_bytes_list:
detections = self.analyze_image(img_bytes)
for part, severity, confidence in detections:
if part not in all_detections:
all_detections[part] = []
all_detections[part].append((severity, confidence))
# Consolida: per ogni parte prende la severita più alta con confidence media
damage_regions: List[DamageRegion] = []
total_cost = 0.0
severity_order = {"minor": 0, "moderate": 1, "severe": 2, "total_loss": 3}
for part, severity_confidences in all_detections.items():
# Prendi la severita più alta rilevata
max_sev = max(severity_confidences, key=lambda x: severity_order.get(x[0], 0))
severity, _ = max_sev
avg_confidence = np.mean([c for _, c in severity_confidences])
cost, repair_replace = self.estimate_repair_cost(part, severity)
total_cost += cost
damage_regions.append(DamageRegion(
part_name=part,
damage_type="structural" if severity in ["severe", "total_loss"] else "cosmetic",
severity=severity,
confidence=round(float(avg_confidence), 3),
repair_vs_replace=repair_replace,
estimated_cost_eur=cost,
))
total_loss_likelihood = self._estimate_total_loss(damage_regions, total_cost)
requires_inspection = (
total_loss_likelihood > 0.5 or
total_cost > 8000 or
any(d.severity == "severe" and "windshield" in d.part_name
for d in damage_regions)
)
return VehicleDamageAssessment(
claim_id=claim_id,
images_analyzed=len(image_bytes_list),
damage_regions=damage_regions,
total_estimated_cost=round(total_cost, 2),
total_loss_likelihood=round(total_loss_likelihood, 3),
settlement_recommendation=self._settlement_recommendation(
total_cost, total_loss_likelihood
),
confidence_overall=round(
float(np.mean([d.confidence for d in damage_regions])) if damage_regions else 0.0,
3
),
requires_physical_inspection=requires_inspection,
assessment_notes=self._build_notes(damage_regions, total_loss_likelihood),
)
def _estimate_total_loss(
self, regions: List[DamageRegion], total_cost: float
) -> float:
"""Stima la probabilità di perdita totale."""
if total_cost > 15000:
return 0.95
if total_cost > 10000:
return 0.70
severe_count = sum(1 for r in regions if r.severity in ["severe", "total_loss"])
if severe_count >= 4:
return 0.80
if severe_count >= 2:
return 0.40
return max(0.0, (total_cost - 5000) / 10000) if total_cost > 5000 else 0.05
def _settlement_recommendation(self, cost: float, total_loss_prob: float) -> str:
if total_loss_prob > 0.7:
return "TOTAL_LOSS_SETTLEMENT"
if cost > 8000:
return "HIGH_VALUE_REPAIR_AUTHORIZATION"
if cost > 3000:
return "STANDARD_REPAIR_AUTHORIZATION"
return "FAST_TRACK_SETTLEMENT"
def _build_notes(
self, regions: List[DamageRegion], total_loss_prob: float
) -> str:
notes = []
if total_loss_prob > 0.5:
notes.append("Elevata probabilità di perdita totale - verificare valore veicolo")
severe = [r.part_name for r in regions if r.severity == "severe"]
if severe:
notes.append(f"Danni severi rilevati su: {', '.join(severe)}")
return "; ".join(notes) if notes else "Assessment completato senza anomalie"
NLP pro extrahování informací z dokumentů
Každá nehoda generuje desítky dokumentů: policejní zprávy, lékařské zprávy, odhady workshop, výpovědi svědků, zprávy CID/CAI. Ruční zpracování těchto dokumentů a pomalé (1-3 dny) a náchylné k chybám. Moderní NLP systémy v kombinaci s pokročilým OCR, automaticky extrahují strukturované informace s přesností 90–96 %.
import spacy
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum
class DocumentType(str, Enum):
POLICE_REPORT = "police_report"
MEDICAL_REPORT = "medical_report"
REPAIR_ESTIMATE = "repair_estimate"
CID_CAI = "cid_cai" # Constatazione Amichevole di Incidente
WITNESS_STATEMENT = "witness_statement"
INVOICE = "invoice"
UNKNOWN = "unknown"
@dataclass
class ExtractedEntity:
"""Entità estratta da un documento."""
entity_type: str
value: str
confidence: float
source_text: str # testo originale da cui e stata estratta
position: Tuple[int, int] # start, end nel documento
@dataclass
class DocumentExtraction:
"""Risultato dell'estrazione NLP da un documento assicurativo."""
document_type: DocumentType
document_date: Optional[date]
entities: List[ExtractedEntity]
structured_data: Dict
extraction_confidence: float
raw_text: str
class InsuranceDocumentExtractor:
"""
Estrattore NLP per documenti assicurativi.
Combina:
- spaCy per NER (Named Entity Recognition)
- Regex per pattern specifici del dominio assicurativo
- Regole di business per estrazione strutturata
"""
# Pattern regex per entità assicurative italiane
PATTERNS: Dict[str, str] = {
"targa": r"\b[A-Z]{2}\d{3}[A-Z]{2}\b|\b[A-Z]{2}\d{5}\b",
"polizza": r"(?:polizza|n\.\s*pol\.?)\s*[:\.]?\s*([A-Z0-9/-]{6,20})",
"codice_fiscale": r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b",
"partita_iva": r"\b(?:IT)?\d{11}\b",
"euro_amount": r"(?:EUR?|€)\s*[\d.,]{1,10}|[\d.,]{1,10}\s*(?:EUR?|€)",
"data": r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
"phone": r"(?:\+39|0039)?\s*(?:\d{2,4}[\s\-]?){2,4}\d{4}",
"email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,6}",
}
def __init__(self, spacy_model: str = "it_core_news_lg") -> None:
try:
self.nlp = spacy.load(spacy_model)
except OSError:
print(f"Modello spaCy '{spacy_model}' non trovato - installa con:")
print(f"python -m spacy download {spacy_model}")
self.nlp = None
def classify_document(self, text: str) -> DocumentType:
"""Classifica il tipo di documento basandosi sul contenuto."""
text_lower = text.lower()
classification_rules = [
(DocumentType.POLICE_REPORT, ["verbale", "polizia stradale", "carabinieri", "codice della strada", "accertamento"]),
(DocumentType.MEDICAL_REPORT, ["diagnosi", "prognosi", "lesioni", "ospedale", "pronto soccorso", "medico"]),
(DocumentType.CID_CAI, ["constatazione amichevole", "cid", "cai", "modulo blu"]),
(DocumentType.REPAIR_ESTIMATE, ["preventivo", "officina", "carrozzeria", "ricambi", "manodopera"]),
(DocumentType.INVOICE, ["fattura", "ricevuta", "importo totale", "iva", "imponibile"]),
]
scores: Dict[DocumentType, int] = {}
for doc_type, keywords in classification_rules:
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[doc_type] = score
return max(scores, key=lambda k: scores[k]) if scores else DocumentType.UNKNOWN
def extract(self, raw_text: str) -> DocumentExtraction:
"""Estrae tutte le informazioni rilevanti dal documento."""
doc_type = self.classify_document(raw_text)
entities = self._extract_entities(raw_text)
structured = self._build_structured_data(entities, doc_type, raw_text)
confidence = self._calculate_confidence(entities, doc_type)
doc_date = self._extract_document_date(entities)
return DocumentExtraction(
document_type=doc_type,
document_date=doc_date,
entities=entities,
structured_data=structured,
extraction_confidence=confidence,
raw_text=raw_text,
)
def _extract_entities(self, text: str) -> List[ExtractedEntity]:
"""Estrae entità con regex e NER spaCy."""
entities: List[ExtractedEntity] = []
# Estrazione con regex
for entity_type, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
entities.append(ExtractedEntity(
entity_type=entity_type,
value=match.group().strip(),
confidence=0.85, # alta confidence per regex su formato noto
source_text=text[max(0, match.start()-20):match.end()+20],
position=(match.start(), match.end()),
))
# Estrazione NER con spaCy
if self.nlp:
doc = self.nlp(text[:100000]) # limit per performance
for ent in doc.ents:
if ent.label_ in ["PER", "ORG", "LOC", "DATE", "MONEY"]:
entities.append(ExtractedEntity(
entity_type=f"spacy_{ent.label_.lower()}",
value=ent.text.strip(),
confidence=0.75,
source_text=text[max(0, ent.start_char-20):ent.end_char+20],
position=(ent.start_char, ent.end_char),
))
return entities
def _build_structured_data(
self,
entities: List[ExtractedEntity],
doc_type: DocumentType,
text: str,
) -> Dict:
"""Costruisce un dizionario strutturato dal documento."""
structured: Dict = {"document_type": doc_type.value}
# Raggruppa entità per tipo
by_type: Dict[str, List[str]] = {}
for ent in entities:
by_type.setdefault(ent.entity_type, []).append(ent.value)
# Mappa entità in campi strutturati
if "targa" in by_type:
structured["vehicle_plates"] = list(set(by_type["targa"]))
if "polizza" in by_type:
structured["policy_numbers"] = list(set(by_type["polizza"]))
if "euro_amount" in by_type:
amounts = []
for amt_str in by_type["euro_amount"]:
clean = re.sub(r"[^\d,.]", "", amt_str).replace(",", ".")
try:
amounts.append(float(clean))
except ValueError:
pass
structured["amounts_eur"] = sorted(amounts)
structured["max_amount_eur"] = max(amounts) if amounts else 0
if "spacy_per" in by_type:
structured["persons_mentioned"] = list(set(by_type["spacy_per"]))
if "spacy_loc" in by_type:
structured["locations_mentioned"] = list(set(by_type["spacy_loc"]))
return structured
def _calculate_confidence(
self, entities: List[ExtractedEntity], doc_type: DocumentType
) -> float:
if not entities:
return 0.1
avg = float(sum(e.confidence for e in entities) / len(entities))
# Penalizza se il tipo e UNKNOWN
if doc_type == DocumentType.UNKNOWN:
avg *= 0.7
return round(min(avg, 1.0), 3)
def _extract_document_date(
self, entities: List[ExtractedEntity]
) -> Optional[date]:
date_entities = [e for e in entities if e.entity_type == "data"]
for de in date_entities:
for fmt in ["%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%d/%m/%y"]:
try:
return datetime.strptime(de.value, fmt).date()
except ValueError:
continue
return None
Organizace workflow vypořádání
Orchestrace je mozkem systému: koordinuje všechny komponenty (třídění FNOL, poškození hodnocení, vytěžování dokumentů, bodování podvodů) a spravuje přechody stavu nároku v průběhu času, respektování smluvních SLA a pravidel obchodní eskalace.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
import asyncio
class ClaimStatus(str, Enum):
RECEIVED = "received"
TRIAGED = "triaged"
DOCUMENTS_PENDING = "documents_pending"
ASSESSMENT_IN_PROGRESS = "assessment_in_progress"
FRAUD_REVIEW = "fraud_review"
SETTLEMENT_OFFERED = "settlement_offered"
SETTLEMENT_ACCEPTED = "settlement_accepted"
SETTLEMENT_DISPUTED = "settlement_disputed"
MANUAL_REVIEW = "manual_review"
CLOSED = "closed"
REJECTED = "rejected"
@dataclass
class ClaimWorkflowState:
"""Stato corrente del workflow di un sinistro."""
claim_id: str
status: ClaimStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None # handler umano o "auto"
sla_deadline: Optional[datetime] = None
settlement_amount: Optional[float] = None
status_history: List[Dict] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
def transition_to(self, new_status: ClaimStatus, note: str = "") -> None:
"""Transizione di stato con audit trail."""
self.status_history.append({
"from": self.status.value,
"to": new_status.value,
"timestamp": datetime.now().isoformat(),
"note": note,
})
self.status = new_status
self.updated_at = datetime.now()
if note:
self.notes.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}")
class ClaimsWorkflowEngine:
"""
Engine di orchestrazione per il workflow sinistri.
Gestisce le transizioni di stato, gli SLA e le escalation.
In produzione: usare Temporal.io o AWS Step Functions
per la durabilita e il recovery dei workflow.
"""
SLA_HOURS: Dict[str, int] = {
"simple": 4, # 4 ore per sinistri semplici
"standard": 24, # 24 ore per sinistri standard
"complex": 72, # 72 ore per sinistri complessi
"litigious": 168, # 7 giorni per sinistri con contenziosi
}
async def process_claim(
self,
claim_state: ClaimWorkflowState,
fnol: "FNOLSubmission",
triage: "FNOLAssessment",
damage_assessment: Optional["VehicleDamageAssessment"] = None,
doc_extractions: Optional[List["DocumentExtraction"]] = None,
) -> ClaimWorkflowState:
"""Processa un sinistro attraverso il workflow completo."""
# Step 1: Triage e routing
claim_state.transition_to(
ClaimStatus.TRIAGED,
f"Tipo: {triage.claim_type.value}, Complessità: {triage.complexity.value}"
)
sla_hours = self.SLA_HOURS.get(triage.complexity.value, 72)
claim_state.sla_deadline = datetime.now() + timedelta(hours=sla_hours)
# Step 2: Check documenti
if not fnol.documents and not fnol.photos:
claim_state.transition_to(
ClaimStatus.DOCUMENTS_PENDING,
f"Documenti mancanti: {', '.join(triage.required_documents)}"
)
await self._notify_customer_documents_needed(claim_state, triage)
return claim_state
# Step 3: Fraud check
if triage.fraud_risk_score > 0.5:
claim_state.transition_to(
ClaimStatus.FRAUD_REVIEW,
f"Fraud score: {triage.fraud_risk_score:.2f} - invio a SIU"
)
await self._escalate_to_siu(claim_state, triage)
return claim_state
# Step 4: Settlement automatico se eleggibile
if triage.auto_settlement_eligible and damage_assessment:
settlement = damage_assessment.total_estimated_cost
claim_state.settlement_amount = settlement
claim_state.transition_to(
ClaimStatus.SETTLEMENT_OFFERED,
f"Offerta automatica: EUR {settlement:.2f}"
)
await self._send_settlement_offer(claim_state, settlement)
return claim_state
# Step 5: Review manuale
claim_state.transition_to(
ClaimStatus.MANUAL_REVIEW,
"Invio a handler umano per valutazione"
)
await self._assign_human_handler(claim_state, triage)
return claim_state
async def _notify_customer_documents_needed(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Notifica il cliente dei documenti mancanti."""
# In produzione: integrazione con Twilio SMS/WhatsApp o SendGrid
print(f"[NOTIFY] Sinistro {state.claim_id}: richiedere documenti al cliente")
print(f" Documenti necessari: {triage.required_documents}")
async def _escalate_to_siu(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Escalation all'unita investigativa speciale (SIU)."""
print(f"[SIU] Sinistro {state.claim_id} flaggato per fraud review")
print(f" Fraud score: {triage.fraud_risk_score:.2f}")
state.assigned_to = "siu_team"
async def _send_settlement_offer(
self, state: ClaimWorkflowState, amount: float
) -> None:
"""Invia offerta di liquidazione automatica al cliente."""
print(f"[SETTLEMENT] Sinistro {state.claim_id}: offerta EUR {amount:.2f}")
print(f" Deadline accettazione: 10 giorni")
async def _assign_human_handler(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Assegna il sinistro a un handler umano in base alla complessità."""
handler_map = {
"litigious": "legal_team",
"complex": "senior_adjuster",
"standard": "adjuster_pool",
}
state.assigned_to = handler_map.get(triage.complexity.value, "adjuster_pool")
print(f"[ASSIGN] Sinistro {state.claim_id} assegnato a: {state.assigned_to}")
Monitorování metrik pro automatizaci reklamací
Monitorování výkonu systému automatizace nároků vyžaduje metriky, které fungují nad rámec jednoduchých metrik ML. Obchodní a provozní metriky jsou stejně důležité zajistit, aby systém skutečně zlepšoval zákaznickou zkušenost a ziskovost provozu.
KPI systému automatizace reklamací
| Metrický | Definice | Cíl |
|---|---|---|
| Rychlost automatizace | % pojistných událostí uzavřených bez lidského zásahu | > 50 % |
| Míra bezdotykových reklamací | % dopravních nehod bez fyzického posouzení | > 70 % (jednoduché auto) |
| Prům. doba vypořádání | FNOL Time - Auto Accident Settlement | < 24h (zrychlená) |
| Přesnost extrakce dokumentu | % polí extrahována správně | > 90 % |
| Odchylka odhadu poškození | % odchylka mezi odhadem AI a konečnými náklady | < 15 % |
| Míra falešně pozitivních podvodů | % oprávněných nároků označených jako podvod | < 2 % |
| spokojenost zákazníků (NPS) | NPS po ztrátě | > +30 (oproti starším +10) |
Osvědčené postupy a anti-vzory
Nejlepší postupy pro automatizaci reklamací
- Začněte čelním sklem: a nejjednodušší případ použití pro plnou automatizaci – jasně viditelné poškození, standardizované náklady, bez účasti třetí strany
- Povinné zapojení člověka do smyčky při zranění osob: nikdy zcela automatizovat nároky na újmu na zdraví; eskalace na lidskou obsluhu je povinná
- Vždy požadujte alespoň 4 fotografie: přední, zadní, levá strana, pravá strana; plus počítadlo kilometrů a SPZ; nedostatečné fotografie drasticky snižují přesnost
- Integrujte předřazený model hodnocení podvodů: kontrola podvodu musí proběhnout před odhadem škody, nikoli po něm, aby se zabránilo zpracování podvodných nároků
- Neměnný auditní záznam: každé systémové rozhodnutí (automatické nebo ruční) musí být zaprotokolováno s časovým razítkem, verzí modelu a vstupními hodnotami pro případné spory
Anti-vzory, kterým je třeba se vyhnout
- Automatizace bez ochranné sítě: vždy zavádí minimální práh spolehlivosti; pod touto hranicí musí nárok automaticky projít kontrolou člověkem
- Odhady bez geografického ověření: ceny oprav se velmi liší podle zeměpisné oblasti; nabídka založená na průměrných národních sazbách může být velmi vzdálená místní realitě
- Ignorovat kvalitu fotografie: rozmazané, špatně osvětlené nebo částečné fotografie vážně snižují přesnost; provést kontrolu kvality před zpracováním
- Nabídka vyrovnání je příliš rychlá: Nabídka urovnání dříve, než bude mít zákazník možnost posoudit úplnost škod, může vést k nákladným následným sporům
Závěry a další kroky
Automatizace reklamací pomocí Computer Vision a NLP je jednou z aplikací AI s ROI nejvyšší v pojišťovnictví: snižuje provozní náklady o 40-60 %, zlepšuje spokojenost zákazníků a zrychluje dobu vyřízení z týdnů na hodiny u standardních případů.
Klíčem k úspěchu je postupný přístup: začněte s nejjednoduššími případy (čelní sklo, malé kosmetické poškození), přesně měřit přesnost systému ve srovnání s lidskými odhadci, a postupně rozšiřovat automatizaci na složitější případy při zachování a robustní mechanismus lidské eskalace.
Další článek v sérii se ponoří do Odhalování pojistných podvodů: jak zkombinovat grafovou analýzu k identifikaci podvodných sítí a behaviorálních signálů detekovat anomální vzory v nárocích.
InsurTech Engineering Series
- 01 – Pojistná doména pro vývojáře: Produkty, herci a datové modely
- 02 - Cloud-Native Policy Management: API-First Architecture
- 03 - Telematics Pipeline: Zpracování dat UBI v měřítku
- 04 - AI Underwriting: Feature Engineering a hodnocení rizik
- 05 – Automatizace pohledávek: Počítačové vidění a NLP (tento článek)
- 06 - Detekce podvodů: Analýza grafů a behaviorální signál
- 07 - Integrace standardu ACORD a Insurance API
- 08 – Compliance Engineering: Solvency II a IFRS 17







