Talep Otomasyonu: Talep Yönetimi için Bilgisayarlı Görme ve NLP
Talep yönetimi geleneksel olarak en pahalı ve müşteri yoğun süreçtir. sigorta sektörünün. Ortalama bir araba kazasının çözülmesi 8-15 gün sürer, Müşteri ile firma arasında 4-7 temas noktası bulunan, dokümanların kağıt formatında toplanması, araç üzerinde fiziksel değerlendirmeler ve farklı departmanlar arasındaki geçişler. Memnuniyetsizlik oranı Talep aşamasındaki müşteriler ve tarihsel olarak şirketle olan tüm etkileşimlerin en yüksek seviyesi.
Yapay zeka bu süreci sıfırdan dönüştürüyor. Sektör sonuçları 2025 yılı olağanüstü: Amiral Seguros Otomatik tahminlerin %90'ı tamamen temassızDeğerlendirmelerin %98'i 15 dakikadan kısa sürede tamamlandı. Bazı şirketler kadar uçtan uca otomasyon oranlarını raporlayın %57, ortalama süre ile Standart otomobil talepleri için uzlaşma haftalardan saatlere düşürüldü. Ekstraksiyonda hassasiyet belgelerdeki veriler ulaşır %96İnsan operatör için bu oran %65'tir.
Bu kılavuz eksiksiz bir talep otomasyon sistemi oluşturur: dijital yönetimden FNOL (İlk Kayıp İhbarı), bilgisayar görüşüyle hasar tespitine, çıkarmaya NLP ile belgelerdeki bilgilerden, uzlaşma iş akışının düzenlenmesine kadar.
Ne Öğreneceksiniz
- Uçtan uca talep otomasyon sisteminin mimarisi
- Dijital FNOL: otomatik raporlama alımı ve önceliklendirme
- Araç hasarı tahmini için bilgisayarlı görme: CNN modelleri ve görsel transformatörler
- Sigorta belgelerinden veri çıkarmak için NLP: tıbbi raporlar, polis raporları
- Eski belgeleri dijitalleştirmek için gelişmiş OCR
- Ödeme süreci için iş akışının düzenlenmesi
- Talep otomasyon sistemleri için izleme ölçümleri
Hasar Otomasyon Sistemi Mimarisi
Modern bir talep otomasyon sistemi, farklı sorumluluklara sahip farklı katmanlardan oluşur tanımlandı. Mikro hizmet mimarisi, bileşenlerin bağımsız olarak ölçeklenmesine olanak tanır ve tüm üretim hattını etkilemeden bireysel modelleri güncellemek.
Mimari Katmanlar
| Katmanlar | Bileşenler | Teknolojiler |
|---|---|---|
| Yutma | FNOL alımı, belge yükleme, API ağ geçidi | FastAPI, S3/GCS, Kafka |
| İşleme | OCR, NLP çıkarma, Bilgisayarla Görme | Tesseract, spaCy, PyTorch, Sarılma Yüzü |
| İstihbarat | Hasar tahmini, dolandırıcılık puanlaması, rezerv hesaplaması | YOLOv8, Detectron2, XGBoost |
| Orkestrasyon | İş akışı motoru, SLA yönetimi, üst kademeye iletme | Temporal.io, Hava akışı, durum makineleri |
| Çıkışlar | Uzlaşma teklifleri, müşteri iletişimleri, denetim kayıtları | Twilio, SendGrid, EventStore |
FNOL Digital: Akıllı Kayıplara İlişkin İlk Bildirim
FNOL (İlk Kayıp İhbarı) ve talebin müşteri tarafından ilk raporlanması. Geleneksel olarak bu, telefonla veya şahsen yapılıyordu; modern sistemlerde bu, uygulamalar aracılığıyla gerçekleşir mobil, chatbot veya web portalı. Yapay zeka, sınıflandırmak için ilk andan itibaren müdahale eder. Talep türünü belirleyin, karmaşıklığını değerlendirin ve doğru iş akışına yönlendirin.
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional, Dict
import uuid
class ClaimType(str, Enum):
AUTO_COLLISION = "auto_collision"
AUTO_THEFT = "auto_theft"
AUTO_WINDSHIELD = "auto_windshield"
PROPERTY_WATER = "property_water"
PROPERTY_FIRE = "property_fire"
LIABILITY = "liability"
HEALTH = "health"
UNKNOWN = "unknown"
class ClaimComplexity(str, Enum):
SIMPLE = "simple" # liquidazione automatica
STANDARD = "standard" # processo guidato
COMPLEX = "complex" # intervento umano
LITIGIOUS = "litigious" # legal team
@dataclass
class FNOLSubmission:
"""Rappresenta una segnalazione FNOL ricevuta dal cliente."""
policy_number: str
incident_date: datetime
incident_description: str
location: str
photos: List[str] = field(default_factory=list) # URL foto caricate
documents: List[str] = field(default_factory=list)
contact_phone: str = ""
contact_email: str = ""
third_parties_involved: bool = False
injuries_reported: bool = False
police_report_available: bool = False
claim_id: str = field(default_factory=lambda: str(uuid.uuid4()))
received_at: datetime = field(default_factory=datetime.now)
@dataclass
class FNOLAssessment:
"""Risultato dell'analisi AI della FNOL."""
claim_id: str
claim_type: ClaimType
complexity: ClaimComplexity
estimated_severity: str # low/medium/high
auto_settlement_eligible: bool
required_documents: List[str]
assigned_workflow: str
fraud_risk_score: float # 0-1
priority: int # 1=urgente, 5=normale
routing_notes: str = ""
class FNOLTriageService:
"""
Servizio di triaging automatico per segnalazioni FNOL.
Combina regole di business e modelli ML per classificare
ogni sinistro e assegnarlo al workflow appropriato.
"""
# Keyword per classificazione tipo sinistro (semplificata)
CLAIM_TYPE_KEYWORDS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"incidente", "scontro", "tamponamento", "urto",
"collision", "crash", "accident"
],
ClaimType.AUTO_THEFT: [
"furto", "rubato", "scomparso", "theft", "stolen"
],
ClaimType.AUTO_WINDSHIELD: [
"parabrezza", "vetro", "windshield", "cristallo"
],
ClaimType.PROPERTY_WATER: [
"allagamento", "perdita", "infiltrazione", "acqua",
"flood", "water", "leak"
],
ClaimType.PROPERTY_FIRE: [
"incendio", "fuoco", "fiamme", "fire", "burn"
],
}
REQUIRED_DOCS: Dict[ClaimType, List[str]] = {
ClaimType.AUTO_COLLISION: [
"CID/CAI o rapporto polizia",
"foto veicolo (4+ lati)",
"documento identità",
"patente di guida",
"carta di circolazione",
],
ClaimType.AUTO_THEFT: [
"denuncia polizia (entro 48h)",
"documento identità",
"carte del veicolo",
"chiavi originali",
],
ClaimType.PROPERTY_WATER: [
"foto danni",
"intervento idraulico (se disponibile)",
"preventivo riparazione",
],
}
def triage(
self, fnol: FNOLSubmission, fraud_score: float = 0.0
) -> FNOLAssessment:
"""Esegue il triaging automatico della segnalazione FNOL."""
claim_type = self._classify_type(fnol.incident_description)
complexity = self._assess_complexity(fnol, fraud_score)
severity = self._estimate_severity(fnol, claim_type)
auto_eligible = self._is_auto_settlement_eligible(fnol, complexity, fraud_score)
workflow = self._assign_workflow(claim_type, complexity)
priority = self._calculate_priority(fnol, complexity, fraud_score)
return FNOLAssessment(
claim_id=fnol.claim_id,
claim_type=claim_type,
complexity=complexity,
estimated_severity=severity,
auto_settlement_eligible=auto_eligible,
required_documents=self.REQUIRED_DOCS.get(claim_type, [
"documento identità",
"foto danni",
"preventivo riparazione",
]),
assigned_workflow=workflow,
fraud_risk_score=fraud_score,
priority=priority,
routing_notes=self._build_routing_notes(
fnol, complexity, fraud_score
),
)
def _classify_type(self, description: str) -> ClaimType:
"""Classifica il tipo di sinistro dalla descrizione testuale."""
desc_lower = description.lower()
scores: Dict[ClaimType, int] = {}
for claim_type, keywords in self.CLAIM_TYPE_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in desc_lower)
if score > 0:
scores[claim_type] = score
if not scores:
return ClaimType.UNKNOWN
return max(scores, key=lambda k: scores[k])
def _assess_complexity(
self, fnol: FNOLSubmission, fraud_score: float
) -> ClaimComplexity:
"""Determina la complessità del sinistro."""
if fnol.injuries_reported:
return ClaimComplexity.LITIGIOUS
if fraud_score > 0.7:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved and not fnol.police_report_available:
return ClaimComplexity.COMPLEX
if fnol.third_parties_involved or fraud_score > 0.4:
return ClaimComplexity.STANDARD
return ClaimComplexity.SIMPLE
def _estimate_severity(
self, fnol: FNOLSubmission, claim_type: ClaimType
) -> str:
"""Stima la severita economica (low/medium/high)."""
if fnol.injuries_reported:
return "high"
if claim_type == ClaimType.AUTO_THEFT:
return "high"
if fnol.third_parties_involved:
return "medium"
return "low"
def _is_auto_settlement_eligible(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> bool:
"""Verifica l'eleggibilita per liquidazione automatica."""
if complexity not in [ClaimComplexity.SIMPLE, ClaimComplexity.STANDARD]:
return False
if fraud_score > 0.3:
return False
if fnol.injuries_reported or fnol.third_parties_involved:
return False
if len(fnol.photos) < 2:
return False
return True
def _assign_workflow(
self, claim_type: ClaimType, complexity: ClaimComplexity
) -> str:
workflow_map = {
(ClaimType.AUTO_COLLISION, ClaimComplexity.SIMPLE): "auto_collision_fast_track",
(ClaimType.AUTO_COLLISION, ClaimComplexity.STANDARD): "auto_collision_standard",
(ClaimType.AUTO_COLLISION, ClaimComplexity.COMPLEX): "auto_collision_manual",
(ClaimType.AUTO_THEFT, ClaimComplexity.SIMPLE): "auto_theft_standard",
(ClaimType.AUTO_WINDSHIELD, ClaimComplexity.SIMPLE): "windshield_auto",
}
return workflow_map.get(
(claim_type, complexity),
f"generic_{complexity.value}_workflow"
)
def _calculate_priority(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> int:
if fnol.injuries_reported:
return 1 # massima priorità
if complexity == ClaimComplexity.LITIGIOUS:
return 1
if fraud_score > 0.7:
return 2 # priorità alta per indagine fraud
if complexity == ClaimComplexity.COMPLEX:
return 3
return 5 # normale
def _build_routing_notes(
self,
fnol: FNOLSubmission,
complexity: ClaimComplexity,
fraud_score: float,
) -> str:
notes = []
if fnol.injuries_reported:
notes.append("ATTENZIONE: lesioni personali dichiarate - escalation legal/medical obbligatoria")
if fraud_score > 0.5:
notes.append(f"Fraud score elevato ({fraud_score:.2f}) - revisione SIU raccomandata")
if not fnol.photos:
notes.append("Nessuna foto allegata - richiedere al cliente prima di procedere")
return "; ".join(notes) if notes else "Nessuna nota particolare"
Araba Hasar Tahmini için Bilgisayarlı Görme
Otomatik hasar tahmini, hasar otomasyonunda en etkili yapay zeka bileşenidir. Müşteri hasarlı aracı akıllı telefonuyla fotoğraflıyor, sistem görüntüleri analiz ediyor Hasarlı parçaları tanımlamak, gerekli müdahale türünü tahmin etmek (onarım vs.) değiştirme) ve güncellenmiş fiyat veritabanlarına dayalı olarak bir maliyet tahmini hesaplayın.
Sektörde en çok kullanılan modeller bir araya geliyor nesne algılama (tanımlayın hasarlı parçalar) ile hasar şiddeti sınıflandırması (varlığı sınıflandırın küçükten toplam hasara kadar). Pazar lideri Tractable gibi yaklaşımlar bunu kanıtladı Bu sistemlerin uzman bir değerleme uzmanının doğruluğuyla eşleşebileceği veya bu doğruluğu aşabileceği.
import torch
import torchvision.transforms as T
from torchvision.models import resnet50, ResNet50_Weights
import numpy as np
from PIL import Image
from typing import Dict, List, Tuple
from dataclasses import dataclass
import io
@dataclass
class DamageRegion:
"""Regione di danno identificata nell'immagine."""
part_name: str # es. "bumper_front", "door_left"
damage_type: str # scratch, dent, crack, broken
severity: str # minor, moderate, severe, total_loss
confidence: float # 0-1
repair_vs_replace: str # "repair" o "replace"
estimated_cost_eur: float
@dataclass
class VehicleDamageAssessment:
"""Risultato completo dell'analisi danni veicolo."""
claim_id: str
images_analyzed: int
damage_regions: List[DamageRegion]
total_estimated_cost: float
total_loss_likelihood: float # 0-1
settlement_recommendation: str
confidence_overall: float
requires_physical_inspection: bool
assessment_notes: str
class VehicleDamageClassifier:
"""
Classificatore di danni veicolo basato su transfer learning.
Architettura: ResNet-50 fine-tuned su dataset proprietario di danni auto
Output: classificazione per parte + severita + tipo danno
In produzione: usare modelli specializzati come quelli di Tractable,
Mitchell, o modelli addestrati su dataset interni.
"""
# Parti veicolo monitorabili
VEHICLE_PARTS = [
"bumper_front", "bumper_rear",
"hood", "trunk",
"door_front_left", "door_front_right",
"door_rear_left", "door_rear_right",
"fender_front_left", "fender_front_right",
"windshield_front", "windshield_rear",
"headlight_left", "headlight_right",
"mirror_left", "mirror_right",
"roof", "pillar",
]
# Tabella costi di riferimento (EUR, valori indicativi 2025)
REPAIR_COST_TABLE: Dict[str, Dict[str, float]] = {
"bumper_front": {"minor": 200, "moderate": 600, "severe": 1200, "replace": 900},
"bumper_rear": {"minor": 200, "moderate": 550, "severe": 1100, "replace": 850},
"hood": {"minor": 300, "moderate": 800, "severe": 1500, "replace": 1200},
"door_front_left": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"door_front_right": {"minor": 250, "moderate": 700, "severe": 1400, "replace": 1100},
"windshield_front": {"minor": 0, "moderate": 350, "severe": 600, "replace": 450},
"headlight_left": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
"headlight_right": {"minor": 80, "moderate": 200, "severe": 400, "replace": 350},
}
DEFAULT_COST = {"minor": 150, "moderate": 450, "severe": 900, "replace": 700}
def __init__(self, model_path: str = "damage_classifier.pt") -> None:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = self._load_model(model_path)
self.transform = T.Compose([
T.Resize((224, 224)),
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
def _load_model(self, model_path: str) -> torch.nn.Module:
"""Carica il modello fine-tuned da file o usa il pretrained come fallback."""
try:
model = resnet50(weights=None)
# Adatta la testa di classificazione al numero di parti*severita
num_classes = len(self.VEHICLE_PARTS) * 4 # 4 livelli severita
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)
state = torch.load(model_path, map_location=self.device)
model.load_state_dict(state)
print(f"Modello caricato da {model_path}")
except FileNotFoundError:
print("Modello fine-tuned non trovato, uso pretrained ResNet50 (demo only)")
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.eval()
return model.to(self.device)
def analyze_image(self, image_bytes: bytes) -> List[Tuple[str, str, float]]:
"""
Analizza una singola immagine.
Returns: lista di (part_name, severity, confidence)
"""
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
tensor = self.transform(image).unsqueeze(0).to(self.device)
with torch.no_grad():
logits = self.model(tensor)
probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
# Soglia: riporta solo danni con confidence > 30%
results = []
threshold = 0.30
for i, part in enumerate(self.VEHICLE_PARTS):
severities = ["minor", "moderate", "severe", "total_loss"]
for j, sev in enumerate(severities):
idx = i * 4 + j
if idx < len(probs) and probs[idx] > threshold:
results.append((part, sev, float(probs[idx])))
return sorted(results, key=lambda x: x[2], reverse=True)
def estimate_repair_cost(self, part: str, severity: str) -> Tuple[float, str]:
"""
Stima il costo di riparazione e determina repair vs replace.
Returns: (cost_eur, repair_or_replace)
"""
cost_table = self.REPAIR_COST_TABLE.get(part, self.DEFAULT_COST)
if severity in ["severe", "total_loss"]:
replace_cost = cost_table.get("replace", 700)
repair_cost = cost_table.get("severe", 900)
if replace_cost < repair_cost * 0.8:
return replace_cost, "replace"
return repair_cost, "repair"
repair_cost = cost_table.get(severity, 150)
return repair_cost, "repair"
def assess_multiple_images(
self, claim_id: str, image_bytes_list: List[bytes]
) -> VehicleDamageAssessment:
"""
Valutazione completa su più immagini del veicolo.
Aggrega i risultati di tutte le foto per una stima robusta.
"""
# Raccoglie rilevazioni da tutte le immagini
all_detections: Dict[str, List[Tuple[str, float]]] = {}
for img_bytes in image_bytes_list:
detections = self.analyze_image(img_bytes)
for part, severity, confidence in detections:
if part not in all_detections:
all_detections[part] = []
all_detections[part].append((severity, confidence))
# Consolida: per ogni parte prende la severita più alta con confidence media
damage_regions: List[DamageRegion] = []
total_cost = 0.0
severity_order = {"minor": 0, "moderate": 1, "severe": 2, "total_loss": 3}
for part, severity_confidences in all_detections.items():
# Prendi la severita più alta rilevata
max_sev = max(severity_confidences, key=lambda x: severity_order.get(x[0], 0))
severity, _ = max_sev
avg_confidence = np.mean([c for _, c in severity_confidences])
cost, repair_replace = self.estimate_repair_cost(part, severity)
total_cost += cost
damage_regions.append(DamageRegion(
part_name=part,
damage_type="structural" if severity in ["severe", "total_loss"] else "cosmetic",
severity=severity,
confidence=round(float(avg_confidence), 3),
repair_vs_replace=repair_replace,
estimated_cost_eur=cost,
))
total_loss_likelihood = self._estimate_total_loss(damage_regions, total_cost)
requires_inspection = (
total_loss_likelihood > 0.5 or
total_cost > 8000 or
any(d.severity == "severe" and "windshield" in d.part_name
for d in damage_regions)
)
return VehicleDamageAssessment(
claim_id=claim_id,
images_analyzed=len(image_bytes_list),
damage_regions=damage_regions,
total_estimated_cost=round(total_cost, 2),
total_loss_likelihood=round(total_loss_likelihood, 3),
settlement_recommendation=self._settlement_recommendation(
total_cost, total_loss_likelihood
),
confidence_overall=round(
float(np.mean([d.confidence for d in damage_regions])) if damage_regions else 0.0,
3
),
requires_physical_inspection=requires_inspection,
assessment_notes=self._build_notes(damage_regions, total_loss_likelihood),
)
def _estimate_total_loss(
self, regions: List[DamageRegion], total_cost: float
) -> float:
"""Stima la probabilità di perdita totale."""
if total_cost > 15000:
return 0.95
if total_cost > 10000:
return 0.70
severe_count = sum(1 for r in regions if r.severity in ["severe", "total_loss"])
if severe_count >= 4:
return 0.80
if severe_count >= 2:
return 0.40
return max(0.0, (total_cost - 5000) / 10000) if total_cost > 5000 else 0.05
def _settlement_recommendation(self, cost: float, total_loss_prob: float) -> str:
if total_loss_prob > 0.7:
return "TOTAL_LOSS_SETTLEMENT"
if cost > 8000:
return "HIGH_VALUE_REPAIR_AUTHORIZATION"
if cost > 3000:
return "STANDARD_REPAIR_AUTHORIZATION"
return "FAST_TRACK_SETTLEMENT"
def _build_notes(
self, regions: List[DamageRegion], total_loss_prob: float
) -> str:
notes = []
if total_loss_prob > 0.5:
notes.append("Elevata probabilità di perdita totale - verificare valore veicolo")
severe = [r.part_name for r in regions if r.severity == "severe"]
if severe:
notes.append(f"Danni severi rilevati su: {', '.join(severe)}")
return "; ".join(notes) if notes else "Assessment completato senza anomalie"
Belgelerden Bilgi Çıkarmak için NLP
Her kaza düzinelerce belge üretir: polis raporları, tıbbi raporlar, tahminler çalıştay, tanık ifadeleri, CID/CAI raporları. Bu belgelerin manuel olarak işlenmesi yavaştır (1-3 gün) ve hataya açıktır. Gelişmiş OCR ile birleştirilmiş modern NLP sistemleri, yapılandırılmış bilgileri %90-96 doğrulukla otomatik olarak çıkarırlar.
import spacy
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum
class DocumentType(str, Enum):
POLICE_REPORT = "police_report"
MEDICAL_REPORT = "medical_report"
REPAIR_ESTIMATE = "repair_estimate"
CID_CAI = "cid_cai" # Constatazione Amichevole di Incidente
WITNESS_STATEMENT = "witness_statement"
INVOICE = "invoice"
UNKNOWN = "unknown"
@dataclass
class ExtractedEntity:
"""Entità estratta da un documento."""
entity_type: str
value: str
confidence: float
source_text: str # testo originale da cui e stata estratta
position: Tuple[int, int] # start, end nel documento
@dataclass
class DocumentExtraction:
"""Risultato dell'estrazione NLP da un documento assicurativo."""
document_type: DocumentType
document_date: Optional[date]
entities: List[ExtractedEntity]
structured_data: Dict
extraction_confidence: float
raw_text: str
class InsuranceDocumentExtractor:
"""
Estrattore NLP per documenti assicurativi.
Combina:
- spaCy per NER (Named Entity Recognition)
- Regex per pattern specifici del dominio assicurativo
- Regole di business per estrazione strutturata
"""
# Pattern regex per entità assicurative italiane
PATTERNS: Dict[str, str] = {
"targa": r"\b[A-Z]{2}\d{3}[A-Z]{2}\b|\b[A-Z]{2}\d{5}\b",
"polizza": r"(?:polizza|n\.\s*pol\.?)\s*[:\.]?\s*([A-Z0-9/-]{6,20})",
"codice_fiscale": r"\b[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]\b",
"partita_iva": r"\b(?:IT)?\d{11}\b",
"euro_amount": r"(?:EUR?|€)\s*[\d.,]{1,10}|[\d.,]{1,10}\s*(?:EUR?|€)",
"data": r"\b\d{1,2}[/\-\.]\d{1,2}[/\-\.]\d{2,4}\b",
"phone": r"(?:\+39|0039)?\s*(?:\d{2,4}[\s\-]?){2,4}\d{4}",
"email": r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,6}",
}
def __init__(self, spacy_model: str = "it_core_news_lg") -> None:
try:
self.nlp = spacy.load(spacy_model)
except OSError:
print(f"Modello spaCy '{spacy_model}' non trovato - installa con:")
print(f"python -m spacy download {spacy_model}")
self.nlp = None
def classify_document(self, text: str) -> DocumentType:
"""Classifica il tipo di documento basandosi sul contenuto."""
text_lower = text.lower()
classification_rules = [
(DocumentType.POLICE_REPORT, ["verbale", "polizia stradale", "carabinieri", "codice della strada", "accertamento"]),
(DocumentType.MEDICAL_REPORT, ["diagnosi", "prognosi", "lesioni", "ospedale", "pronto soccorso", "medico"]),
(DocumentType.CID_CAI, ["constatazione amichevole", "cid", "cai", "modulo blu"]),
(DocumentType.REPAIR_ESTIMATE, ["preventivo", "officina", "carrozzeria", "ricambi", "manodopera"]),
(DocumentType.INVOICE, ["fattura", "ricevuta", "importo totale", "iva", "imponibile"]),
]
scores: Dict[DocumentType, int] = {}
for doc_type, keywords in classification_rules:
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[doc_type] = score
return max(scores, key=lambda k: scores[k]) if scores else DocumentType.UNKNOWN
def extract(self, raw_text: str) -> DocumentExtraction:
"""Estrae tutte le informazioni rilevanti dal documento."""
doc_type = self.classify_document(raw_text)
entities = self._extract_entities(raw_text)
structured = self._build_structured_data(entities, doc_type, raw_text)
confidence = self._calculate_confidence(entities, doc_type)
doc_date = self._extract_document_date(entities)
return DocumentExtraction(
document_type=doc_type,
document_date=doc_date,
entities=entities,
structured_data=structured,
extraction_confidence=confidence,
raw_text=raw_text,
)
def _extract_entities(self, text: str) -> List[ExtractedEntity]:
"""Estrae entità con regex e NER spaCy."""
entities: List[ExtractedEntity] = []
# Estrazione con regex
for entity_type, pattern in self.PATTERNS.items():
for match in re.finditer(pattern, text, re.IGNORECASE):
entities.append(ExtractedEntity(
entity_type=entity_type,
value=match.group().strip(),
confidence=0.85, # alta confidence per regex su formato noto
source_text=text[max(0, match.start()-20):match.end()+20],
position=(match.start(), match.end()),
))
# Estrazione NER con spaCy
if self.nlp:
doc = self.nlp(text[:100000]) # limit per performance
for ent in doc.ents:
if ent.label_ in ["PER", "ORG", "LOC", "DATE", "MONEY"]:
entities.append(ExtractedEntity(
entity_type=f"spacy_{ent.label_.lower()}",
value=ent.text.strip(),
confidence=0.75,
source_text=text[max(0, ent.start_char-20):ent.end_char+20],
position=(ent.start_char, ent.end_char),
))
return entities
def _build_structured_data(
self,
entities: List[ExtractedEntity],
doc_type: DocumentType,
text: str,
) -> Dict:
"""Costruisce un dizionario strutturato dal documento."""
structured: Dict = {"document_type": doc_type.value}
# Raggruppa entità per tipo
by_type: Dict[str, List[str]] = {}
for ent in entities:
by_type.setdefault(ent.entity_type, []).append(ent.value)
# Mappa entità in campi strutturati
if "targa" in by_type:
structured["vehicle_plates"] = list(set(by_type["targa"]))
if "polizza" in by_type:
structured["policy_numbers"] = list(set(by_type["polizza"]))
if "euro_amount" in by_type:
amounts = []
for amt_str in by_type["euro_amount"]:
clean = re.sub(r"[^\d,.]", "", amt_str).replace(",", ".")
try:
amounts.append(float(clean))
except ValueError:
pass
structured["amounts_eur"] = sorted(amounts)
structured["max_amount_eur"] = max(amounts) if amounts else 0
if "spacy_per" in by_type:
structured["persons_mentioned"] = list(set(by_type["spacy_per"]))
if "spacy_loc" in by_type:
structured["locations_mentioned"] = list(set(by_type["spacy_loc"]))
return structured
def _calculate_confidence(
self, entities: List[ExtractedEntity], doc_type: DocumentType
) -> float:
if not entities:
return 0.1
avg = float(sum(e.confidence for e in entities) / len(entities))
# Penalizza se il tipo e UNKNOWN
if doc_type == DocumentType.UNKNOWN:
avg *= 0.7
return round(min(avg, 1.0), 3)
def _extract_document_date(
self, entities: List[ExtractedEntity]
) -> Optional[date]:
date_entities = [e for e in entities if e.entity_type == "data"]
for de in date_entities:
for fmt in ["%d/%m/%Y", "%d-%m-%Y", "%d.%m.%Y", "%d/%m/%y"]:
try:
return datetime.strptime(de.value, fmt).date()
except ValueError:
continue
return None
Mutabakat İş Akışının Düzenlenmesi
Orkestrasyon sistemin beynidir: tüm bileşenleri koordine eder (FNOL triyajı, hasar değerlendirme, belge çıkarma, dolandırıcılık puanlaması) ve talebin durum geçişlerini yönetir zamanla, sözleşmeye dayalı SLA'lara ve iş yükseltme kurallarına uyarak.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
import asyncio
class ClaimStatus(str, Enum):
RECEIVED = "received"
TRIAGED = "triaged"
DOCUMENTS_PENDING = "documents_pending"
ASSESSMENT_IN_PROGRESS = "assessment_in_progress"
FRAUD_REVIEW = "fraud_review"
SETTLEMENT_OFFERED = "settlement_offered"
SETTLEMENT_ACCEPTED = "settlement_accepted"
SETTLEMENT_DISPUTED = "settlement_disputed"
MANUAL_REVIEW = "manual_review"
CLOSED = "closed"
REJECTED = "rejected"
@dataclass
class ClaimWorkflowState:
"""Stato corrente del workflow di un sinistro."""
claim_id: str
status: ClaimStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None # handler umano o "auto"
sla_deadline: Optional[datetime] = None
settlement_amount: Optional[float] = None
status_history: List[Dict] = field(default_factory=list)
notes: List[str] = field(default_factory=list)
def transition_to(self, new_status: ClaimStatus, note: str = "") -> None:
"""Transizione di stato con audit trail."""
self.status_history.append({
"from": self.status.value,
"to": new_status.value,
"timestamp": datetime.now().isoformat(),
"note": note,
})
self.status = new_status
self.updated_at = datetime.now()
if note:
self.notes.append(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] {note}")
class ClaimsWorkflowEngine:
"""
Engine di orchestrazione per il workflow sinistri.
Gestisce le transizioni di stato, gli SLA e le escalation.
In produzione: usare Temporal.io o AWS Step Functions
per la durabilita e il recovery dei workflow.
"""
SLA_HOURS: Dict[str, int] = {
"simple": 4, # 4 ore per sinistri semplici
"standard": 24, # 24 ore per sinistri standard
"complex": 72, # 72 ore per sinistri complessi
"litigious": 168, # 7 giorni per sinistri con contenziosi
}
async def process_claim(
self,
claim_state: ClaimWorkflowState,
fnol: "FNOLSubmission",
triage: "FNOLAssessment",
damage_assessment: Optional["VehicleDamageAssessment"] = None,
doc_extractions: Optional[List["DocumentExtraction"]] = None,
) -> ClaimWorkflowState:
"""Processa un sinistro attraverso il workflow completo."""
# Step 1: Triage e routing
claim_state.transition_to(
ClaimStatus.TRIAGED,
f"Tipo: {triage.claim_type.value}, Complessità: {triage.complexity.value}"
)
sla_hours = self.SLA_HOURS.get(triage.complexity.value, 72)
claim_state.sla_deadline = datetime.now() + timedelta(hours=sla_hours)
# Step 2: Check documenti
if not fnol.documents and not fnol.photos:
claim_state.transition_to(
ClaimStatus.DOCUMENTS_PENDING,
f"Documenti mancanti: {', '.join(triage.required_documents)}"
)
await self._notify_customer_documents_needed(claim_state, triage)
return claim_state
# Step 3: Fraud check
if triage.fraud_risk_score > 0.5:
claim_state.transition_to(
ClaimStatus.FRAUD_REVIEW,
f"Fraud score: {triage.fraud_risk_score:.2f} - invio a SIU"
)
await self._escalate_to_siu(claim_state, triage)
return claim_state
# Step 4: Settlement automatico se eleggibile
if triage.auto_settlement_eligible and damage_assessment:
settlement = damage_assessment.total_estimated_cost
claim_state.settlement_amount = settlement
claim_state.transition_to(
ClaimStatus.SETTLEMENT_OFFERED,
f"Offerta automatica: EUR {settlement:.2f}"
)
await self._send_settlement_offer(claim_state, settlement)
return claim_state
# Step 5: Review manuale
claim_state.transition_to(
ClaimStatus.MANUAL_REVIEW,
"Invio a handler umano per valutazione"
)
await self._assign_human_handler(claim_state, triage)
return claim_state
async def _notify_customer_documents_needed(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Notifica il cliente dei documenti mancanti."""
# In produzione: integrazione con Twilio SMS/WhatsApp o SendGrid
print(f"[NOTIFY] Sinistro {state.claim_id}: richiedere documenti al cliente")
print(f" Documenti necessari: {triage.required_documents}")
async def _escalate_to_siu(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Escalation all'unita investigativa speciale (SIU)."""
print(f"[SIU] Sinistro {state.claim_id} flaggato per fraud review")
print(f" Fraud score: {triage.fraud_risk_score:.2f}")
state.assigned_to = "siu_team"
async def _send_settlement_offer(
self, state: ClaimWorkflowState, amount: float
) -> None:
"""Invia offerta di liquidazione automatica al cliente."""
print(f"[SETTLEMENT] Sinistro {state.claim_id}: offerta EUR {amount:.2f}")
print(f" Deadline accettazione: 10 giorni")
async def _assign_human_handler(
self,
state: ClaimWorkflowState,
triage: "FNOLAssessment",
) -> None:
"""Assegna il sinistro a un handler umano in base alla complessità."""
handler_map = {
"litigious": "legal_team",
"complex": "senior_adjuster",
"standard": "adjuster_pool",
}
state.assigned_to = handler_map.get(triage.complexity.value, "adjuster_pool")
print(f"[ASSIGN] Sinistro {state.claim_id} assegnato a: {state.assigned_to}")
Talep Otomasyonu için Metrikleri İzleme
Bir talep otomasyon sisteminin performansını izlemek, aşağıdaki ölçümleri gerektirir: basit makine öğrenimi ölçümlerinin ötesinde. İş ve operasyonel ölçümler eşit derecede önemlidir sistemin gerçekten müşteri deneyimini iyileştirdiğinden emin olmak ve operasyonun karlılığı.
Hasar Otomasyon Sistemi KPI'sı
| Metrik | Tanım | Hedef |
|---|---|---|
| Otomasyon Oranı | İnsan müdahalesi olmadan kapatılan taleplerin yüzdesi | > %50 |
| Temassız Talep Oranı | Fiziksel değerlendirme yapılmayan araba kazalarının yüzdesi | > %70 (basit araba) |
| Ortalama Uzlaşma Süresi | FNOL Zamanı - Otomatik Kaza Çözümü | < 24 saat (hızlı yol) |
| Belge Çıkarma Doğruluğu | Alanların %'si doğru şekilde çıkartıldı | > %90 |
| Hasar Tahmini Sapması | AI tahmini ile nihai maliyet arasındaki sapma yüzdesi | < %15 |
| Yanlış Pozitif Dolandırıcılık Oranı | Dolandırıcılık olarak işaretlenen meşru iddiaların yüzdesi | < %2 |
| Müşteri Memnuniyeti (NPS) | Kayıp sonrası NPS | > +30 (eski +10'a kıyasla) |
En İyi Uygulamalar ve Anti-kalıplar
Talep Otomasyonu için En İyi Uygulamalar
- Ön camla başlayın: ve tam otomasyon için en basit kullanım durumu — açıkça görülebilen hasar, standartlaştırılmış maliyetler, üçüncü tarafların dahil olmaması
- Kişisel yaralanma durumunda zorunlu döngüde insan: kişisel yaralanma taleplerini asla tamamen otomatikleştirmeyin; Bir insan işleyiciye iletilmesi zorunludur
- Her zaman en az 4 fotoğraf isteyin: ön, arka, sol taraf, sağ taraf; artı kilometre sayacı ve plaka; yetersiz fotoğraf doğruluğu büyük ölçüde azaltır
- Yukarı yönlü bir dolandırıcılık puanlama modelini entegre edin: Dolandırıcılık iddialarının işleme konulmasını önlemek için, dolandırıcılık kontrolü hasar tahmininden önce yapılmalıdır, sonra değil
- Değişmez denetim izi: Her sistem kararı (otomatik veya manuel), herhangi bir anlaşmazlık için zaman damgası, model sürümü ve giriş değerleriyle birlikte kaydedilmelidir
Kaçınılması Gereken Anti-örüntüler
- Güvenlik ağı olmayan otomasyon: her zaman bir minimum güven eşiği uygular; bu eşiğin altında hak talebinin otomatik olarak gerçek kişi tarafından incelemeye alınması gerekir
- Coğrafi doğrulama olmadan tahminler: onarım fiyatları coğrafi bölgeye göre büyük ölçüde farklılık gösterir; ortalama ulusal oranlara dayalı bir fiyat teklifi yerel gerçeklikten çok uzak olabilir
- Fotoğraf kalitesini göz ardı edin: bulanık, zayıf aydınlatılmış veya kısmi fotoğraflar doğruluğu ciddi şekilde azaltır; işlemeden önce kalite kontrolü uygulayın
- Uzlaşma teklifi çok hızlı: Müşteriye, hasarların tam olup olmadığını değerlendirme şansı verilmeden önce bir çözüm teklif edilmesi, sonradan maliyetli anlaşmazlıklara yol açabilir
Sonuçlar ve Sonraki Adımlar
Görüntü İşleme ve NLP ile talep otomasyonu, yatırım getirisi olan yapay zeka uygulamalarından biridir sigorta sektöründe en yüksek: işletme maliyetlerini %40-60 oranında azaltır, iyileştirir müşteri memnuniyeti sağlar ve standart vakalarda ödeme sürelerini haftalardan saatlere indirir.
Başarının anahtarı aşamalı bir yaklaşımdır: en basit durumlarla başlayın (ön cam, küçük kozmetik hasar), insan değerlendiricilerle karşılaştırıldığında sistemin doğruluğunu doğru bir şekilde ölçer, ve otomasyonu giderek daha karmaşık vakalara genişletirken her zaman Sağlam insan yükselme mekanizması.
Serinin bir sonraki makalesi konuyu ele alıyor Sigorta Dolandırıcılığı Tespiti: Sahte ağları ve davranışsal sinyalleri tanımlamak için grafik analitiğinin nasıl birleştirileceği Taleplerdeki anormal kalıpları tespit edin.
InsurTech Mühendislik Serisi
- 01 - Geliştiriciler için Sigorta Alanı: Ürünler, Aktörler ve Veri Modelleri
- 02 - Bulutta Yerel Politika Yönetimi: API Öncelikli Mimari
- 03 - Telematik İşlem Hattı: Geniş Ölçekte UBI Veri İşleme
- 04 - Yapay Zeka Sigortacılığı: Özellik Mühendisliği ve Risk Puanlaması
- 05 - Talep Otomasyonu: Bilgisayarla Görme ve NLP (bu makale)
- 06 - Dolandırıcılık Tespiti: Grafik Analizi ve Davranışsal Sinyal
- 07 - ACORD Standardı ve Sigorta API Entegrasyonu
- 08 - Uyumluluk Mühendisliği: Solvency II ve UFRS 17







