Automatizare FSMA 204: urmărire, alertă și rechemare prin Python
20 ianuarie 2026 ar fi trebuit să marcheze o cotitură în istoria siguranței alimentare americane. Acolo Regula FSMA 204 — Regulă finală privind trasabilitatea alimentelor al FDA a cerut tuturor operatorilor din lanțul de aprovizionare cu alimente să implementeze sisteme granulare de trasabilitate pentru zeci de categorii de alimente cu risc ridicat. O actualizare de reglementare fără precedent din Legea privind modernizarea siguranței alimentare din 2011, cu repercusiuni globale: oricine exporta pe piața americană – inclusiv companii italiene de vin, brânză și ulei de măsline – era în miză.
În martie 2025, FDA a prelungit termenul limită cu 30 de luni (până la 20 iulie 2028), dar direcția este lipsită de ambiguitate: trasabilitatea granulară pentru alimentele cu risc ridicat nu mai este opțională. Companiile care încep implementarea astăzi vor avea un avantaj competitiv uriaș atunci când conformitatea devine obligatorie. În acest articol, construim un sistem complet în Python — de la proiectarea bazei de date la API-ul REST, de la motorul de alertă la gestionarea retragerii — care îndeplinește cerințele FSMA 204.
Ce vei învăța
- Structura completă a FSMA 204: Food Traceability List, CTE și KDE
- Design de baze de date PostgreSQL pentru trasabilitate pe mai multe niveluri
- FastAPI REST API cu modele Pydantic pentru înregistrarea CTE și interogări one-up/one-down
- Aprovizionare de evenimente imuabile cu Apache Kafka pentru o pistă de audit completă
- Motor de alertă pentru anomalii (lanț de rece, loturi suspecte) cu notificări multicanal
- Flux de lucru de gestionare a retragerii: urmărire/traceforward în mai puțin de 24 de ore (cerință FDA)
- Simulare de exercițiu de rechemare cu valori de performanță
- Comparație FSMA 204 vs Reg. UE 178/2002 și impactul asupra exportatorilor italieni
Seria FoodTech: Unde suntem
Acesta este al șaselea articol din seria FoodTech de pe federicocalo.dev. Iată harta completă:
| # | Titlu | Nivel | Stat |
|---|---|---|---|
| 01 | Conducta IoT pentru agricultura de precizie | Intermediar | Publicat |
| 02 | Viziune computerizată pentru controlul calității alimentelor | Intermediar | Publicat |
| 03 | ML și Edge Computing pentru predicția culturilor | Avansat | Publicat |
| 04 | Blockchain și lanțul de aprovizionare cu alimente transparent | Avansat | Publicat |
| 05 | Prognoza cererii pentru comerțul cu amănuntul la scară largă cu Prophet și LSTM | Avansat | Publicat |
| 06 | Automatizare FSMA 204: urmărire, alertă și rechemare prin Python | Avansat | EŞTI AICI |
| 07 | Agricultură verticală: tablou de bord și control automat | Intermediar | Curând |
| 08 | Satelit API pentru monitorizarea culturilor | Avansat | Curând |
| 09 | Tabloul de bord de la fermă la furcă cu Streamlit | Intermediar | Curând |
| 10 | Reziliența lanțului de aprovizionare: optimizare cu instrumente OR | Avansat | Curând |
Regula 204 FSMA: Cea mai semnificativă schimbare de reglementare din 2011
Actul de modernizare a siguranței alimentare (FSMA) din 2011 a transformat abordarea FDA cu privire la siguranța alimentelor de la reactiv (răspunzând la contaminare) la preventiv (prevenirea acesteia). Secțiunea 204 a delegat FDA puterea de a identifica alimentele cu risc ridicat și de a impune cerințe suplimentare de trasabilitate. Rezultatul este Regulă finală privind trasabilitatea alimentelor, publicat în noiembrie 2022.
Scopul este radical: în cazul unui focar sau contaminare, operatorii trebuie să fie capabili să identifice întregul lanț de distribuție al unui lot — de la câmp până la furculiță — și să livreze înregistrările către FDA într-un format electronic care poate fi comandat de către 24 de ore de la solicitare. Înainte de FSMA 204, acest proces dura în medie 7-10 zile. Impactul asupra reducerii îmbolnăvirilor și deceselor cauzate de alimente contaminate este estimat de FDA la sute de cazuri evitate pe an.
Lista de trasabilitate a alimentelor (FTL): Alimentele implicate
FTL include alimente asociate istoric cu focare severe. Principalele categorii sunt:
| Categorie | Exemple specifice | Riscul primar |
|---|---|---|
| Brânzeturi proaspete | Moale/semi-moale nematurate, mozzarella proaspata, ricotta | Listeria monocytogenes |
| Ouă în coajă | Ouă de pui nepasteurizate | Salmonella Enteritidis |
| Fructe de mare - peste | Ton, somon, cod, peste spada (proaspat/congelat) | Scombroid, Listeria |
| Crustacee | Creveți, crab, homar | Vibrio, Norovirus |
| Moluște bivalve | Stridii, scoici, midii | Norovirus, Vibrio |
| Salate RTE | Salată de cartofi, ouă, fructe de mare | Listeria, Salmonella |
| Fructe și legume proaspete | Castraveți, ierburi, verdeață cu frunze, pepeni, ardei, varza, roșii, fructe tropicale | E. coli O157:H7, Salmonella |
| Unt de nuci | Unt de arahide, migdale, caju | Salmonella |
Notă importantă pentru exportatorii italieni
I brânzeturi proaspete (mozzarella, burrata, ricotta, stracchino) sunt în mod explicit în FTL. Companiile italiene care exportă aceste produse în SUA trebuie să implementeze trasabilitatea FSMA 204 pentru toate loturile destinate pieței americane. Brânzeturile tari mature (Parmigiano Reggiano, Grana Padano, Pecorino Romano) nu sunt în FTL actual.
Evenimente critice de urmărire (CTE)
CTE-urile sunt momentele din lanțul de aprovizionare în care trebuie înregistrată trasabilitatea. FSMA 204 definește șase principale, fiecare cu KDE-uri specifice:
| CTE | Descriere | Aplicabil la |
|---|---|---|
| Creştere | Colectarea produselor proaspete direct din teren | Fructe și legume, ierburi |
| Recoltarea | Recoltarea alimentelor necultivate (crustacee, pește sălbatic) | Fructe de mare sălbatice |
| Răcire | Prima operațiune de răcire după recoltare | Fructe și legume, fructe de mare |
| Ambalare inițială | Prima dată când produsul este ambalat în forma finală | Toate articolele FTL |
| Transport | Orice transfer de proprietate sau custodie | Toate articolele FTL |
| Primirea | Primirea articolelor FTL de la alt operator | Toate articolele FTL |
| Transformare | Când un articol FTL este încorporat într-un produs nou | Transformatori/producători |
Elemente de date cheie (KDE) pentru CTE
Pentru fiecare CTE, regula definește KDE-uri obligatorii. Exemplu pentru Livrare CTE:
- Codul lotului de trasabilitate (TLC) — identificator unic de lot
- Cantitatea și unitatea de măsură livrate
- Data expedierii
- Descrierea locației punctului de expediere (referința sursei TLC)
- Numărul de referință al documentului de transport
- Numele și adresa destinatarului
- Descrierea alimentului (inclusiv numărul de înregistrare a FDA Food Facility)
Pentru Primirea CTE:
- TLC primit (de la expedierea anterioară)
- Cantitatea primita
- Data primirii
- Amplasarea punctului de recepție
- Referire la documentul de expediere
- Sursa TLC (origine lot)
Arhitectura sistemului de conformitate FSMA 204
Un sistem de conformitate FSMA 204 în producție trebuie să gestioneze trei fluxuri principale: cel captarea datelor în timp real în timpul operațiunilor, cel prelucrare si depozitare a evenimentelor într-un mod imuabil, iar cel răspuns rapid în cazul solicitării FDA sau a retragerii. Arhitectura pe care o propunem este bazată pe evenimente și nativă în cloud:
Stiva de tehnologie
- Stratul API: FastAPI (Python 3.11+) — punct final REST pentru înregistrarea CTE
- Baza de date primară: PostgreSQL 16 — schemă relațională pentru trasabilitate
- Streaming de evenimente: Apache Kafka — evenimente CTE ca mesaje imuabile
- Traversare grafică: NetworkX sau Neo4j — urmărirea/urmărirea înainte a lanțului de aprovizionare
- Motor de alertă: Python + Țelină + Redis — reguli asincrone multicanal
- Gestionarea retragerii: FastAPI + proceduri stocate PostgreSQL
- Raportare: generarea automată a notificării de rechemare FDA în JSON/XML
Fluxul de la capăt la capăt:
- Captură de date: operatorii înregistrează CTE prin intermediul API-ului mobil/web sau al integrării ERP
- Procesarea evenimentului: fiecare CTE este validat, îmbogățit și publicat pe Kafka
- Graficul de trasabilitate: graficul genealogic al loturilor este actualizat în timp real
- Motor de alertă: regulile de eveniment detectează anomalii și trimit notificări
- Managementul retragerii: în cazul unei alerte critice, pornește automat fluxul de lucru de rechemare
- Raportare FDA: generarea listei sortabile în format electronic în 24 de ore
Proiectarea bazei de date PostgreSQL pentru trasabilitate FSMA 204
Schema trebuie să îndeplinească două cerințe opuse: performanță pentru operațiunile zilnice (inserție CTE, interogare de afaceri) e viteză extremă pentru interogările de urmărire în caz de rechemare. Folosim PostgreSQL cu o combinație de tabele relaționale și indici JSONB pentru variabilele KDE pentru CTE.
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- Estensione per UUID generation
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- Per path-based queries su genealogia
-- ============================================================
-- TABELLA: locations
-- Tutti i luoghi della filiera (aziende, magazzini, porti)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon per geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: food_items
-- Catalogo alimenti FTL con classificazione FSMA
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: traceability_lots
-- Core: ogni lotto tracciabile con TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantità e dimensione lotto
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Date critiche
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogia: da quali lotti parent e stato derivato
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- Es: "LOT_A.LOT_B.LOT_C"
-- Stato del lotto
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- Attributi FDA specifici
tlc_source_reference TEXT, -- Descrizione provenienza TLC
-- Metadati extra (variabili per tipo alimento)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indice per query di genealogia rapida
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABELLA: cte_events
-- Registro immutabile di tutti i Critical Tracking Events
-- Questa tabella NON deve mai avere UPDATE o DELETE
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lotti coinvolti
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- per transformation
-- Localizzazione evento
location_id UUID NOT NULL REFERENCES locations(id),
-- Timestamp evento (quando e accaduto, non quando e stato registrato)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system che ha registrato
-- KDE come JSONB (struttura varia per tipo CTE)
kde JSONB NOT NULL,
-- Per spedizioni: destinatario
destination_location_id UUID REFERENCES locations(id),
-- Documento di riferimento (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantità movimentata
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash per immutabilita (SHA-256 del payload)
event_hash CHAR(64) NOT NULL,
-- Firma digitale opzionale
digital_signature TEXT,
-- Metadati
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: questo record e immutabile
);
-- Gli eventi NON si modificano: trigger di protezione
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABELLA: recall_events
-- Gestione recall con workflow completo
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- Es: "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lotti che hanno scatenato il recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Scope calcolato automaticamente
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow stato
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Timestamp workflow
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (entro 24h da richiesta)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsabile
initiated_by VARCHAR(255) NOT NULL,
-- Metriche drill
traceback_seconds INTEGER, -- Tempo per completare traceback
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alert_rules
-- Configurazione regole per alert engine
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL, -- Regola in formato JSON
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alerts
-- Alert generati dall'engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
API REST cu FastAPI: modele Pydantic și puncte finale CTE
API-ul este punctul de intrare pentru toți operatorii din lanțul de aprovizionare. Folosim FastAPI pentru viteza sa, validarea automată cu Pydantic și generarea automată a documentației OpenAPI - utilă pentru auditurile FDA. Șabloanele Pydantic reflectă KDE-urile cerute de regulă.
# fsma204/models.py
# Pydantic v2 models per FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements per Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="Descrizione provenienza TLC")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="FDA registration del destinatario")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements per Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC come apparso nel documento di spedizione")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements per Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Identificativo campo/parcella
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements per Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements per Transformation CTE"""
new_tlc: str # TLC del nuovo prodotto
input_tlcs: List[str] # TLC degli ingredienti usati
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
@field_validator('tlc')
@classmethod
def validate_tlc(cls, v: str) -> str:
"""TLC non deve contenere caratteri ambigui"""
forbidden = set('IO0l') # caratteri ambigui
if any(c in forbidden for c in v.upper()):
raise ValueError("TLC contiene caratteri ambigui (I, O, 0, l)")
return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDE specifici per il tipo di CTE")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # lotti upstream
descendants: List[Dict[str, Any]] # lotti downstream
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Formato sortable list richiesto da FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # TLC richiesti
records: List[Dict[str, Any]] # Record ordinabili
total_records: int
format_version: str = "FSMA204_v1"
# fsma204/api.py
# FastAPI application per FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System per FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash del payload CTE per immutabilita"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Crea un nuovo lotto tracciabile.
Il TLC deve essere unico nel sistema.
"""
# Verifica unicita TLC
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' già esistente nel sistema"
)
# Costruisce genealogy_path
genealogy_path = request.tlc
if request.parent_lot_ids:
# Recupera path del parent principale
parent_result = await db.execute(
text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.parent_lot_ids[0])}
)
parent = parent_result.fetchone()
if parent and parent.genealogy_path:
genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lotto creato", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Registra un Critical Tracking Event.
Il record e IMMUTABILE una volta creato (no update/delete).
"""
# Verifica che il lotto esista e sia attivo
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lotto non trovato")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Impossibile registrare CTE su lotto con stato '{lot.status}'"
)
# Calcola hash per immutabilita
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
# Inserisce CTE (immutabile)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
row = result.fetchone()
await db.commit()
# Aggiorna remaining_quantity per spedizioni
if request.event_type == "shipping" and request.quantity:
await db.execute(
text("""
UPDATE traceability_lots
SET remaining_quantity = remaining_quantity - :qty
WHERE id = :id
"""),
{"qty": request.quantity, "id": str(request.lot_id)}
)
await db.commit()
# Background: pubblica su Kafka e verifica alert rules
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: risale la filiera upstream.
Requisito FDA: completare entro 24h da richiesta.
"""
start_ms = time.monotonic()
# Recupera lotto root
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lotto non trovato")
# Usa ltree per trovare tutti gli antenati
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
# Trova tutti i discendenti (traceforward)
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
# Recupera tutti gli eventi CTE per il lotto
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Genera il sortable list in formato elettronico richiesto da FDA.
Deve essere producibile entro 24h da richiesta FDA (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
# Recupera tutti i record CTE per i TLC richiesti
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
return SortableListResponse(
generated_at=__import__('datetime').datetime.utcnow(),
requesting_lots=tlc_list,
records=records,
total_records=len(records)
)
Aprovizionare de evenimente imuabile cu Apache Kafka
Natura imuabilă a CTE-urilor se potrivește perfect cu modelul Event Sourcing. Fiecare eveniment este un fapt imuabil despre ceea ce s-a întâmplat în lanțul de aprovizionare. Kafka se asigură că aceste evenimente sunt:
- Durabil: retenție configurabilă (de exemplu, 7 ani pentru FSMA 204)
- Rejucat: pentru a reconstrui statul în orice punct istoric
- Sortat după partiție: garanție pentru comandă de lot (partiție prin TLC)
- Distribuit: scalabil la milioane de evenimente pe zi
# fsma204/kafka_producer.py
# Kafka producer per CTE events
import json
import logging
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime
logger = logging.getLogger(__name__)
# Schema Avro per CTE event (per Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
# Configurazione Kafka
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Garanzia durabilita
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Pubblica un CTE event su Kafka.
Usa il TLC come partition key per garantire ordine per lotto.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
# Partition key = TLC: tutti gli eventi dello stesso lotto
# vanno alla stessa partizione, garantendo ordinamento
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
producer.poll(0) # Flush asincrono
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer per Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import asyncio
async def start_cte_consumer():
"""
Consumer Kafka per il real-time alert processing.
Eseguito come background service separato dall'API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Commit manuale per at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer avviato, in ascolto su fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Non committiamo: l'evento verrà riprocessato
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Pipeline di processing per ogni CTE event"""
event_type = event['event_type']
kde = json.loads(event.get('kde', '{}'))
# 1. Controlla cold chain per eventi di spedizione/ricezione
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verifica completezza KDE per il tipo di evento
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Controlla se il fornitore e in blacklist
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Aggiorna grafo di tracciabilita in memoria (NetworkX)
await update_traceability_graph(event)
Motor de alertă: detectarea anomaliilor în timp real
Motorul de alertă monitorizează fluxul continuu al evenimentelor CTE și aplică reguli configurabile. Notificările sunt trimise pe mai multe canale în funcție de gravitate. O alertă critical poate declanșa automat un flux de lucru de rechemare.
# fsma204/alert_engine.py
# Alert engine per FSMA 204 compliance monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class per le regole di alert"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Ritorna un dict con i dettagli dell'alert, o None se ok"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Rileva interruzioni nella catena del freddo.
Regola: se la temperatura registrata nei KDE supera la soglia
prevista per la categoria di alimento.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
# Recupera categoria alimento del lotto
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sotto minimo {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sopra massimo {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} per categoria {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifica che tutti i KDE obbligatori siano presenti per il tipo CTE.
Mancanza di KDE = non-compliance FSMA 204.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"KDE Mancanti - CTE {event_type}",
"description": (
f"CTE {event_type} per TLC {event.get('tlc')} "
f"manca dei KDE obbligatori: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
async def _send_email(self, alert: dict, config: dict):
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
# In produzione usare asyncio-friendly SMTP
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Aggiungere: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Chiamato come background task dopo ogni registrazione CTE"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
# Salva alert su DB
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall se critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generato: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
Gestionarea retragerii: fluxul de lucru complet în mai puțin de 24 de ore
Cea mai strictă cerință a FSMA 204 este capacitatea de a finaliza traceback și traceforward și de a livra FDA lista care poate fi sortată prin 24 de ore de la solicitare. Un flux de lucru de rechemare automată este singura modalitate de a garanta acest SLA la scară. În 2024, FDA a înregistrat 241 de retrageri de alimente, cu un timp mediu de finalizare de 73 de ore - cu mult peste cerințele noii reguli.
# fsma204/recall_manager.py
# Recall Management con workflow automatizzato
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Gestisce il workflow completo di un recall FSMA 204.
Fasi:
1. IDENTIFIED - Identificazione del problema
2. SCOPE_DETERMINED - Calcolo scope (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifiche a tutti gli operatori coinvolti
4. REMOVAL_IN_PROGRESS - Rimozione dal mercato
5. EFFECTIVENESS_CHECK - Verifica efficacia
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Avvia un recall. Questo metodo deve completare
traceback + traceforward + scope in < 30 minuti
per rispettare il requisito 24h FDA complessivo.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} avviato da {initiated_by} "
f"per {len(trigger_lot_ids)} lotti"
)
# Fase 1: Crea record recall
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Fase 2: Determina scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Fase 3: Aggiorna recall con scope e stati
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Fase 4: Genera FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Fase 5: Marca lotti come recalled
await self._mark_lots_recalled(affected_lots)
# Fase 6: Invia notifiche a tutti gli operatori
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
# Aggiorna timestamp workflow
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0 # aggiornare con verifica reale
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determinato in {scope_seconds}s, "
f"notifiche in {notify_seconds}s, totale {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determina scope usando graph traversal.
Usa NetworkX per BFS su grafo di genealogia.
"""
# Costruisce grafo da DB
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
# Traceback: tutti gli antenati (upstream)
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
# Traceforward: tutti i discendenti (downstream)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Costruisce un grafo orientato della genealogia lotti
usando gli eventi CTE di tipo shipping/receiving/transformation.
"""
G = nx.DiGraph()
# Nodi: tutti i lotti attivi
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
# Archi: da shipping events
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
# Archi: da transformation events
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Genera il recall notice in formato FDA.
Struttura basata su FDA 21 CFR 7.46.
"""
# Recupera dettagli lotti coinvolti
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
WHERE tl.id = ANY(:affected)
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notifica tutti gli operatori che hanno movimentato i lotti coinvolti"""
# Recupera contatti di tutti i location coinvolti
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"I lotti che avete ricevuto/spedito sono coinvolti nel recall {recall_number}. "
f"Motivo: {description}. "
"Sospendere immediatamente la distribuzione e contattare il responsabile."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
Simulare de exercițiu de rechemare: testare periodică a performanței
FDA recomandă insistent i simulare de exercițiu de rechemare — simulări periodice ale procesului de rechemare pentru a verifica dacă sistemul respectă cerința de 24 de ore. Un exercițiu trebuie efectuat cel puțin o dată la 6 luni, iar rezultatele să fie documentate pentru audituri. Valorile cheie sunt: timpul total până la finalizare, procentul de loturi urmărite, timpul de determinare a domeniului de aplicare.
# fsma204/mock_recall_drill.py
# Mock recall drill con metriche performance
import asyncio
import random
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simula un recall scenario per testare le performance del sistema.
Il drill usa lotti MOCK (non production) con dati sintetici.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Esegue il drill completo e ritorna le metriche.
"""
logger.info(f"Avvio mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Fase 1: Seed dati mock
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
# Fase 2: Seleziona lotto trigger casuale
trigger_lot = random.choice(mock_lots[10:40]) # Dal mezzo della supply chain
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Fase 3: Esegue recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Fase 4: Verifica compliance SLA
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Fase 5: Cleanup dati mock
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
async def _seed_mock_data(self) -> list:
"""Crea lotti sintetici con CTE events per il drill"""
from sqlalchemy import text
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {
"id": None,
"tlc": tlc,
"is_mock": True
}
# In produzione: inserire tramite lot creation API
# Per semplicità usiamo insert diretto nel drill
mock_lots.append(lot_data)
logger.info(f"Creati {len(mock_lots)} lotti mock per drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Rimuove dati mock dopo il drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
# I mock TLC iniziano con "MOCK-" per identificazione sicura
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Cleanup drill {self.drill_id} completato")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"FASI:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS ✓" if sla.get('compliant') else "FAIL ✗"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
Comparație de reglementare: FSMA 204 vs Reg. UE 178/2002 vs Legea privind siguranța alimentară din Regatul Unit
Pentru operatorii care exportă pe mai multe piețe, este esențial să înțeleagă diferențele dintre cadrele de reglementare. Deși împărtășesc principiul „un pas în sus, un pas în jos”, ele diferă semnificativ în ceea ce privește granularitatea și sancțiunile.
| Dimensiune | FSMA 204 (SUA) | Reg. UE 178/2002 | Legea privind siguranța alimentară din Regatul Unit 1990 + Reg. |
|---|---|---|---|
| Mături | Numai alimente FTL (listă specifică) | Toate alimentele și furajele | Toate alimentele din Marea Britanie |
| Granularitatea | CTE + KDE specific pentru eveniment | Generic one-step-up / one-step-down | Cu un pas în sus / cu un pas în jos |
| Timpul de răspuns al FDA/autorității | 24 de ore (listă care poate fi sortată electronic) | Fără termeni specifici | Fără termeni specifici |
| Format de înregistrare | Electronic, comandabil, interoperabil | Orice format documentat | Orice format documentat |
| Retenţie | 24 de luni | Variabila dupa categorie | Variabila dupa categorie |
| Sancțiuni | Până la 10.000 USD/zi per încălcare | Acestea variază în funcție de stat membru | Până la 20.000 GBP + pedeapsa cu închisoarea |
| Aplicație către importatori | Da: se aplică oricărei persoane care importă în SUA | Da: pentru operatorii de pe piața UE | Da: pentru operatorii de pe piața din Marea Britanie |
| Standarde tehnologice | Fără mandat (dar recomandat GS1) | Niciun mandat | Niciun mandat |
Implicații practice pentru exportatori
O companie italiană care exportă mozzarella proaspătă în SUA trebuie să respecte este Reg. UE 178/2002 decât FSMA 204. Întrucât FSMA 204 este mai strictă (specific CTE/KDE, răspuns 24h, format electronic), un sistem compatibil cu FSMA 204 este automat și în conformitate cu Regulamentul UE. Strategia optimă este implementarea FSMA 204 ca bază și adaptarea acestuia la cerințele UE/UK.
Impact asupra exportatorilor italieni în SUA
Italia este al treilea cel mai mare exportator de produse alimentare și băuturi către SUA (după Canada și Mexic), cu o valoare de aproximativ 8 miliarde de euro în 2024. Principalele categorii afectate de FSMA 204:
| Categorie | Export IT-SUA 2024 (est.) | Acoperire FSMA 204 | Acțiune necesară |
|---|---|---|---|
| Vin | ~2,1 miliarde USD | Nu (nu în FTL) | Numai înregistrarea FDA |
| Brânzeturi mature (parmezan, grana, pecorino) | ~350 milioane USD | Nu (excluse persoanele experimentate) | Doar înregistrare FDA + notificare prealabilă |
| Brânzeturi proaspete (mozzarella, burrata, ricotta) | ~120 milioane USD | DA — prioritate critică | Implementare completă CTE/KDE |
| Ulei de măsline | ~706 milioane USD | No | Numai înregistrarea FDA |
| Conserve de legume (rosii, piure) | ~200 de milioane de dolari | Parțial (roșii proaspete în FTL) | Verificați dacă ingredientele rămân proaspete |
Estimarea costurilor de conformitate pentru un IMM italian alimentar
| Componentă | Configurare (o singură dată) | Costul anual de exploatare |
|---|---|---|
| Trasabilitate software (SaaS sau personalizat) | 15.000 € – 80.000 € | 8.000 € – 20.000 €/an |
| Integrare ERP/MES existentă | 10.000 € – 40.000 € | 2.000 € – 5.000 €/an |
| Hardware (scanere, cititoare RFID, senzori) | 5.000 € – 25.000 € | 1.000 € – 3.000 €/an (întreținere) |
| Antrenament personal | 3.000 € – 10.000 € | 1.500 € – 3.000 €/an |
| Consiliere juridică/conformitate FDA | 5.000 € – 20.000 € | 3.000 € – 8.000 €/an |
| Total estimat | 38.000 € – 175.000 € | 15.500 € – 39.000 €/an |
Finanțare disponibilă: PNRR Tranziție 5.0
IMM-urile alimentare italiene pot accesa fondurile PNRR Tranziție 5.0 (12,7 miliarde EUR alocate, credit fiscal de până la 45%) pentru costurile implementării sistemelor de trasabilitate digitală. Sistemele FSMA 204 se califică drept „digitizare a proceselor de producție” dacă integrează senzori IoT pentru monitorizarea lanțului de frig. Verificați aplicabilitatea cu contabilul dvs.
Foaia de parcurs de conformitate Recomandată pentru IMM-urile italiene
- Luna 1-2: Analiza decalajului — cartografiază fluxurile existente, identifică produsele FTL, evaluează sistemele IT actuale
- Luna 3-4: Proiectare baze de date + dezvoltare API (utilizați codul din acest articol ca bază)
- Luna 5-6: Integrare cu ERP existent (SAP, Dynamics, Sage), instruire operator
- Luna 7-8: Pilotează un produs/linie, verifică caracterul complet KDE, primul simulare de exercițiu
- Luna 9-10: Lansare completă, motor de alertă activ, documentație FDA
- Luna 11-12: A doua simulare de exercițiu, reglare fină, pregătire pentru auditul FDA
Cele mai bune practici și anti-modele
Cele mai bune practici
- TLC unic și fără ambiguitate: evitați caracterele similare vizual (I/l/1, O/0). Utilizați GS1-128 sau SSCC ca standard de interoperabilitate cu lanțul de aprovizionare din SUA
- Imuabilitate garantată la nivel DB: utilizați declanșatoare PostgreSQL care împiedică UPDATE/DELETE pe cte_events, nu doar convențiile de cod
- Hash eveniment pentru pista de audit: SHA-256 a sarcinii utile la momentul creării — vă permite să verificați dacă înregistrările nu au fost modificate
- Test ALS 24 de ore în mod regulat: desfășurați simulații la fiecare 6 luni cu date sintetice, documentați rezultatele
- Sincronizare cu GS1: utilizați GLN (Global Location Number) pentru locație_ids — compatibil cu EDI și sistemele partenere din SUA
- Versiune KDE: FSMA 204 poate evolua; păstrați un câmp kde_schema_version în înregistrările dvs. pentru a facilita migrarea
Anti-modele de evitat
- KDE numai pe hârtie: multe companii păstrează KDE-urile în foi PDF sau Excel. Nu este posibil să îndepliniți cerința „format electronic, sortabil” în 24 de ore cu aceste sisteme
- TLC non-standard: utilizarea numerelor de lot interne care nu sunt comunicate partenerilor din aval face imposibilă urmărirea între companii
- Alertă fără escaladare: un sistem de alertă care trimite e-mailuri dar nu are un protocol de escaladare pentru probleme critice (fără răspuns în 2 ore → apel automat) nu respectă spiritul FSMA
- Doar trasabilitate internă: FSMA 204 necesită un pas în sus ȘI un pas în jos pentru parteneri. Un sistem intern singur nu este suficient
- Eroare de sincronizare UTC: Marcajele de timp CTE trebuie să fie în UTC sau decalate explicit. Marcajele temporale ambigue fac imposibilă reconstituirea cronologiei în cazul unei retrageri internaționale
Concluzii și pașii următori
Regula 204 FSMA reprezintă o schimbare de paradigmă: de la trasabilitate ca document pe hârtie la urmăribile ca infrastructură software în timp real. Sistemul pe care l-am construit în acest articol — PostgreSQL pentru stocare, FastAPI pentru stratul API, Kafka pentru streaming de evenimente, NetworkX pentru traversarea graficelor — este capabil să satisfacă cea mai critică cerință: lista completă sortabilă către FDA în 24 de ore de la orice solicitare.
Vestea bună pentru companiile italiene: prelungirea termenului limită până în iulie 2028 oferă timp suficient pentru o implementare treptată. Vestea proastă: companiile care așteaptă până în 2027 pentru a începe vor găsi o piață de consultanți și furnizori de software la prețuri umflate și cu cozi lungi de așteptare.
Datele din 2024 vorbesc clar: 241 de retrageri de alimente FDA, 1.392 de bolnavi, spitalizări mai mult decât dublu față de 2023. Trasabilitatea granulară nu este birocrație – este prevenirea măsurabilă în viețile umane.
Seria FoodTech: Articolul următor
Următorul articol din serie este 07 — Agricultura verticală: tablou de bord și control automat, unde vom construi un sistem de control în timp real pentru fermele hidroponice: senzori IoT (lumină, CO2, pH, EC), tablouri de bord Streamlit și algoritmi de optimizare pentru a maximiza randamentul pe metru pătrat. Rămâneți pe federicocalo.dev.
Cross-Series: Perspective legate
- MLOps (Seria 12): cum să implementați modele de detectare a anomaliilor motorului de alertă în producție cu MLflow
- Inginerie AI (Seria 13): RAG privind documentația FDA - un LLM care răspunde la întrebările de conformitate citând textul corect de reglementare
- PostgreSQL AI (Seria 22): pgvector pentru căutarea de similaritate pe profilurile de contaminare — găsește loturi cu caracteristici similare celor implicate în focare istorice
- DevOps Frontend (Seria 9): Conducta CI/CD pentru implementarea continuă a API-ului FSMA 204 pe Kubernetes







