Cloud-Native Policy Management: API-First Architecture pentru platforme de asigurări
Sistemul de administrare a politicilor (PAS) este inima care bate companie de asigurări. Și sistemul care definește ce a cumpărat un client, pentru cât timp, a ce pret si in ce conditii. Cu toate acestea, în marea majoritate a companiilor de asigurări Italian și european, acest sistem critic funcționează încă pe mainframe din anii 1990, cu loturi de noapte care durează ore și API-uri care nu există sau sunt reduse la fișiere SFTP programate.
Tranziția către arhitecturi API-ul nativ în cloud-în primul rând nu este doar o întrebare tehnologic: este o necesitate competitivă. Noii jucători InsurTech lansează produse în câteva săptămâni în loc de luni, deservesc clienții în timp real și integrează date din zeci de surse externe. Piata Global InsurTech, evaluat la 5,3 miliarde USD în 2024, va crește la peste 132,9 miliarde USD până la 2034 (CAGR 22%). 74% dintre asigurătorii tradiționali investesc deja în modernizarea sistem de bază (Capgemini World InsurTech Report 2024).
În acest articol construim un sistem de management al politicilor nativ din cloud de la zero: de la modelare a domeniului până la proiectarea API, de la managementul ciclului de viață al politicii până la integrări cu brokeri, canale digitale și sisteme de reglementare.
Ce vei învăța
- Arhitectură API-first pentru administrarea politicilor native în cloud
- Modelarea ciclului de viață complet al unei politici
- Design API REST pentru cotare, legare, aproba, anulare, reînnoire
- Event sourcing și CQRS în contextul asigurărilor
- Integrare cu portaluri de brokeri, comerț electronic și intermediari
- Versiunea politicilor și gestionarea aprobărilor
- Modele de testare pentru sisteme complexe de asigurare
1. Ciclul de viață al unei politici: de la cote la expirare
Înainte de a scrie o singură linie de cod, trebuie să înțelegem ciclul de viață complet al unei politici. Modelul standard al industriei are următoarele stări principale:
Statele ciclului de viață al politicii
- CITAT: Clientul a primit o ofertă, dar nu a cumpărat încă
- APLICARE: Aplicația se află în faza de revizuire a subscrierii
- LEGAT: Acoperirea este activă, în așteptarea emiterii oficiale
- ÎN VIGOARE: Politică activă și pe deplin eficientă
- SUSPENDAT: Acoperire suspendată temporar (de exemplu, neplată)
- REÎNNOIT: Politică reînnoită pentru o nouă perioadă
- ANULAT: Polița anulată înainte de expirarea naturală
- EXPIRAT / EXPIRAT: Politică care a expirat în mod natural sau a expirat
Fiecare tranziție de stat are reguli de afaceri precise: cine o poate activa, care validări sunt solicitate, ce evenimente sunt generate, ce notificări sunt trimise. Codul trebuie exprima aceste reguli într-un mod explicit și testabil.
2. Model de domeniu: Entități fundamentale
Un sistem bun de management al politicilor se bazează pe un model de domeniu precis. Iată entitățile fundamentale și relațiile lor în Python cu adnotări de tip:
from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
class PolicyStatus(str, Enum):
QUOTED = "QUOTED"
APPLICATION = "APPLICATION"
BOUND = "BOUND"
IN_FORCE = "IN_FORCE"
SUSPENDED = "SUSPENDED"
RENEWED = "RENEWED"
CANCELLED = "CANCELLED"
EXPIRED = "EXPIRED"
class CoverageType(str, Enum):
LIABILITY = "LIABILITY"
COMPREHENSIVE = "COMPREHENSIVE"
COLLISION = "COLLISION"
PERSONAL_INJURY = "PERSONAL_INJURY"
PROPERTY = "PROPERTY"
BUSINESS_INTERRUPTION = "BUSINESS_INTERRUPTION"
@dataclass(frozen=True)
class Money:
"""Value object immutabile per importi monetari."""
amount: Decimal
currency: str = "EUR"
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def __mul__(self, factor: Decimal) -> "Money":
return Money(self.amount * factor, self.currency)
@dataclass(frozen=True)
class Coverage:
"""Una singola copertura assicurativa all'interno di una polizza."""
coverage_id: UUID
coverage_type: CoverageType
limit: Money # Massimale di indennizzo
deductible: Money # Franchigia
premium: Money # Premio per questa copertura
effective_date: date
expiry_date: date
is_active: bool = True
@dataclass
class PolicyHolder:
"""Il contraente/assicurato della polizza."""
party_id: UUID
first_name: str
last_name: str
fiscal_code: str # Codice fiscale IT
date_of_birth: date
email: str
phone: Optional[str]
address: dict # Indirizzo strutturato
@dataclass
class Endorsement:
"""Una modifica alla polizza originale (endorsement / appendice)."""
endorsement_id: UUID
policy_id: UUID
endorsement_type: str # ADD_COVERAGE, REMOVE_COVERAGE, CHANGE_LIMIT, etc.
effective_date: date
description: str
premium_adjustment: Money # Positivo = aumento, negativo = rimborso
previous_state: dict # Snapshot dello stato prima dell'endorsement
new_state: dict # Snapshot dello stato dopo l'endorsement
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Policy:
"""Entità radice dell'aggregato polizza."""
policy_id: UUID
policy_number: str # Es. "AUTO-IT-2024-0001234"
product_code: str # Es. "AUTO_RC", "HOME_ALL_RISK"
status: PolicyStatus
holder: PolicyHolder
coverages: list[Coverage]
endorsements: list[Endorsement]
effective_date: date
expiry_date: date
annual_premium: Money
created_at: datetime
updated_at: datetime
# Metadata di distribuzione
channel: str # "DIRECT", "BROKER", "AGGREGATOR"
agent_code: Optional[str]
broker_code: Optional[str]
def total_premium(self) -> Money:
base = sum(
(c.premium for c in self.coverages if c.is_active),
start=Money(Decimal("0"))
)
adjustments = sum(
(e.premium_adjustment for e in self.endorsements),
start=Money(Decimal("0"))
)
return base + adjustments
def is_active(self) -> bool:
return self.status == PolicyStatus.IN_FORCE
def can_file_claim(self) -> bool:
today = date.today()
return (
self.is_active()
and self.effective_date <= today <= self.expiry_date
)
3. Design API: API REST pentru operațiuni de politică
O arhitectură API-first înseamnă că API-urile sunt primul lucru pe care îl proiectați, chiar înainte de asta de implementare. Contractul API trebuie să fie clar, versiuneat și compatibil cu așteptările industriei (standarde ACORD, OpenAPI 3.1).
Mai jos este structura principalelor API-uri endpoint pentru un serviciu de management al politicilor:
| Metodă | Puncte finale | Operațiunea | Corp / Răspuns |
|---|---|---|---|
| POST | /v1/ghilimele | Generați o cotație | QuoteRequest -> QuoteResponse |
| POST | /v1/quotes/{quoteId}/bind | Transformați oferta în politică | BindRequest -> PolicyResponse |
| OBŢINE | /v1/policies/{policyId} | Politica de recuperare | -> PolicyResponse |
| POST | /v1/policies/{policyId}/endorse | Emite aprobări | EndorsementRequest -> EndorsementResponse |
| POST | /v1/policies/{policyId}/cancel | Politica de anulare | CancellationRequest -> PolicyResponse |
| POST | /v1/policies/{policyId}/renew | Reînnoiți politica | RenewalRequest -> PolicyResponse |
| OBŢINE | /v1/policies/{policyId}/documents | Documente de politică | -> DocumentListResponse |
Implementarea FastAPI cu validare Pydantic:
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional
from decimal import Decimal
from datetime import date
from uuid import UUID
import uuid
app = FastAPI(
title="Policy Management API",
version="1.0.0",
description="Cloud-native insurance policy management"
)
# --- Request/Response Models ---
class VehicleInfo(BaseModel):
plate: str = Field(..., pattern=r"^[A-Z]{2}[0-9]{3}[A-Z]{2}$|^[A-Z]{2}[0-9]{5}$")
make: str
model: str
year: int = Field(..., ge=1900, le=2026)
value: Decimal = Field(..., gt=0)
engine_cc: int
class QuoteRequest(BaseModel):
product_code: str = Field(..., pattern="^[A-Z_]+$")
holder_fiscal_code: str
vehicle: Optional[VehicleInfo]
effective_date: date
desired_coverages: list[str]
channel: str = Field(default="DIRECT")
@validator("effective_date")
def effective_date_not_past(cls, v: date) -> date:
if v < date.today():
raise ValueError("Effective date cannot be in the past")
return v
class CoverageQuote(BaseModel):
coverage_type: str
limit: Decimal
deductible: Decimal
annual_premium: Decimal
currency: str = "EUR"
class QuoteResponse(BaseModel):
quote_id: UUID
product_code: str
coverages: list[CoverageQuote]
total_annual_premium: Decimal
currency: str = "EUR"
valid_until: date
terms_version: str
class BindRequest(BaseModel):
payment_reference: str
holder_data: dict # Validated separately by KYC service
selected_coverages: list[str]
broker_code: Optional[str]
class PolicyResponse(BaseModel):
policy_id: UUID
policy_number: str
status: str
product_code: str
effective_date: date
expiry_date: date
annual_premium: Decimal
currency: str = "EUR"
created_at: str
# --- Service Layer ---
class PolicyService:
def __init__(self, repo, pricer, underwriter, event_bus):
self.repo = repo
self.pricer = pricer
self.underwriter = underwriter
self.event_bus = event_bus
async def create_quote(self, request: QuoteRequest) -> QuoteResponse:
# 1. Validate product exists
product = await self.repo.get_product(request.product_code)
if not product:
raise ValueError(f"Unknown product: {request.product_code}")
# 2. Score the risk
risk_score = await self.underwriter.score(request)
# 3. Calculate premium
coverages = await self.pricer.calculate(product, risk_score, request)
# 4. Persist quote with TTL
quote = await self.repo.save_quote(
product_code=request.product_code,
coverages=coverages,
valid_hours=72
)
return QuoteResponse(
quote_id=quote.id,
product_code=request.product_code,
coverages=coverages,
total_annual_premium=sum(c.annual_premium for c in coverages),
valid_until=quote.valid_until,
terms_version=product.terms_version
)
async def bind_policy(
self,
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks
) -> PolicyResponse:
# 1. Retrieve and validate quote
quote = await self.repo.get_quote(quote_id)
if not quote or quote.is_expired():
raise ValueError("Quote expired or not found")
# 2. Final underwriting check
uw_result = await self.underwriter.final_check(quote, request.holder_data)
if uw_result.is_declined:
raise ValueError(f"Policy declined: {uw_result.reason}")
# 3. Create policy
policy = await self.repo.create_policy(quote, request, uw_result)
# 4. Async operations (non-blocking)
background_tasks.add_task(self._post_bind_workflow, policy)
return PolicyResponse(
policy_id=policy.id,
policy_number=policy.number,
status=policy.status,
product_code=policy.product_code,
effective_date=policy.effective_date,
expiry_date=policy.expiry_date,
annual_premium=policy.annual_premium.amount,
currency=policy.annual_premium.currency,
created_at=policy.created_at.isoformat()
)
async def _post_bind_workflow(self, policy):
"""Operazioni asincrone post-bind."""
await self.event_bus.publish("policy.bound", {"policy_id": str(policy.id)})
# Trigger: document generation, CRM update, payment setup
# --- API Routes ---
@app.post("/v1/quotes", response_model=QuoteResponse, status_code=201)
async def create_quote(
request: QuoteRequest,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.create_quote(request)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
@app.post("/v1/quotes/{quote_id}/bind", response_model=PolicyResponse, status_code=201)
async def bind_policy(
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.bind_policy(quote_id, request, background_tasks)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
4. Aprovizionare cu evenimente pentru traseul de audit al asigurărilor
Industria asigurărilor are cerințe stricte de audit și trasabilitate: fiecare modificare a unei polițe trebuie urmărite într-un mod imuabil din motive de reglementare (IVASS, EIOPA) și legale. Evenimentul sursă și modelul arhitectural cel mai potrivit pentru această cerință.
În loc să actualizeze starea direct, fiecare operațiune asupra politicii generează un eveniment imuabil. Starea actuală și proiecția tuturor evenimentelor trecute:
from dataclasses import dataclass
from datetime import datetime
from typing import Union
from uuid import UUID
# --- Policy Domain Events ---
@dataclass(frozen=True)
class PolicyQuoted:
event_type: str = "PolicyQuoted"
policy_id: UUID = None
quote_id: UUID = None
product_code: str = None
holder_id: UUID = None
annual_premium: dict = None # {amount, currency}
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyBound:
event_type: str = "PolicyBound"
policy_id: UUID = None
policy_number: str = None
effective_date: str = None
expiry_date: str = None
channel: str = None
agent_code: str = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyEndorsed:
event_type: str = "PolicyEndorsed"
policy_id: UUID = None
endorsement_id: UUID = None
endorsement_type: str = None
premium_adjustment: dict = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyCancelled:
event_type: str = "PolicyCancelled"
policy_id: UUID = None
cancellation_reason: str = None
cancellation_date: str = None
refund_amount: dict = None
requested_by: str = None # "HOLDER", "INSURER", "REGULATOR"
occurred_at: datetime = None
PolicyEvent = Union[PolicyQuoted, PolicyBound, PolicyEndorsed, PolicyCancelled]
# --- Event Store ---
class PolicyEventStore:
"""Append-only store per eventi di polizza."""
def __init__(self, db_pool):
self.db = db_pool
async def append(self, policy_id: UUID, event: PolicyEvent) -> int:
"""Appende un evento e restituisce il nuovo sequence number."""
async with self.db.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO policy_events
(policy_id, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4)
RETURNING sequence_number
""",
str(policy_id),
event.event_type,
event.__dict__,
event.occurred_at or datetime.utcnow()
)
return row["sequence_number"]
async def get_history(self, policy_id: UUID) -> list[dict]:
"""Recupera tutti gli eventi di una polizza in ordine cronologico."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_type, event_data, occurred_at, sequence_number
FROM policy_events
WHERE policy_id = $1
ORDER BY sequence_number ASC
""",
str(policy_id)
)
return [dict(r) for r in rows]
async def rebuild_state(self, policy_id: UUID) -> dict:
"""Ricostruisce lo stato corrente della polizza dagli eventi."""
events = await self.get_history(policy_id)
state = {}
for evt in events:
state = self._apply_event(state, evt)
return state
def _apply_event(self, state: dict, event: dict) -> dict:
event_type = event["event_type"]
data = event["event_data"]
if event_type == "PolicyQuoted":
return {
**state,
"status": "QUOTED",
"policy_id": data["policy_id"],
"product_code": data["product_code"],
"annual_premium": data["annual_premium"]
}
elif event_type == "PolicyBound":
return {
**state,
"status": "IN_FORCE",
"policy_number": data["policy_number"],
"effective_date": data["effective_date"],
"expiry_date": data["expiry_date"],
"channel": data["channel"]
}
elif event_type == "PolicyEndorsed":
# Aggiorna il premio
current_premium = state.get("annual_premium", {"amount": "0"})
adjustment = data.get("premium_adjustment", {"amount": "0"})
return {
**state,
"annual_premium": {
"amount": str(
float(current_premium["amount"])
+ float(adjustment["amount"])
),
"currency": current_premium.get("currency", "EUR")
}
}
elif event_type == "PolicyCancelled":
return {
**state,
"status": "CANCELLED",
"cancellation_reason": data["cancellation_reason"],
"cancellation_date": data["cancellation_date"]
}
return state
5. Managementul aprobării: modificări pe termen mediu
Avizele (modificările la mijlocul termenului) sunt operațiuni frecvente și delicate: adăugarea a șofer, schimbare de vehicul, schimbare de adresă. Fiecare aprobare poate avea are impact asupra primei (pro-rata) și trebuie să genereze documentație actualizată.
from decimal import Decimal
from datetime import date
class EndorsementCalculator:
"""Calcola l'aggiustamento di premio per un endorsement mid-term."""
def calculate_prorata_adjustment(
self,
current_annual_premium: Decimal,
new_annual_premium: Decimal,
endorsement_date: date,
policy_expiry: date
) -> Decimal:
"""
Calcola l'aggiustamento pro-rata del premio.
Restituisce importo positivo (addebitare) o negativo (rimborsare).
"""
remaining_days = (policy_expiry - endorsement_date).days
total_days = 365 # Semplificazione; usare calendario reale per polizze annuali
premium_difference = new_annual_premium - current_annual_premium
prorata_factor = Decimal(remaining_days) / Decimal(total_days)
return premium_difference * prorata_factor
class EndorsementService:
def __init__(self, repo, event_store, calculator, doc_service):
self.repo = repo
self.event_store = event_store
self.calculator = calculator
self.doc_service = doc_service
async def add_driver(
self,
policy_id: str,
driver_data: dict,
effective_date: date
) -> dict:
"""Aggiunge un conducente aggiuntivo alla polizza auto."""
# 1. Recupera polizza corrente
policy = await self.repo.get_policy(policy_id)
if not policy.is_active():
raise ValueError("Cannot endorse inactive policy")
# 2. Calcola nuovo premio con driver aggiuntivo
new_premium = await self._reprice_with_driver(policy, driver_data)
# 3. Calcola aggiustamento pro-rata
adjustment = self.calculator.calculate_prorata_adjustment(
current_annual_premium=policy.annual_premium.amount,
new_annual_premium=new_premium,
endorsement_date=effective_date,
policy_expiry=policy.expiry_date
)
# 4. Persisti endorsement
endorsement_id = await self.repo.create_endorsement(
policy_id=policy_id,
type="ADD_DRIVER",
effective_date=effective_date,
driver_data=driver_data,
premium_adjustment=adjustment
)
# 5. Pubblica evento
await self.event_store.append(policy_id, PolicyEndorsed(
policy_id=policy_id,
endorsement_id=endorsement_id,
endorsement_type="ADD_DRIVER",
premium_adjustment={"amount": str(adjustment), "currency": "EUR"}
))
# 6. Genera documento endorsement (async)
await self.doc_service.generate_endorsement_certificate(
policy_id, endorsement_id
)
return {
"endorsement_id": str(endorsement_id),
"premium_adjustment": float(adjustment),
"currency": "EUR",
"effective_date": effective_date.isoformat()
}
async def _reprice_with_driver(self, policy, driver_data: dict) -> Decimal:
"""Riprice la polizza includendo il nuovo conducente."""
# Logic depends on product rating algorithm
base_premium = policy.annual_premium.amount
driver_age = driver_data.get("age", 30)
# Young driver surcharge (semplificato)
if driver_age < 25:
surcharge = base_premium * Decimal("0.25")
elif driver_age < 30:
surcharge = base_premium * Decimal("0.10")
else:
surcharge = Decimal("0")
return base_premium + surcharge
6. Arhitectura microserviciilor: descompunerea domeniului de politici
Un sistem de management al politicii de întreprindere este de obicei descompus în următoarele microservicii: fiecare cu propriul său context delimitat și responsabilități bine definite:
| Serviciu | Responsabilitate | Baze de date | Comunicare |
|---|---|---|---|
| oferta-serviciu | Estimare, evaluare | Redis (cache) + PostgreSQL | REST + Evenimente |
| serviciu-politică | Politica privind ciclul de viață, aprobarea | PostgreSQL (magazin de evenimente) | REST + Evenimente |
| serviciu de facturare | Plăți, rate, rambursări | PostgreSQL | Evenimente |
| document-serviciu | Generare PDF, arhivare | S3 / Azure Blob | Evenimente (async) |
| serviciu de notificare | E-mail, SMS, push | Redis (coadă) | Evenimente (async) |
| subscriere-serviciu | Scor de risc, acceptare/refuzare | PostgreSQL + Magazin ML | REST (sincronizare) |
7. Integrarea portalului brokerului
Brokerii de asigurări accesează sistemul printr-un portal dedicat. Integrare API trebuie să gestioneze autentificarea, accesul la datele clienților brokerului, comisioanele și raportare agregată. Mai jos este un exemplu de middleware OAuth2 pentru brokeri:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
import httpx
BROKER_PORTAL_SCOPES = [
"policy:read",
"policy:quote",
"policy:bind",
"policy:endorse",
"commission:read"
]
class BrokerAuth:
"""Middleware per autenticazione e autorizzazione broker."""
def __init__(self, jwks_url: str, audience: str):
self.jwks_url = jwks_url
self.audience = audience
self._jwks_cache = None
async def get_jwks(self) -> dict:
if not self._jwks_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(self.jwks_url)
self._jwks_cache = resp.json()
return self._jwks_cache
async def verify_broker_token(
self,
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer())
) -> dict:
"""Verifica JWT token del broker e restituisce claims."""
try:
jwks = await self.get_jwks()
payload = jwt.decode(
credentials.credentials,
jwks,
algorithms=["RS256"],
audience=self.audience
)
broker_code = payload.get("broker_code")
if not broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing broker_code claim"
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}"
)
# Dependency injection
broker_auth = BrokerAuth(
jwks_url="https://auth.insurer.com/.well-known/jwks.json",
audience="broker-portal-api"
)
@app.get("/v1/broker/{broker_code}/portfolio")
async def get_broker_portfolio(
broker_code: str,
broker_claims: dict = Depends(broker_auth.verify_broker_token),
policy_service: PolicyService = Depends(get_policy_service)
):
"""Restituisce il portfolio polizze di un broker."""
# Verifica che il broker acceda solo ai propri dati
if broker_claims["broker_code"] != broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied: broker_code mismatch"
)
policies = await policy_service.get_policies_by_broker(broker_code)
return {
"broker_code": broker_code,
"total_policies": len(policies),
"in_force_count": sum(1 for p in policies if p["status"] == "IN_FORCE"),
"total_gwp": sum(p["annual_premium"] for p in policies),
"policies": policies[:50] # Paginare in produzione
}
8. Reînnoiri automate: procesare în loturi și sensibilizare
Reînnoirile sunt operațiuni critice: pierderea unui client la reînnoire este costisitoare (cost achiziționarea clienților de 5-7x comparativ cu reținerea). Un sistem modern automatizează proces de reînnoire cu sensibilizare personalizată și prețuri dinamice.
import asyncio
from datetime import date, timedelta
from dataclasses import dataclass
@dataclass
class RenewalCandidate:
policy_id: str
holder_email: str
expiry_date: date
current_premium: float
renewal_premium: float
days_to_expiry: int
class RenewalOrchestrator:
"""Gestisce il processo automatico di rinnovo polizze."""
RENEWAL_WINDOWS = [60, 45, 30, 15, 7] # Giorni prima della scadenza
def __init__(self, policy_repo, pricer, notification_svc, event_bus):
self.repo = policy_repo
self.pricer = pricer
self.notification = notification_svc
self.event_bus = event_bus
async def run_daily_renewal_batch(self) -> dict:
"""Job giornaliero: identifica polizze da rinnovare e avvia outreach."""
today = date.today()
stats = {"processed": 0, "quoted": 0, "errors": 0}
for days_ahead in self.RENEWAL_WINDOWS:
target_date = today + timedelta(days=days_ahead)
candidates = await self.repo.get_expiring_policies(target_date)
tasks = [
self._process_renewal_candidate(c, days_ahead)
for c in candidates
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
stats["processed"] += 1
if isinstance(result, Exception):
stats["errors"] += 1
else:
stats["quoted"] += 1
return stats
async def _process_renewal_candidate(
self,
policy_id: str,
days_to_expiry: int
) -> dict:
policy = await self.repo.get_policy(policy_id)
# Calcola nuovo premio di rinnovo
renewal_premium = await self.pricer.calculate_renewal(policy)
premium_change_pct = (
(renewal_premium - float(policy["annual_premium"])) /
float(policy["annual_premium"]) * 100
)
# Salva preventivo rinnovo
renewal_quote = await self.repo.save_renewal_quote(
policy_id=policy_id,
renewal_premium=renewal_premium,
valid_days=days_to_expiry + 30
)
# Scegli template notifica in base ai giorni rimanenti
if days_to_expiry == 60:
template = "renewal_early_notice"
elif days_to_expiry == 30:
template = "renewal_30_day"
elif days_to_expiry == 7:
template = "renewal_final_reminder"
else:
template = "renewal_standard"
# Invia notifica
await self.notification.send_renewal_outreach(
holder_email=policy["holder"]["email"],
template=template,
context={
"policy_number": policy["policy_number"],
"expiry_date": policy["expiry_date"],
"renewal_premium": renewal_premium,
"premium_change_pct": premium_change_pct,
"renewal_link": f"https://portal.insurer.com/renew/{renewal_quote['id']}"
}
)
# Pubblica evento
await self.event_bus.publish("policy.renewal_quoted", {
"policy_id": policy_id,
"renewal_quote_id": renewal_quote["id"],
"days_to_expiry": days_to_expiry
})
return renewal_quote
9. Testare: Abordări pentru sisteme complexe de asigurări
Sistemele de asigurări au o logică complexă de afaceri și o dependență ridicată de regulile care se schimbă în timp. Testarea trebuie să acopere teste unitare pentru regulile de afaceri, integrare teste pentru fluxuri end-to-end și teste de contract pentru API-uri expuse brokerilor.
import pytest
from decimal import Decimal
from datetime import date, timedelta
from unittest.mock import AsyncMock, MagicMock
class TestEndorsementCalculator:
"""Unit test per il calcolo pro-rata degli endorsement."""
def setup_method(self):
self.calc = EndorsementCalculator()
def test_prorata_increase_with_full_year_remaining(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("750"),
endorsement_date=date(2025, 1, 1),
policy_expiry=date(2026, 1, 1)
)
# 150 difference * (365/365) = 150
assert abs(adjustment - Decimal("150")) < Decimal("1")
def test_prorata_decrease_at_midterm(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("500"),
endorsement_date=date(2025, 7, 1), # 6 mesi dopo inizio
policy_expiry=date(2026, 1, 1)
)
# Aggiustamento negativo (rimborso)
assert adjustment < Decimal("0")
def test_endorsement_on_last_day_is_zero(self):
today = date.today()
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("800"),
endorsement_date=today,
policy_expiry=today # Scadenza oggi
)
assert adjustment == Decimal("0")
@pytest.mark.asyncio
class TestPolicyService:
"""Integration test del service layer."""
async def test_create_quote_success(self):
# Arrange
mock_repo = AsyncMock()
mock_repo.get_product.return_value = MagicMock(terms_version="v2.1")
mock_pricer = AsyncMock()
mock_pricer.calculate.return_value = [
CoverageQuote(
coverage_type="LIABILITY",
limit=Decimal("1000000"),
deductible=Decimal("0"),
annual_premium=Decimal("400")
)
]
mock_underwriter = AsyncMock()
mock_underwriter.score.return_value = MagicMock(score=0.3)
mock_repo.save_quote.return_value = MagicMock(
id="q-123",
valid_until=date.today() + timedelta(days=72)
)
service = PolicyService(
repo=mock_repo,
pricer=mock_pricer,
underwriter=mock_underwriter,
event_bus=AsyncMock()
)
request = QuoteRequest(
product_code="AUTO_RC",
holder_fiscal_code="CLDFRC80A01H501U",
effective_date=date.today() + timedelta(days=1),
desired_coverages=["LIABILITY"]
)
# Act
result = await service.create_quote(request)
# Assert
assert result.total_annual_premium == Decimal("400")
assert result.terms_version == "v2.1"
mock_underwriter.score.assert_awaited_once()
10. Cele mai bune practici și anti-modele
Anti-Pattern: Politica despre obiectul lui Dumnezeu
Nu creați un singur tabel „politici” cu 200 de coloane. Utilizați un model de domeniu bogat cu obiecte cu valoare clară (Bani, Acoperire, Adresă) și granițe agregate. O coloană „coverage_data JSONB” nu este un model de domeniu.
Anti-Pattern: stare mutabilă fără urmă de audit
Nu faceți ACTUALIZĂRI directe asupra unei politici fără a urmări istoricul. Sectorul asigurărilor necesită audituri complete pentru IVASS. Utilizați sursă de evenimente sau cel puțin un tabel istoric.
Cea mai bună practică: Idempotity pentru operațiuni critice
Operațiunile de legare și ștergere trebuie să fie idempotente. Folosiți a
idempotency_key antet în cererile critice. Persista rezultatul
și returnați-l dacă aceeași cheie este trimisă din nou.
Cea mai bună practică: testarea contractelor cu Pact
Pentru integrări cu brokeri și agregatori, utilizați testarea contractelor (Pact). Fiecare consumator definește contractul așteptat; furnizorul verifică dacă îl respectă. Aceasta previne modificările rupturi în API-urile critice.
Concluzii
Un sistem de gestionare a politicilor nativ din cloud API-first necesită investiții în modelare a domeniului chiar înaintea infrastructurii. Modelul de domeniu bogat (agregat, obiecte de valoare, evenimente), proiectarea API compatibilă cu ACORD și aprovizionarea cu evenimente pentru pista de audit sunt cheia pe care să construiască.
74% dintre asigurători modernizează sistemele de bază: concurență cu InsurTechs nativii digitali fac ca această transformare să nu mai fie opțională. Modelele descrise în aceasta articol - aprovizionarea evenimentelor, microservicii, API-first, managementul aprobării - sunt baza tehnică de construire a unor platforme de asigurare scalabile și întreținute.
Articole viitoare din seria InsurTech
- Conducta telematică: procesarea datelor UBI la scară
- Subscriere AI: ingineria caracteristicilor și scorul de risc
- Automatizarea revendicărilor: Computer Vision și NLP
- Detectarea fraudelor: analiză grafică și semnal comportamental







