Automatyzacja FSMA 204: śledzenie, ostrzeganie i przywoływanie za pośrednictwem Pythona
20 stycznia 2026 r. powinien był stanowić przełom w historii amerykańskiego bezpieczeństwa żywności. Tam Zasada 204 FSMA – Zasada końcowa dotycząca identyfikowalności żywności FDA nałożyła na wszystkie podmioty w łańcuchu dostaw żywności obowiązek wdrożenia systemów szczegółowej identyfikowalności dla kilkudziesięciu kategorii żywności wysokiego ryzyka. Bezprecedensowa aktualizacja przepisów wynikająca z ustawy o modernizacji bezpieczeństwa żywności z 2011 r., mająca globalne konsekwencje: każdy, kto eksportuje na rynek amerykański – w tym włoskie firmy produkujące wino, sery i oliwę z oliwek – był na celowniku.
W marcu 2025 r. FDA przedłużyła termin o 30 miesięcy (do 20 lipca 2028 r.), ale kierunek jest jednoznaczny: szczegółowa identyfikowalność żywności wysokiego ryzyka nie jest już opcjonalna. Firmy, które rozpoczną wdrażanie już dziś, będą miały ogromną przewagę konkurencyjną, gdy zgodność stanie się obowiązkowa. W tym artykule budujemy kompletny system w Pythonie — od projektu bazy danych po REST API, od silnika alertów po zarządzanie wycofaniem — spełniający wymagania FSMA 204.
Czego się nauczysz
- Pełna struktura FSMA 204: Lista identyfikowalności żywności, CTE i KDE
- Projekt bazy danych PostgreSQL zapewniający wielopoziomową identyfikowalność
- FastAPI REST API z modelami Pydantic do rejestracji CTE i zapytań typu one-up/one-down
- Niezmienne pozyskiwanie zdarzeń za pomocą Apache Kafka w celu uzyskania pełnej ścieżki audytu
- Silnik alertów o anomaliach (łańcuch chłodniczy, podejrzane partie) z powiadomieniami wielokanałowymi
- Przebieg zarządzania wycofaniem: śledzenie wstecz/śledzenie w przód w czasie krótszym niż 24 godziny (wymagania FDA)
- Próbne ćwiczenie przypominania ze wskaźnikami wydajności
- Porównanie FSMA 204 z przepisami UE 178/2002 i wpływ na włoskich eksporterów
Seria FoodTech: Gdzie jesteśmy
To szósty artykuł z serii FoodTech na federicocalo.dev. Oto pełna mapa:
| # | Tytuł | Poziom | Państwo |
|---|---|---|---|
| 01 | Rurociąg IoT dla rolnictwa precyzyjnego | Mediator | Opublikowany |
| 02 | Wizja komputerowa w kontroli jakości żywności | Mediator | Opublikowany |
| 03 | ML i Edge Computing do przewidywania upraw | Zaawansowany | Opublikowany |
| 04 | Blockchain i przejrzysty łańcuch dostaw żywności | Zaawansowany | Opublikowany |
| 05 | Prognozowanie popytu dla handlu detalicznego na dużą skalę za pomocą Prophet i LSTM | Zaawansowany | Opublikowany |
| 06 | Automatyzacja FSMA 204: śledzenie, ostrzeganie i przywoływanie za pośrednictwem Pythona | Zaawansowany | JESTEŚ TU |
| 07 | Rolnictwo pionowe: deska rozdzielcza i zautomatyzowane sterowanie | Mediator | Wkrótce |
| 08 | Satelitarne API do monitorowania upraw | Zaawansowany | Wkrótce |
| 09 | Pulpit nawigacyjny „od pola do stołu” z Streamlit | Mediator | Wkrótce |
| 10 | Odporność łańcucha dostaw: optymalizacja za pomocą narzędzi OR | Zaawansowany | Wkrótce |
Zasada FSMA 204: Najbardziej znacząca zmiana regulacyjna od 2011 roku
Ustawa o modernizacji bezpieczeństwa żywności (FSMA) z 2011 r. zmieniła podejście FDA do bezpieczeństwa żywności z reaktywnego (reagowanie na zanieczyszczenia) na zapobiegawcze (zapobieganie). Sekcja 204 przekazała FDA uprawnienia do identyfikowania żywności wysokiego ryzyka i nakładania dodatkowych wymagań w zakresie identyfikowalności. Rezultatem jest Ostateczna zasada dotycząca identyfikowalności żywności, opublikowany w listopadzie 2022 r.
Cel jest radykalny: w przypadku wybuchu epidemii lub skażenia operatorzy muszą być w stanie zidentyfikować cały łańcuch dystrybucji partii – od pola do stołu – i dostarczyć dokumentację do FDA w formacie elektronicznym, który może zamówić 24 godziny od zgłoszenia. Przed FSMA 204 proces ten trwał średnio 7–10 dni. FDA szacuje wpływ skażonej żywności na zmniejszenie liczby zachorowań i zgonów na setki przypadków, których udało się uniknąć rocznie.
Lista identyfikowalności żywności (FTL): zaangażowana żywność
FTL obejmuje żywność, która w przeszłości była związana z poważnymi epidemiami. Główne kategorie to:
| Kategoria | Konkretne przykłady | Ryzyko pierwotne |
|---|---|---|
| Świeże sery | Miękka/półmiękka, niedojrzała, świeża mozzarella, ricotta | Listeria monocytogenes |
| Jajka w skorupkach | Niepasteryzowane jaja kurze | Salmonella Enteritidis |
| Owoce morza – ryby | Tuńczyk, łosoś, dorsz, miecznik (świeży/mrożony) | Scombroid, Listeria |
| Skorupiaki | Krewetki, kraby, homary | Vibrio, Norowirus |
| Małże dwuskorupowe | Ostrygi, małże, małże | Norowirus, Vibrio |
| Sałatki RTE | Sałatka ziemniaczana, jajka, owoce morza | Listeria, Salmonella |
| Świeże owoce i warzywa | Ogórki, zioła, warzywa liściaste, melony, papryka, kiełki, pomidory, owoce tropikalne | E. coli O157:H7, Salmonella |
| Masło orzechowe | Masło orzechowe, migdały, orzechy nerkowca | Salmonella |
Ważna uwaga dla włoskich eksporterów
I świeże sery (mozzarella, burrata, ricotta, stracchino) są wyraźnie w FTL. Włoskie firmy eksportujące te produkty do USA muszą wdrożyć identyfikowalność FSMA 204 dla wszystkich partii przeznaczonych na rynek amerykański. Twarde sery dojrzałe (Parmigiano Reggiano, Grana Padano, Pecorino Romano) nie są objęte obecnym FTL.
Krytyczne zdarzenia śledzenia (CTE)
CTE to momenty w łańcuchu dostaw, w których należy rejestrować identyfikowalność. FSMA 204 definiuje sześć głównych, każdy z określonymi KDE:
| CTE | Opis | Dotyczy |
|---|---|---|
| Rozwój | Odbiór świeżych produktów bezpośrednio z pola | Owoce i warzywa, zioła |
| Żniwny | Zbiór żywności nieuprawnej (skorupiaki, dzikie ryby) | Dzikie owoce morza |
| Chłodzenie | Pierwsza operacja chłodzenia po zbiorach | Owoce i warzywa, owoce morza |
| Wstępne pakowanie | Po raz pierwszy produkt jest pakowany w ostatecznej formie | Wszystkie elementy FTL |
| Wysyłka | Wszelkie przeniesienie własności lub opieki | Wszystkie elementy FTL |
| Otrzymujący | Odbiór przesyłek FTL od innego operatora | Wszystkie elementy FTL |
| Transformacja | Kiedy element FTL zostaje włączony do nowego produktu | Transformatory/producenci |
Kluczowe elementy danych (KDE) dla CTE
Dla każdego CTE reguła definiuje obowiązkowe KDE. Przykład dla Wysyłka CTE:
- Kod partii identyfikowalności (TLC) — niepowtarzalny identyfikator partii
- Ilość i jednostka miary wysłana
- Data wysyłki
- Opis lokalizacji punktu wysyłki (odniesienie źródłowe TLC)
- Numer referencyjny dokumentu przewozowego
- Nazwa i adres odbiorcy
- Opis żywności (w tym numer rejestracyjny FDA Food Facility)
Dla Odbiór CTE:
- Otrzymano TLC (z poprzedniej przesyłki)
- Otrzymana ilość
- Data otrzymania
- Lokalizacja punktu odbioru
- Odniesienie do dokumentu przewozowego
- Źródło TLC (pochodzenie partii)
Architektura systemu zgodności z FSMA 204
System zgodności z FSMA 204 w środowisku produkcyjnym musi zarządzać trzema głównymi przepływami: przechwytywanie danych w czasie rzeczywistym podczas operacji, przetwarzanie i przechowywanie zdarzeń w sposób niezmienny, oraz szybka reakcja w przypadku żądania FDA lub wycofania. Proponowana przez nas architektura jest sterowana zdarzeniami i natywna dla chmury:
Stos technologii
- Warstwa API: FastAPI (Python 3.11+) — punkt końcowy REST do rejestracji CTE
- Podstawowa baza danych: PostgreSQL 16 — schemat relacyjny do śledzenia
- Transmisja wydarzeń: Apache Kafka — zdarzenia CTE jako niezmienne komunikaty
- Przejście wykresu: NetworkX lub Neo4j — śledzenie wstecz/śledzenie łańcucha dostaw
- Silnik alertów: Python + Seler + Redis — wielokanałowe reguły asynchroniczne
- Zarządzanie wycofaniem: Procedury składowane FastAPI + PostgreSQL
- Raportowanie: automatyczne generowanie powiadomienia o wycofaniu produktu przez FDA w formacie JSON/XML
Przepływ od końca do końca:
- Przechwytywanie danych: operatorzy rejestrują CTE za pośrednictwem mobilnego/internetowego API lub integracji z ERP
- Przetwarzanie zdarzeń: każdy CTE jest sprawdzany, wzbogacany i publikowany w Kafce
- Wykres identyfikowalności: wykres genealogiczny partii jest aktualizowany w czasie rzeczywistym
- Silnik alertów: reguły zdarzeń wykrywają anomalie i wysyłają powiadomienia
- Zarządzanie wycofaniem: w przypadku alertu krytycznego automatycznie rozpoczyna proces przywracania
- Raportowanie FDA: wygenerowanie listy sortowalnej w formie elektronicznej w ciągu 24 godzin
Projekt bazy danych PostgreSQL pod kątem identyfikowalności FSMA 204
System musi spełniać dwa przeciwstawne wymagania: wydajność w codziennych operacjach (wkładka CTE, zapytanie biznesowe) e ekstremalna prędkość zapytań o śledzenie w przypadku wycofania. Używamy PostgreSQL z kombinacją tabel relacyjnych i indeksów JSONB dla zmiennych KDE dla CTE.
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- Estensione per UUID generation
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- Per path-based queries su genealogia
-- ============================================================
-- TABELLA: locations
-- Tutti i luoghi della filiera (aziende, magazzini, porti)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon per geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: food_items
-- Catalogo alimenti FTL con classificazione FSMA
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: traceability_lots
-- Core: ogni lotto tracciabile con TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantità e dimensione lotto
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Date critiche
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogia: da quali lotti parent e stato derivato
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- Es: "LOT_A.LOT_B.LOT_C"
-- Stato del lotto
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- Attributi FDA specifici
tlc_source_reference TEXT, -- Descrizione provenienza TLC
-- Metadati extra (variabili per tipo alimento)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indice per query di genealogia rapida
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABELLA: cte_events
-- Registro immutabile di tutti i Critical Tracking Events
-- Questa tabella NON deve mai avere UPDATE o DELETE
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lotti coinvolti
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- per transformation
-- Localizzazione evento
location_id UUID NOT NULL REFERENCES locations(id),
-- Timestamp evento (quando e accaduto, non quando e stato registrato)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system che ha registrato
-- KDE come JSONB (struttura varia per tipo CTE)
kde JSONB NOT NULL,
-- Per spedizioni: destinatario
destination_location_id UUID REFERENCES locations(id),
-- Documento di riferimento (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantità movimentata
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash per immutabilita (SHA-256 del payload)
event_hash CHAR(64) NOT NULL,
-- Firma digitale opzionale
digital_signature TEXT,
-- Metadati
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: questo record e immutabile
);
-- Gli eventi NON si modificano: trigger di protezione
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABELLA: recall_events
-- Gestione recall con workflow completo
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- Es: "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lotti che hanno scatenato il recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Scope calcolato automaticamente
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow stato
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Timestamp workflow
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (entro 24h da richiesta)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsabile
initiated_by VARCHAR(255) NOT NULL,
-- Metriche drill
traceback_seconds INTEGER, -- Tempo per completare traceback
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alert_rules
-- Configurazione regole per alert engine
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL, -- Regola in formato JSON
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alerts
-- Alert generati dall'engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
REST API z FastAPI: modele Pydantic i punkty końcowe CTE
API jest punktem wejścia dla wszystkich operatorów w łańcuchu dostaw. Używamy FastAPI ze względu na jego szybkość, automatyczną walidację w Pydantic i automatyczne generowanie dokumentacji OpenAPI – przydatnej przy audytach FDA. Szablony Pydantic odzwierciedlają KDE wymagane przez regułę.
# fsma204/models.py
# Pydantic v2 models per FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements per Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="Descrizione provenienza TLC")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="FDA registration del destinatario")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements per Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC come apparso nel documento di spedizione")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements per Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Identificativo campo/parcella
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements per Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements per Transformation CTE"""
new_tlc: str # TLC del nuovo prodotto
input_tlcs: List[str] # TLC degli ingredienti usati
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
@field_validator('tlc')
@classmethod
def validate_tlc(cls, v: str) -> str:
"""TLC non deve contenere caratteri ambigui"""
forbidden = set('IO0l') # caratteri ambigui
if any(c in forbidden for c in v.upper()):
raise ValueError("TLC contiene caratteri ambigui (I, O, 0, l)")
return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDE specifici per il tipo di CTE")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # lotti upstream
descendants: List[Dict[str, Any]] # lotti downstream
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Formato sortable list richiesto da FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # TLC richiesti
records: List[Dict[str, Any]] # Record ordinabili
total_records: int
format_version: str = "FSMA204_v1"
# fsma204/api.py
# FastAPI application per FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System per FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash del payload CTE per immutabilita"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Crea un nuovo lotto tracciabile.
Il TLC deve essere unico nel sistema.
"""
# Verifica unicita TLC
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' già esistente nel sistema"
)
# Costruisce genealogy_path
genealogy_path = request.tlc
if request.parent_lot_ids:
# Recupera path del parent principale
parent_result = await db.execute(
text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.parent_lot_ids[0])}
)
parent = parent_result.fetchone()
if parent and parent.genealogy_path:
genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lotto creato", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Registra un Critical Tracking Event.
Il record e IMMUTABILE una volta creato (no update/delete).
"""
# Verifica che il lotto esista e sia attivo
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lotto non trovato")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Impossibile registrare CTE su lotto con stato '{lot.status}'"
)
# Calcola hash per immutabilita
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
# Inserisce CTE (immutabile)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
row = result.fetchone()
await db.commit()
# Aggiorna remaining_quantity per spedizioni
if request.event_type == "shipping" and request.quantity:
await db.execute(
text("""
UPDATE traceability_lots
SET remaining_quantity = remaining_quantity - :qty
WHERE id = :id
"""),
{"qty": request.quantity, "id": str(request.lot_id)}
)
await db.commit()
# Background: pubblica su Kafka e verifica alert rules
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: risale la filiera upstream.
Requisito FDA: completare entro 24h da richiesta.
"""
start_ms = time.monotonic()
# Recupera lotto root
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lotto non trovato")
# Usa ltree per trovare tutti gli antenati
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
# Trova tutti i discendenti (traceforward)
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
# Recupera tutti gli eventi CTE per il lotto
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Genera il sortable list in formato elettronico richiesto da FDA.
Deve essere producibile entro 24h da richiesta FDA (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
# Recupera tutti i record CTE per i TLC richiesti
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
return SortableListResponse(
generated_at=__import__('datetime').datetime.utcnow(),
requesting_lots=tlc_list,
records=records,
total_records=len(records)
)
Niezmienne pozyskiwanie zdarzeń za pomocą Apache Kafka
Niezmienny charakter CTE idealnie pasuje do wzorca Event Sourcing. Każde zdarzenie jest niezmiennym faktem dotyczącym tego, co wydarzyło się w łańcuchu dostaw. Kafka zapewnia, że wydarzenia te są:
- Wytrzymały: konfigurowalne przechowywanie (np. 7 lat dla FSMA 204)
- Możliwość odtwarzania: odbudować państwo w dowolnym momencie historycznym
- Posortowane według partycji: gwarancja zamówienia partii (podział przez TLC)
- Rozproszone: skalowalne do milionów zdarzeń dziennie
# fsma204/kafka_producer.py
# Kafka producer per CTE events
import json
import logging
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime
logger = logging.getLogger(__name__)
# Schema Avro per CTE event (per Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
# Configurazione Kafka
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Garanzia durabilita
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Pubblica un CTE event su Kafka.
Usa il TLC come partition key per garantire ordine per lotto.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
# Partition key = TLC: tutti gli eventi dello stesso lotto
# vanno alla stessa partizione, garantendo ordinamento
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
producer.poll(0) # Flush asincrono
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer per Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import asyncio
async def start_cte_consumer():
"""
Consumer Kafka per il real-time alert processing.
Eseguito come background service separato dall'API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Commit manuale per at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer avviato, in ascolto su fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Non committiamo: l'evento verrà riprocessato
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Pipeline di processing per ogni CTE event"""
event_type = event['event_type']
kde = json.loads(event.get('kde', '{}'))
# 1. Controlla cold chain per eventi di spedizione/ricezione
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verifica completezza KDE per il tipo di evento
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Controlla se il fornitore e in blacklist
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Aggiorna grafo di tracciabilita in memoria (NetworkX)
await update_traceability_graph(event)
Silnik alertów: wykrywanie anomalii w czasie rzeczywistym
Silnik alertów monitoruje ciągły przepływ zdarzeń CTE i stosuje konfigurowalne reguły. Powiadomienia są wysyłane wieloma kanałami w zależności od wagi. Alarm critical może automatycznie uruchomić proces przywracania.
# fsma204/alert_engine.py
# Alert engine per FSMA 204 compliance monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class per le regole di alert"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Ritorna un dict con i dettagli dell'alert, o None se ok"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Rileva interruzioni nella catena del freddo.
Regola: se la temperatura registrata nei KDE supera la soglia
prevista per la categoria di alimento.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
# Recupera categoria alimento del lotto
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sotto minimo {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sopra massimo {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} per categoria {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifica che tutti i KDE obbligatori siano presenti per il tipo CTE.
Mancanza di KDE = non-compliance FSMA 204.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"KDE Mancanti - CTE {event_type}",
"description": (
f"CTE {event_type} per TLC {event.get('tlc')} "
f"manca dei KDE obbligatori: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
async def _send_email(self, alert: dict, config: dict):
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
# In produzione usare asyncio-friendly SMTP
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Aggiungere: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Chiamato come background task dopo ogni registrazione CTE"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
# Salva alert su DB
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall se critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generato: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
Zarządzanie wycofaniem: pełny przepływ pracy w mniej niż 24 godziny
Najbardziej rygorystycznym wymaganiem FSMA 204 jest możliwość wykonania śledzenia wstecznego i śledzenia w przód oraz dostarczenia listy umożliwiającej sortowanie do FDA przez 24 godziny od zgłoszenia. Zautomatyzowany przepływ pracy przy przypominaniu to jedyny sposób, aby zagwarantować działanie na dużą skalę zgodnie z umową SLA. W 2024 r. FDA odnotowała 241 przypadków wycofania żywności, a średni czas realizacji wyniósł 73 godziny, czyli znacznie poza wymaganiami nowej zasady.
# fsma204/recall_manager.py
# Recall Management con workflow automatizzato
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Gestisce il workflow completo di un recall FSMA 204.
Fasi:
1. IDENTIFIED - Identificazione del problema
2. SCOPE_DETERMINED - Calcolo scope (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifiche a tutti gli operatori coinvolti
4. REMOVAL_IN_PROGRESS - Rimozione dal mercato
5. EFFECTIVENESS_CHECK - Verifica efficacia
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Avvia un recall. Questo metodo deve completare
traceback + traceforward + scope in < 30 minuti
per rispettare il requisito 24h FDA complessivo.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} avviato da {initiated_by} "
f"per {len(trigger_lot_ids)} lotti"
)
# Fase 1: Crea record recall
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Fase 2: Determina scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Fase 3: Aggiorna recall con scope e stati
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Fase 4: Genera FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Fase 5: Marca lotti come recalled
await self._mark_lots_recalled(affected_lots)
# Fase 6: Invia notifiche a tutti gli operatori
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
# Aggiorna timestamp workflow
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0 # aggiornare con verifica reale
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determinato in {scope_seconds}s, "
f"notifiche in {notify_seconds}s, totale {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determina scope usando graph traversal.
Usa NetworkX per BFS su grafo di genealogia.
"""
# Costruisce grafo da DB
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
# Traceback: tutti gli antenati (upstream)
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
# Traceforward: tutti i discendenti (downstream)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Costruisce un grafo orientato della genealogia lotti
usando gli eventi CTE di tipo shipping/receiving/transformation.
"""
G = nx.DiGraph()
# Nodi: tutti i lotti attivi
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
# Archi: da shipping events
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
# Archi: da transformation events
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Genera il recall notice in formato FDA.
Struttura basata su FDA 21 CFR 7.46.
"""
# Recupera dettagli lotti coinvolti
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
WHERE tl.id = ANY(:affected)
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notifica tutti gli operatori che hanno movimentato i lotti coinvolti"""
# Recupera contatti di tutti i location coinvolti
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"I lotti che avete ricevuto/spedito sono coinvolti nel recall {recall_number}. "
f"Motivo: {description}. "
"Sospendere immediatamente la distribuzione e contattare il responsabile."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
Próbne przypomnienie: okresowe testowanie wydajności
FDA zdecydowanie zaleca m.in próbne ćwiczenie przypominające — okresowe symulacje procesu wycofania w celu sprawdzenia, czy system spełnia wymóg 24-godzinny. Wiertarkę należy przeprowadzać nie rzadziej niż co 6 miesięcy, a wyniki dokumentować na potrzeby audytów. Kluczowe wskaźniki to: całkowity czas realizacji, procent śledzonych partii, czas potrzebny na określenie zakresu.
# fsma204/mock_recall_drill.py
# Mock recall drill con metriche performance
import asyncio
import random
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simula un recall scenario per testare le performance del sistema.
Il drill usa lotti MOCK (non production) con dati sintetici.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Esegue il drill completo e ritorna le metriche.
"""
logger.info(f"Avvio mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Fase 1: Seed dati mock
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
# Fase 2: Seleziona lotto trigger casuale
trigger_lot = random.choice(mock_lots[10:40]) # Dal mezzo della supply chain
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Fase 3: Esegue recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Fase 4: Verifica compliance SLA
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Fase 5: Cleanup dati mock
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
async def _seed_mock_data(self) -> list:
"""Crea lotti sintetici con CTE events per il drill"""
from sqlalchemy import text
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {
"id": None,
"tlc": tlc,
"is_mock": True
}
# In produzione: inserire tramite lot creation API
# Per semplicità usiamo insert diretto nel drill
mock_lots.append(lot_data)
logger.info(f"Creati {len(mock_lots)} lotti mock per drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Rimuove dati mock dopo il drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
# I mock TLC iniziano con "MOCK-" per identificazione sicura
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Cleanup drill {self.drill_id} completato")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"FASI:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS ✓" if sla.get('compliant') else "FAIL ✗"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
Porównanie przepisów: FSMA 204 vs Reg. UE 178/2002 a brytyjska ustawa o bezpieczeństwie żywności
Dla operatorów eksportujących na wiele rynków kluczowe znaczenie ma zrozumienie różnic między ramami regulacyjnymi. Chociaż wyznają zasadę „jeden krok w górę, jeden krok w dół”, różnią się znacznie pod względem szczegółowości i sankcji.
| Rozmiar | FSMA 204 (USA) | Rejestracja UE 178/2002 | Brytyjska ustawa o bezpieczeństwie żywności z 1990 r. + rozp. |
|---|---|---|---|
| Miotły | Tylko żywność FTL (konkretna lista) | Cała żywność i pasza | Wszystkie produkty spożywcze w Wielkiej Brytanii |
| Szczegółowość | CTE + KDE specyficzne dla wydarzenia | Ogólny krok w górę / jeden krok w dół | Jeden krok w górę / jeden krok w dół |
| Czas reakcji FDA/organu | 24 godziny (elektroniczna lista sortowalna) | Brak konkretnych warunków | Brak konkretnych warunków |
| Format zapisu | Elektroniczne, zamawialne, interoperacyjne | Dowolny udokumentowany format | Dowolny udokumentowany format |
| Zatrzymanie | 24 miesiące | Zmienna według kategorii | Zmienna według kategorii |
| Sankcje | Do 10 000 USD dziennie za każde naruszenie | Różnią się one w zależności od państwa członkowskiego | Do 20 000 GBP + kara więzienia |
| Aplikacja dla importerów | Tak: dotyczy to każdego, kto importuje do USA | Tak: dla operatorów na rynku UE | Tak: dla operatorów na rynku brytyjskim |
| Standardy technologiczne | Brak mandatu (ale zalecany GS1) | Żadnego nakazu | Żadnego nakazu |
Praktyczne implikacje dla eksporterów
Włoska firma eksportująca świeżą mozzarellę do USA musi się do tego zastosować Jest Rejestracja UE 178/2002 niż FSMA 204. Ponieważ FSMA 204 jest bardziej rygorystyczna (specyficzne CTE/KDE, czas reakcji 24h, format elektroniczny), system zgodny z FSMA 204 jest automatycznie również zgodny z rozporządzeniem UE. Optymalną strategią jest wdrożenie FSMA 204 jako punktu odniesienia i dostosowanie go do wymagań UE/Wielka Brytania.
Wpływ na włoskich eksporterów do USA
Włochy są trzecim co do wielkości eksporterem produktów spożywczych i napojów do USA (po Kanadzie i Meksyku), z wartością ok 8 miliardów euro w 2024 r. Główne kategorie, na które wpływa FSMA 204:
| Kategoria | Eksport IT-USA 2024 (szac.) | Zasięg FSMA 204 | Wymagane działanie |
|---|---|---|---|
| Wino | ~2,1 miliarda dolarów | Nie (nie w FTL) | Tylko rejestracja FDA |
| Dojrzałe sery (Parmezan, Grana, Pecorino) | ~350 milionów dolarów | Nie (z wyłączeniem osób doświadczonych) | Tylko rejestracja FDA + wcześniejsze powiadomienie |
| Sery świeże (mozzarella, burrata, ricotta) | ~120 milionów dolarów | TAK – priorytet krytyczny | Pełna implementacja CTE/KDE |
| Oliwa z oliwek | ~706 milionów dolarów | No | Tylko rejestracja FDA |
| Przetwory warzywne (pomidory, przeciery) | ~200 milionów dolarów | Częściowe (świeże pomidory w FTL) | Sprawdź, czy składniki pozostają świeże |
Oszacowanie kosztów przestrzegania przepisów dla włoskiego MŚP z branży spożywczej
| Część | Konfiguracja (jednorazowa) | Roczny koszt operacyjny |
|---|---|---|
| Możliwość śledzenia oprogramowania (SaaS lub niestandardowe) | 15 000 € – 80 000 € | 8 000–20 000 euro rocznie |
| Istniejąca integracja ERP/MES | 10 000 EUR – 40 000 EUR | 2000–5000 euro rocznie |
| Sprzęt (skanery, czytniki RFID, czujniki) | 5 000–25 000 euro | 1000–3000 EUR rocznie (utrzymanie) |
| Trening personalny | 3000 € – 10 000 € | 1500–3000 euro rocznie |
| Porada prawna/zgodność z FDA | 5 000 – 20 000 euro | 3000–8000 euro rocznie |
| Szacunkowa suma | 38 000 EUR – 175 000 EUR | 15 500–39 000 euro rocznie |
Dostępne finansowanie: PNRR Transition 5.0
Włoskie MŚP z branży spożywczej mogą uzyskać dostęp do funduszy PNRR Transition 5.0 (przydzielonych 12,7 miliardów euro, ulga podatkowa do 45%) na pokrycie kosztów wdrożenia systemów cyfrowej identyfikowalności. Systemy FSMA 204 kwalifikują się jako „cyfrowość procesów produkcyjnych”, jeśli integrują czujniki IoT do monitorowania łańcucha chłodniczego. Sprawdź możliwość zastosowania u swojego księgowego.
Plan działania dotyczący zgodności Zalecany dla włoskich MŚP
- Miesiące 1-2: Analiza luk — mapuj istniejące przepływy, identyfikuj produkty FTL, oceniaj obecne systemy IT
- Miesiące 3-4: Projekt bazy danych + rozwój API (użyj kodu z tego artykułu jako punktu odniesienia)
- Miesiące 5-6: Integracja z istniejącym systemem ERP (SAP, Dynamics, Sage), szkolenia operatorów
- Miesiące 7-8: Pilotuj produkt/linię, sprawdź kompletność KDE, pierwsze próbne ćwiczenie
- Miesiące 9-10: Kompletne wdrożenie, aktywny silnik alertów, dokumentacja FDA
- Miesiące 11-12: Drugie ćwiczenie próbne, dostrajanie, przygotowanie do audytu FDA
Najlepsze praktyki i anty-wzorce
Najlepsze praktyki
- Unikalna i jednoznaczna TLC: unikać wizualnie podobnych znaków (I/l/1, O/0). Użyj GS1-128 lub SSCC jako standardu interoperacyjności z amerykańskim łańcuchem dostaw
- Niezmienność gwarantowana na poziomie DB: używaj wyzwalaczy PostgreSQL, które uniemożliwiają UPDATE/DELETE na cte_events, a nie tylko konwencje kodu
- Hash zdarzenia dla ścieżki audytu: SHA-256 ładunku w momencie tworzenia — pozwala sprawdzić, czy rekordy nie zostały zmienione
- Regularne 24-godzinne badanie ALS: przeprowadzaj ćwiczenia próbne co 6 miesięcy z danymi syntetycznymi i dokumentuj wyniki
- Synchronizacja z GS1: użyj GLN (globalnego numeru lokalizacji) dla identyfikatorów lokalizacji — kompatybilny z systemami partnerskimi EDI i USA
- Wersja KDE: FSMA 204 może ewoluować; zachowaj pole kde_schema_version w swoich rekordach, aby ułatwić migrację
Anty-wzorce, których należy unikać
- KDE tylko na papierze: wiele firm przechowuje KDE w arkuszach PDF lub Excel. W przypadku tych systemów nie jest możliwe spełnienie wymogu „elektronicznego formatu umożliwiającego sortowanie” w ciągu 24 godzin
- Niestandardowy TLC: używanie wewnętrznych numerów partii, które nie są przekazywane partnerom na niższym szczeblu łańcucha dostaw, uniemożliwia śledzenie między firmami
- Alarm bez eskalacji: system ostrzegania, który wysyła e-maile, ale nie ma protokołu eskalacji w przypadku problemów krytycznych (brak odpowiedzi w ciągu 2 godzin → automatyczne połączenie) nie przestrzega ducha FSMA
- Tylko wewnętrzna identyfikowalność: FSMA 204 wymaga od partnerów jednego kroku w górę ORAZ jednego kroku w dół. Sam system wewnętrzny nie wystarczy
- Błąd synchronizacji UTC: Sygnatury czasowe CTE muszą być podane w formacie UTC lub jawnie przesunięte. Niejednoznaczne znaczniki czasu uniemożliwiają odtworzenie harmonogramu w przypadku wycofania międzynarodowego
Wnioski i dalsze kroki
Zasada 204 FSMA oznacza zmianę paradygmatu: od identyfikowalności w formie dokumentu papierowego do identyfikowalna jako infrastruktura oprogramowania działająca w czasie rzeczywistym. System, który zbudowaliśmy w tym artykule — PostgreSQL do przechowywania danych, FastAPI dla warstwy API, Kafka do strumieniowego przesyłania zdarzeń, NetworkX do przeglądania grafów — jest w stanie spełnić najważniejsze wymaganie: pełną sortowalną listę przesyłaną do FDA w ciągu 24 godzin od każdego żądania.
Dobra wiadomość dla włoskich firm: przedłużenie terminu do lipca 2028 r. zapewnia wystarczająco dużo czasu na stopniowe wdrożenie. Zła wiadomość: firmy, które czekają z rozpoczęciem działalności do 2027 r., znajdą rynek konsultantów i dostawców oprogramowania po zawyżonych cenach i długich kolejkach oczekujących.
Dane za 2024 r. mówią jasno: 241 wycofań żywności przez FDA, 1392 chorych, liczba hospitalizacji wzrosła ponad dwukrotnie w porównaniu z 2023 r. Szczegółowa identyfikowalność to nie biurokracja – to zapobieganie mierzalne w życiu ludzkim.
Seria FoodTech: Następny artykuł
Następny artykuł z tej serii to 07 — Rolnictwo pionowe: deska rozdzielcza i zautomatyzowane sterowanie, gdzie zbudujemy system sterowania w czasie rzeczywistym dla gospodarstw hydroponicznych: czujniki IoT (światło, CO2, pH, EC), dashboardy Streamlit i algorytmy optymalizacyjne maksymalizujące plon z metra kwadratowego. Bądź na bieżąco na fedicocalo.dev.
Między seriami: powiązane spostrzeżenia
- MLOps (seria 12): jak wdrożyć modele wykrywania anomalii silnika alertów w środowisku produkcyjnym za pomocą MLflow
- Inżynieria AI (seria 13): RAG na temat dokumentacji FDA — LLM, który odpowiada na pytania dotyczące zgodności, cytując prawidłowy tekst przepisów
- PostgreSQL AI (seria 22): pgvector do wyszukiwania podobieństw w profilach zanieczyszczeń — znajduje partie o cechach podobnych do tych, które występowały w historycznych epidemiach
- Interfejs DevOps (seria 9): Potok CI/CD do ciągłego wdrażania interfejsu API FSMA 204 w Kubernetes







