Zarządzanie polityką natywną w chmurze: architektura oparta na interfejsie API dla platform ubezpieczeniowych
System zarządzania polityką (PAS) to bijące serce każdego systemu firma ubezpieczeniowa. Oraz system, który określa, co klient kupił i na jak długo: a jaka cena i na jakich warunkach. Jednak w zdecydowanej większości towarzystw ubezpieczeniowych Ten krytyczny system włoski i europejski nadal działa na komputerach typu mainframe z lat 90. XX wieku, z nocnymi partiami ostatnie godziny i interfejsy API, które nie istnieją lub są zredukowane do zaplanowanych plików SFTP.
Przejście w kierunku architektury Najpierw natywny interfejs API w chmurze to nie tylko pytanie technologiczne: jest to konieczność konkurencyjna. Nowi gracze InsurTech wprowadzają produkty w ciągu kilku tygodni zamiast miesięcy obsługują klientów w czasie rzeczywistym i integrują dane z kilkudziesięciu źródeł zewnętrznych. Rynek Global InsurTech, którego wartość w 2024 r. wyniesie 5,3 miliarda dolarów, do roku 2024 wzrośnie do ponad 132,9 miliarda dolarów. 2034 (CAGR 22%). 74% tradycyjnych ubezpieczycieli już inwestuje w modernizację systemu podstawowego (Capgemini World InsurTech Report 2024).
W tym artykule budujemy od podstaw system zarządzania polityką natywną w chmurze: od modelowania domeny po projektowanie API, od zarządzania cyklem życia polis po integracje z brokerami, kanały cyfrowe i systemy regulacyjne.
Czego się nauczysz
- Architektura oparta na API do administrowania polityką natywną w chmurze
- Modelowanie pełnego cyklu życia polisy
- Projekt interfejsu API REST do wyceny, wiązania, zatwierdzania, anulowania i odnawiania
- Event sourcing i CQRS w kontekście ubezpieczeń
- Integracja z portalami brokerskimi, e-commerce i pośrednikami
- Wersjonowanie polityk i zarządzanie rekomendacjami
- Wzorce testowe dla złożonych systemów ubezpieczeniowych
1. Cykl życia polisy: od kwot do wygaśnięcia
Zanim napiszemy choćby jedną linię kodu, musimy zrozumieć pełny cykl życia polityki. Model będący standardem branżowym ma następujące główne stany:
Stany cyklu życia polityki
- ZACYTOWANY: Klient otrzymał wycenę, ale jeszcze nie dokonał zakupu
- APLIKACJA: Wniosek jest na etapie oceny ryzyka ubezpieczeniowego
- ZOBOWIĄZANY: Ubezpieczenie jest aktywne i oczekuje na formalne wydanie
- IN_FORCE: Polityka aktywna i w pełni skuteczna
- ZAWIESZONY: Ochrona tymczasowo zawieszona (np. brak płatności)
- ODNOWIONY: Polisa przedłużona na nowy okres
- ODWOŁANY: Polisa anulowana przed naturalnym wygaśnięciem
- WYGAŚŁ / WYGAŚŁ: Polisa, która wygasła w sposób naturalny lub wygasła
Każda zmiana stanu ma precyzyjne reguły biznesowe: kto może ją aktywować, jakie walidacje są żądane, jakie zdarzenia są generowane, jakie powiadomienia są wysyłane. Kod musi wyrazić te zasady w sposób wyraźny i sprawdzalny.
2. Model domeny: Podmioty podstawowe
Dobry system zarządzania polityką opiera się na precyzyjnym modelu dziedzinowym. Oto podstawowe byty i ich relacje w Pythonie z adnotacjami typów:
from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
class PolicyStatus(str, Enum):
QUOTED = "QUOTED"
APPLICATION = "APPLICATION"
BOUND = "BOUND"
IN_FORCE = "IN_FORCE"
SUSPENDED = "SUSPENDED"
RENEWED = "RENEWED"
CANCELLED = "CANCELLED"
EXPIRED = "EXPIRED"
class CoverageType(str, Enum):
LIABILITY = "LIABILITY"
COMPREHENSIVE = "COMPREHENSIVE"
COLLISION = "COLLISION"
PERSONAL_INJURY = "PERSONAL_INJURY"
PROPERTY = "PROPERTY"
BUSINESS_INTERRUPTION = "BUSINESS_INTERRUPTION"
@dataclass(frozen=True)
class Money:
"""Value object immutabile per importi monetari."""
amount: Decimal
currency: str = "EUR"
def __add__(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def __mul__(self, factor: Decimal) -> "Money":
return Money(self.amount * factor, self.currency)
@dataclass(frozen=True)
class Coverage:
"""Una singola copertura assicurativa all'interno di una polizza."""
coverage_id: UUID
coverage_type: CoverageType
limit: Money # Massimale di indennizzo
deductible: Money # Franchigia
premium: Money # Premio per questa copertura
effective_date: date
expiry_date: date
is_active: bool = True
@dataclass
class PolicyHolder:
"""Il contraente/assicurato della polizza."""
party_id: UUID
first_name: str
last_name: str
fiscal_code: str # Codice fiscale IT
date_of_birth: date
email: str
phone: Optional[str]
address: dict # Indirizzo strutturato
@dataclass
class Endorsement:
"""Una modifica alla polizza originale (endorsement / appendice)."""
endorsement_id: UUID
policy_id: UUID
endorsement_type: str # ADD_COVERAGE, REMOVE_COVERAGE, CHANGE_LIMIT, etc.
effective_date: date
description: str
premium_adjustment: Money # Positivo = aumento, negativo = rimborso
previous_state: dict # Snapshot dello stato prima dell'endorsement
new_state: dict # Snapshot dello stato dopo l'endorsement
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Policy:
"""Entità radice dell'aggregato polizza."""
policy_id: UUID
policy_number: str # Es. "AUTO-IT-2024-0001234"
product_code: str # Es. "AUTO_RC", "HOME_ALL_RISK"
status: PolicyStatus
holder: PolicyHolder
coverages: list[Coverage]
endorsements: list[Endorsement]
effective_date: date
expiry_date: date
annual_premium: Money
created_at: datetime
updated_at: datetime
# Metadata di distribuzione
channel: str # "DIRECT", "BROKER", "AGGREGATOR"
agent_code: Optional[str]
broker_code: Optional[str]
def total_premium(self) -> Money:
base = sum(
(c.premium for c in self.coverages if c.is_active),
start=Money(Decimal("0"))
)
adjustments = sum(
(e.premium_adjustment for e in self.endorsements),
start=Money(Decimal("0"))
)
return base + adjustments
def is_active(self) -> bool:
return self.status == PolicyStatus.IN_FORCE
def can_file_claim(self) -> bool:
today = date.today()
return (
self.is_active()
and self.effective_date <= today <= self.expiry_date
)
3. Projektowanie API: API REST do obsługi polityk
Architektura oparta na interfejsie API oznacza, że interfejsy API są pierwszą rzeczą, którą projektujesz, nawet wcześniej wdrożenia. Umowa API musi być jasna, wersjonowana i kompatybilna z oczekiwania branży (standardy ACORD, OpenAPI 3.1).
Poniżej znajduje się struktura głównych interfejsów API punktów końcowych dla usługi zarządzania zasadami:
| Metoda | Punkty końcowe | Działanie | Ciało / odpowiedź |
|---|---|---|---|
| POST | /v1/cytaty | Wygeneruj wycenę | Zapytanie ofertowe -> Odpowiedź na wycenę |
| POST | /v1/quotes/{quoteId}/bind | Zamień wycenę na polisę | BindRequest -> Odpowiedź na politykę |
| DOSTAWAĆ | /v1/policies/{policyId} | Odzyskaj polisę | -> Odpowiedź na zasady |
| POST | /v1/policies/{policyId}/endore | Wydaje adnotacje | Żądanie potwierdzenia -> Odpowiedź potwierdzenia |
| POST | /v1/policies/{policyId}/cancel | Anuluj politykę | Żądanie anulowania -> Odpowiedź na zasady |
| POST | /v1/policies/{policyId}/renew | Odnów polisę | Żądanie odnowienia -> Odpowiedź na politykę |
| DOSTAWAĆ | /v1/policies/{policyId}/documents | Dokumenty polityczne | ->Odpowiedź na listę dokumentów |
Implementacja FastAPI z walidacją Pydantic:
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import Optional
from decimal import Decimal
from datetime import date
from uuid import UUID
import uuid
app = FastAPI(
title="Policy Management API",
version="1.0.0",
description="Cloud-native insurance policy management"
)
# --- Request/Response Models ---
class VehicleInfo(BaseModel):
plate: str = Field(..., pattern=r"^[A-Z]{2}[0-9]{3}[A-Z]{2}$|^[A-Z]{2}[0-9]{5}$")
make: str
model: str
year: int = Field(..., ge=1900, le=2026)
value: Decimal = Field(..., gt=0)
engine_cc: int
class QuoteRequest(BaseModel):
product_code: str = Field(..., pattern="^[A-Z_]+$")
holder_fiscal_code: str
vehicle: Optional[VehicleInfo]
effective_date: date
desired_coverages: list[str]
channel: str = Field(default="DIRECT")
@validator("effective_date")
def effective_date_not_past(cls, v: date) -> date:
if v < date.today():
raise ValueError("Effective date cannot be in the past")
return v
class CoverageQuote(BaseModel):
coverage_type: str
limit: Decimal
deductible: Decimal
annual_premium: Decimal
currency: str = "EUR"
class QuoteResponse(BaseModel):
quote_id: UUID
product_code: str
coverages: list[CoverageQuote]
total_annual_premium: Decimal
currency: str = "EUR"
valid_until: date
terms_version: str
class BindRequest(BaseModel):
payment_reference: str
holder_data: dict # Validated separately by KYC service
selected_coverages: list[str]
broker_code: Optional[str]
class PolicyResponse(BaseModel):
policy_id: UUID
policy_number: str
status: str
product_code: str
effective_date: date
expiry_date: date
annual_premium: Decimal
currency: str = "EUR"
created_at: str
# --- Service Layer ---
class PolicyService:
def __init__(self, repo, pricer, underwriter, event_bus):
self.repo = repo
self.pricer = pricer
self.underwriter = underwriter
self.event_bus = event_bus
async def create_quote(self, request: QuoteRequest) -> QuoteResponse:
# 1. Validate product exists
product = await self.repo.get_product(request.product_code)
if not product:
raise ValueError(f"Unknown product: {request.product_code}")
# 2. Score the risk
risk_score = await self.underwriter.score(request)
# 3. Calculate premium
coverages = await self.pricer.calculate(product, risk_score, request)
# 4. Persist quote with TTL
quote = await self.repo.save_quote(
product_code=request.product_code,
coverages=coverages,
valid_hours=72
)
return QuoteResponse(
quote_id=quote.id,
product_code=request.product_code,
coverages=coverages,
total_annual_premium=sum(c.annual_premium for c in coverages),
valid_until=quote.valid_until,
terms_version=product.terms_version
)
async def bind_policy(
self,
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks
) -> PolicyResponse:
# 1. Retrieve and validate quote
quote = await self.repo.get_quote(quote_id)
if not quote or quote.is_expired():
raise ValueError("Quote expired or not found")
# 2. Final underwriting check
uw_result = await self.underwriter.final_check(quote, request.holder_data)
if uw_result.is_declined:
raise ValueError(f"Policy declined: {uw_result.reason}")
# 3. Create policy
policy = await self.repo.create_policy(quote, request, uw_result)
# 4. Async operations (non-blocking)
background_tasks.add_task(self._post_bind_workflow, policy)
return PolicyResponse(
policy_id=policy.id,
policy_number=policy.number,
status=policy.status,
product_code=policy.product_code,
effective_date=policy.effective_date,
expiry_date=policy.expiry_date,
annual_premium=policy.annual_premium.amount,
currency=policy.annual_premium.currency,
created_at=policy.created_at.isoformat()
)
async def _post_bind_workflow(self, policy):
"""Operazioni asincrone post-bind."""
await self.event_bus.publish("policy.bound", {"policy_id": str(policy.id)})
# Trigger: document generation, CRM update, payment setup
# --- API Routes ---
@app.post("/v1/quotes", response_model=QuoteResponse, status_code=201)
async def create_quote(
request: QuoteRequest,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.create_quote(request)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
@app.post("/v1/quotes/{quote_id}/bind", response_model=PolicyResponse, status_code=201)
async def bind_policy(
quote_id: UUID,
request: BindRequest,
background_tasks: BackgroundTasks,
service: PolicyService = Depends(get_policy_service)
):
try:
return await service.bind_policy(quote_id, request, background_tasks)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
4. Pozyskiwanie zdarzeń na potrzeby ścieżki audytu ubezpieczenia
Branża ubezpieczeniowa ma rygorystyczne wymagania dotyczące audytu i identyfikowalności: każda zmiana polisy muszą być śledzone w sposób niezmienny ze względów regulacyjnych (IVASS, EIOPA) i prawnych. Wydarzenie źródła zaopatrzenia i wzór architektoniczny najlepiej dostosowany do tego wymagania.
Zamiast bezpośrednio aktualizować stan, każda operacja na zasadach generuje niezmienne zdarzenie. Stan obecny i projekcja wszystkich wydarzeń z przeszłości:
from dataclasses import dataclass
from datetime import datetime
from typing import Union
from uuid import UUID
# --- Policy Domain Events ---
@dataclass(frozen=True)
class PolicyQuoted:
event_type: str = "PolicyQuoted"
policy_id: UUID = None
quote_id: UUID = None
product_code: str = None
holder_id: UUID = None
annual_premium: dict = None # {amount, currency}
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyBound:
event_type: str = "PolicyBound"
policy_id: UUID = None
policy_number: str = None
effective_date: str = None
expiry_date: str = None
channel: str = None
agent_code: str = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyEndorsed:
event_type: str = "PolicyEndorsed"
policy_id: UUID = None
endorsement_id: UUID = None
endorsement_type: str = None
premium_adjustment: dict = None
occurred_at: datetime = None
@dataclass(frozen=True)
class PolicyCancelled:
event_type: str = "PolicyCancelled"
policy_id: UUID = None
cancellation_reason: str = None
cancellation_date: str = None
refund_amount: dict = None
requested_by: str = None # "HOLDER", "INSURER", "REGULATOR"
occurred_at: datetime = None
PolicyEvent = Union[PolicyQuoted, PolicyBound, PolicyEndorsed, PolicyCancelled]
# --- Event Store ---
class PolicyEventStore:
"""Append-only store per eventi di polizza."""
def __init__(self, db_pool):
self.db = db_pool
async def append(self, policy_id: UUID, event: PolicyEvent) -> int:
"""Appende un evento e restituisce il nuovo sequence number."""
async with self.db.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO policy_events
(policy_id, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4)
RETURNING sequence_number
""",
str(policy_id),
event.event_type,
event.__dict__,
event.occurred_at or datetime.utcnow()
)
return row["sequence_number"]
async def get_history(self, policy_id: UUID) -> list[dict]:
"""Recupera tutti gli eventi di una polizza in ordine cronologico."""
async with self.db.acquire() as conn:
rows = await conn.fetch(
"""
SELECT event_type, event_data, occurred_at, sequence_number
FROM policy_events
WHERE policy_id = $1
ORDER BY sequence_number ASC
""",
str(policy_id)
)
return [dict(r) for r in rows]
async def rebuild_state(self, policy_id: UUID) -> dict:
"""Ricostruisce lo stato corrente della polizza dagli eventi."""
events = await self.get_history(policy_id)
state = {}
for evt in events:
state = self._apply_event(state, evt)
return state
def _apply_event(self, state: dict, event: dict) -> dict:
event_type = event["event_type"]
data = event["event_data"]
if event_type == "PolicyQuoted":
return {
**state,
"status": "QUOTED",
"policy_id": data["policy_id"],
"product_code": data["product_code"],
"annual_premium": data["annual_premium"]
}
elif event_type == "PolicyBound":
return {
**state,
"status": "IN_FORCE",
"policy_number": data["policy_number"],
"effective_date": data["effective_date"],
"expiry_date": data["expiry_date"],
"channel": data["channel"]
}
elif event_type == "PolicyEndorsed":
# Aggiorna il premio
current_premium = state.get("annual_premium", {"amount": "0"})
adjustment = data.get("premium_adjustment", {"amount": "0"})
return {
**state,
"annual_premium": {
"amount": str(
float(current_premium["amount"])
+ float(adjustment["amount"])
),
"currency": current_premium.get("currency", "EUR")
}
}
elif event_type == "PolicyCancelled":
return {
**state,
"status": "CANCELLED",
"cancellation_reason": data["cancellation_reason"],
"cancellation_date": data["cancellation_date"]
}
return state
5. Zarządzanie aprobatami: zmiany średnioterminowe
Zatwierdzenia (zmiany średnioterminowe) to częste i delikatne operacje: dodanie a kierowca, zmiana pojazdu, zmiana adresu. Każde poparcie może mieć wpływ na składkę (proporcjonalnie) i musi wygenerować zaktualizowaną dokumentację.
from decimal import Decimal
from datetime import date
class EndorsementCalculator:
"""Calcola l'aggiustamento di premio per un endorsement mid-term."""
def calculate_prorata_adjustment(
self,
current_annual_premium: Decimal,
new_annual_premium: Decimal,
endorsement_date: date,
policy_expiry: date
) -> Decimal:
"""
Calcola l'aggiustamento pro-rata del premio.
Restituisce importo positivo (addebitare) o negativo (rimborsare).
"""
remaining_days = (policy_expiry - endorsement_date).days
total_days = 365 # Semplificazione; usare calendario reale per polizze annuali
premium_difference = new_annual_premium - current_annual_premium
prorata_factor = Decimal(remaining_days) / Decimal(total_days)
return premium_difference * prorata_factor
class EndorsementService:
def __init__(self, repo, event_store, calculator, doc_service):
self.repo = repo
self.event_store = event_store
self.calculator = calculator
self.doc_service = doc_service
async def add_driver(
self,
policy_id: str,
driver_data: dict,
effective_date: date
) -> dict:
"""Aggiunge un conducente aggiuntivo alla polizza auto."""
# 1. Recupera polizza corrente
policy = await self.repo.get_policy(policy_id)
if not policy.is_active():
raise ValueError("Cannot endorse inactive policy")
# 2. Calcola nuovo premio con driver aggiuntivo
new_premium = await self._reprice_with_driver(policy, driver_data)
# 3. Calcola aggiustamento pro-rata
adjustment = self.calculator.calculate_prorata_adjustment(
current_annual_premium=policy.annual_premium.amount,
new_annual_premium=new_premium,
endorsement_date=effective_date,
policy_expiry=policy.expiry_date
)
# 4. Persisti endorsement
endorsement_id = await self.repo.create_endorsement(
policy_id=policy_id,
type="ADD_DRIVER",
effective_date=effective_date,
driver_data=driver_data,
premium_adjustment=adjustment
)
# 5. Pubblica evento
await self.event_store.append(policy_id, PolicyEndorsed(
policy_id=policy_id,
endorsement_id=endorsement_id,
endorsement_type="ADD_DRIVER",
premium_adjustment={"amount": str(adjustment), "currency": "EUR"}
))
# 6. Genera documento endorsement (async)
await self.doc_service.generate_endorsement_certificate(
policy_id, endorsement_id
)
return {
"endorsement_id": str(endorsement_id),
"premium_adjustment": float(adjustment),
"currency": "EUR",
"effective_date": effective_date.isoformat()
}
async def _reprice_with_driver(self, policy, driver_data: dict) -> Decimal:
"""Riprice la polizza includendo il nuovo conducente."""
# Logic depends on product rating algorithm
base_premium = policy.annual_premium.amount
driver_age = driver_data.get("age", 30)
# Young driver surcharge (semplificato)
if driver_age < 25:
surcharge = base_premium * Decimal("0.25")
elif driver_age < 30:
surcharge = base_premium * Decimal("0.10")
else:
surcharge = Decimal("0")
return base_premium + surcharge
6. Architektura mikrousług: dekompozycja domeny polityk
System zarządzania polityką przedsiębiorstwa jest zwykle podzielony na następujące mikrousługi: każdy z własnym, ograniczonym kontekstem i dobrze określonymi obowiązkami:
| Praca | Odpowiedzialność | Bazy danych | Komunikacja |
|---|---|---|---|
| usługa wyceny | Szacowanie, ocena | Redis (pamięć podręczna) + PostgreSQL | ODPOCZYNEK + Wydarzenia |
| usługa polityczna | Polityka cyklu życia, poparcie | PostgreSQL (magazyn zdarzeń) | ODPOCZYNEK + Wydarzenia |
| usługa rozliczeniowa | Płatności, raty, zwroty pieniędzy | PostgreSQL | Wydarzenia |
| obsługa dokumentów | Generowanie plików PDF, archiwizacja | S3 / Azure Blob | Zdarzenia (asynchroniczne) |
| usługa powiadamiania | E-mail, SMS, push | Redis (kolejka) | Zdarzenia (asynchroniczne) |
| usługa underwritingu | Ocena ryzyka, akceptacja/odrzucenie | Sklep PostgreSQL + ML | ODPOCZYNEK (synchronizacja) |
7. Integracja z Portalem Brokerskim
Brokerzy ubezpieczeniowi uzyskują dostęp do systemu poprzez dedykowany portal. Integracja API musi zarządzać uwierzytelnianiem, dostępem do danych klientów brokera, prowizjami i raportowanie zbiorcze. Poniżej znajduje się przykład oprogramowania pośredniego OAuth2 dla brokerów:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
import httpx
BROKER_PORTAL_SCOPES = [
"policy:read",
"policy:quote",
"policy:bind",
"policy:endorse",
"commission:read"
]
class BrokerAuth:
"""Middleware per autenticazione e autorizzazione broker."""
def __init__(self, jwks_url: str, audience: str):
self.jwks_url = jwks_url
self.audience = audience
self._jwks_cache = None
async def get_jwks(self) -> dict:
if not self._jwks_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(self.jwks_url)
self._jwks_cache = resp.json()
return self._jwks_cache
async def verify_broker_token(
self,
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer())
) -> dict:
"""Verifica JWT token del broker e restituisce claims."""
try:
jwks = await self.get_jwks()
payload = jwt.decode(
credentials.credentials,
jwks,
algorithms=["RS256"],
audience=self.audience
)
broker_code = payload.get("broker_code")
if not broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing broker_code claim"
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}"
)
# Dependency injection
broker_auth = BrokerAuth(
jwks_url="https://auth.insurer.com/.well-known/jwks.json",
audience="broker-portal-api"
)
@app.get("/v1/broker/{broker_code}/portfolio")
async def get_broker_portfolio(
broker_code: str,
broker_claims: dict = Depends(broker_auth.verify_broker_token),
policy_service: PolicyService = Depends(get_policy_service)
):
"""Restituisce il portfolio polizze di un broker."""
# Verifica che il broker acceda solo ai propri dati
if broker_claims["broker_code"] != broker_code:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied: broker_code mismatch"
)
policies = await policy_service.get_policies_by_broker(broker_code)
return {
"broker_code": broker_code,
"total_policies": len(policies),
"in_force_count": sum(1 for p in policies if p["status"] == "IN_FORCE"),
"total_gwp": sum(p["annual_premium"] for p in policies),
"policies": policies[:50] # Paginare in produzione
}
8. Automatyczne odnawianie: przetwarzanie wsadowe i zasięg
Odnowienia są operacjami krytycznymi: utrata klienta w wyniku odnowienia jest kosztowna (koszt pozyskiwanie klientów 5-7 razy w porównaniu z utrzymaniem). Nowoczesny system automatyzuje proces odnowienia ze spersonalizowanym zasięgiem i dynamiczną ceną.
import asyncio
from datetime import date, timedelta
from dataclasses import dataclass
@dataclass
class RenewalCandidate:
policy_id: str
holder_email: str
expiry_date: date
current_premium: float
renewal_premium: float
days_to_expiry: int
class RenewalOrchestrator:
"""Gestisce il processo automatico di rinnovo polizze."""
RENEWAL_WINDOWS = [60, 45, 30, 15, 7] # Giorni prima della scadenza
def __init__(self, policy_repo, pricer, notification_svc, event_bus):
self.repo = policy_repo
self.pricer = pricer
self.notification = notification_svc
self.event_bus = event_bus
async def run_daily_renewal_batch(self) -> dict:
"""Job giornaliero: identifica polizze da rinnovare e avvia outreach."""
today = date.today()
stats = {"processed": 0, "quoted": 0, "errors": 0}
for days_ahead in self.RENEWAL_WINDOWS:
target_date = today + timedelta(days=days_ahead)
candidates = await self.repo.get_expiring_policies(target_date)
tasks = [
self._process_renewal_candidate(c, days_ahead)
for c in candidates
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
stats["processed"] += 1
if isinstance(result, Exception):
stats["errors"] += 1
else:
stats["quoted"] += 1
return stats
async def _process_renewal_candidate(
self,
policy_id: str,
days_to_expiry: int
) -> dict:
policy = await self.repo.get_policy(policy_id)
# Calcola nuovo premio di rinnovo
renewal_premium = await self.pricer.calculate_renewal(policy)
premium_change_pct = (
(renewal_premium - float(policy["annual_premium"])) /
float(policy["annual_premium"]) * 100
)
# Salva preventivo rinnovo
renewal_quote = await self.repo.save_renewal_quote(
policy_id=policy_id,
renewal_premium=renewal_premium,
valid_days=days_to_expiry + 30
)
# Scegli template notifica in base ai giorni rimanenti
if days_to_expiry == 60:
template = "renewal_early_notice"
elif days_to_expiry == 30:
template = "renewal_30_day"
elif days_to_expiry == 7:
template = "renewal_final_reminder"
else:
template = "renewal_standard"
# Invia notifica
await self.notification.send_renewal_outreach(
holder_email=policy["holder"]["email"],
template=template,
context={
"policy_number": policy["policy_number"],
"expiry_date": policy["expiry_date"],
"renewal_premium": renewal_premium,
"premium_change_pct": premium_change_pct,
"renewal_link": f"https://portal.insurer.com/renew/{renewal_quote['id']}"
}
)
# Pubblica evento
await self.event_bus.publish("policy.renewal_quoted", {
"policy_id": policy_id,
"renewal_quote_id": renewal_quote["id"],
"days_to_expiry": days_to_expiry
})
return renewal_quote
9. Testowanie: podejścia do złożonych systemów ubezpieczeniowych
Systemy ubezpieczeniowe mają złożoną logikę biznesową i dużą zależność od reguł zmieniają się z biegiem czasu. Testowanie musi obejmować testy jednostkowe dla reguł biznesowych, integrację testy przepływów typu end-to-end oraz testy kontraktów dla interfejsów API udostępnianych brokerom.
import pytest
from decimal import Decimal
from datetime import date, timedelta
from unittest.mock import AsyncMock, MagicMock
class TestEndorsementCalculator:
"""Unit test per il calcolo pro-rata degli endorsement."""
def setup_method(self):
self.calc = EndorsementCalculator()
def test_prorata_increase_with_full_year_remaining(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("750"),
endorsement_date=date(2025, 1, 1),
policy_expiry=date(2026, 1, 1)
)
# 150 difference * (365/365) = 150
assert abs(adjustment - Decimal("150")) < Decimal("1")
def test_prorata_decrease_at_midterm(self):
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("500"),
endorsement_date=date(2025, 7, 1), # 6 mesi dopo inizio
policy_expiry=date(2026, 1, 1)
)
# Aggiustamento negativo (rimborso)
assert adjustment < Decimal("0")
def test_endorsement_on_last_day_is_zero(self):
today = date.today()
adjustment = self.calc.calculate_prorata_adjustment(
current_annual_premium=Decimal("600"),
new_annual_premium=Decimal("800"),
endorsement_date=today,
policy_expiry=today # Scadenza oggi
)
assert adjustment == Decimal("0")
@pytest.mark.asyncio
class TestPolicyService:
"""Integration test del service layer."""
async def test_create_quote_success(self):
# Arrange
mock_repo = AsyncMock()
mock_repo.get_product.return_value = MagicMock(terms_version="v2.1")
mock_pricer = AsyncMock()
mock_pricer.calculate.return_value = [
CoverageQuote(
coverage_type="LIABILITY",
limit=Decimal("1000000"),
deductible=Decimal("0"),
annual_premium=Decimal("400")
)
]
mock_underwriter = AsyncMock()
mock_underwriter.score.return_value = MagicMock(score=0.3)
mock_repo.save_quote.return_value = MagicMock(
id="q-123",
valid_until=date.today() + timedelta(days=72)
)
service = PolicyService(
repo=mock_repo,
pricer=mock_pricer,
underwriter=mock_underwriter,
event_bus=AsyncMock()
)
request = QuoteRequest(
product_code="AUTO_RC",
holder_fiscal_code="CLDFRC80A01H501U",
effective_date=date.today() + timedelta(days=1),
desired_coverages=["LIABILITY"]
)
# Act
result = await service.create_quote(request)
# Assert
assert result.total_annual_premium == Decimal("400")
assert result.terms_version == "v2.1"
mock_underwriter.score.assert_awaited_once()
10. Najlepsze praktyki i antywzorce
Anty-wzorzec: polityka obiektu Boga
Nie twórz pojedynczej tabeli „zasad” zawierającej 200 kolumn. Użyj bogatego modelu domeny z jasne obiekty wartościowe (pieniądze, zasięg, adres) i zagregowane granice. Kolumna „coverage_data JSONB” nie jest modelem domeny.
Anty-wzorzec: zmienny stan bez ścieżki audytu
Nie wykonuj bezpośrednich AKTUALIZACJI zasad bez śledzenia historii. Sektor ubezpieczeń wymaga pełnych audytów dla IVASS. Użyj źródła zdarzeń lub przynajmniej tabeli historii.
Najlepsza praktyka: Idempotencja dla operacji krytycznych
Operacje wiązania i usuwania muszą być idempotentne. Użyj
idempotency_key nagłówek w żądaniach krytycznych. Utrzymaj wynik
i zwróć go, jeśli ten sam klucz zostanie wysłany ponownie.
Najlepsza praktyka: testowanie kontraktu za pomocą Pact
Do integracji z brokerami i agregatorami wykorzystaj testy kontraktowe (Pact). Każdy konsument definiuje oczekiwany kontrakt; dostawca sprawdza, czy go przestrzega. To zapobiega wprowadzaniu zmian w krytycznych interfejsach API.
Wnioski
Natywny dla chmury system zarządzania zasadami oparty na interfejsie API wymaga inwestycji w modelowanie domeny jeszcze przed infrastrukturą. Bogaty model domeny (agregat, obiekty wartości, zdarzenia), projekt API zgodny z ACORD i pozyskiwanie zdarzeń na potrzeby ścieżki audytu są kluczowe na czym budować.
74% ubezpieczycieli modernizuje podstawowe systemy: konkurencja z InsurTechami cyfrowi tubylcy sprawiają, że ta transformacja nie jest już opcjonalna. Wzory opisane w tym artykuł - pozyskiwanie wydarzeń, mikrousługi, API-first, zarządzanie poparciem - to podstawa technika budowania skalowalnych i łatwych w utrzymaniu platform ubezpieczeniowych.
Nadchodzące artykuły z serii InsurTech
- Rurociąg telematyczny: przetwarzanie danych UBI na dużą skalę
- Underwriting AI: inżynieria funkcji i punktacja ryzyka
- Automatyzacja roszczeń: wizja komputerowa i NLP
- Wykrywanie oszustw: analiza wykresów i sygnał behawioralny







