FSMA 204 Automation: Tracking, Alert and Recall přes Python
20. leden 2026 měl znamenat předěl v historii americké bezpečnosti potravin. Tam Pravidlo FSMA 204 – Konečné pravidlo sledovatelnosti potravin FDA požadoval, aby všichni provozovatelé v potravinovém dodavatelském řetězci zavedli systémy granulované sledovatelnosti pro desítky vysoce rizikových kategorií potravin. Bezprecedentní regulační aktualizace zákona o modernizaci bezpečnosti potravin z roku 2011 s globálními dopady: v hledáčku byl kdokoli, kdo vyvážel na americký trh – včetně italských společností vyrábějících víno, sýr a olivový olej.
V březnu 2025 FDA prodloužila lhůtu o 30 měsíců (do 20. července 2028), ale směr je jednoznačný: granulární sledovatelnost u vysoce rizikových potravin již není volitelná. Společnosti, které začnou implementovat dnes, budou mít obrovskou konkurenční výhodu, až bude dodržování povinné. V tomto článku vytváříme kompletní systém v Pythonu – od návrhu databáze po REST API, od systému výstrah až po správu odvolání – který splňuje požadavky FSMA 204.
Co se naučíte
- Kompletní struktura FSMA 204: Food Traceability List, CTE a KDE
- Návrh databáze PostgreSQL pro víceúrovňovou sledovatelnost
- FastAPI REST API s modely Pydantic pro registraci CTE a dotazy typu one-up/one-down
- Immutable Event Sourcing s Apache Kafka pro kompletní audit trail
- Výstražný modul pro anomálie (studený řetězec, podezřelé dávky) s vícekanálovými upozorněními
- Pracovní postup správy stažení: zpětné sledování/dopředné sledování za méně než 24 hodin (požadavek FDA)
- Předstírané cvičení s metrikami výkonu
- Srovnání FSMA 204 vs EU Reg. 178/2002 a dopad na italské exportéry
Řada FoodTech: Kde jsme
Toto je šestý článek ze série FoodTech na federicocalo.dev. Here is the complete map:
| # | Titul | Úroveň | Stát |
|---|---|---|---|
| 01 | IoT Pipeline pro přesné zemědělství | Střední | Publikováno |
| 02 | Počítačová vize pro kontrolu kvality potravin | Střední | Publikováno |
| 03 | ML a Edge Computing pro predikci plodin | Moderní | Publikováno |
| 04 | Blockchain a transparentní dodavatelský řetězec potravin | Moderní | Publikováno |
| 05 | Předpovídání poptávky pro velký maloobchod s Prophet a LSTM | Moderní | Publikováno |
| 06 | FSMA 204 Automation: Tracking, Alert and Recall přes Python | Moderní | JSTE TADY |
| 07 | Vertikální zemědělství: Dashboard a automatické řízení | Střední | Brzy |
| 08 | Satelitní API pro sledování plodin | Moderní | Brzy |
| 09 | Dashboard od farmy po vidličku s funkcí Streamlit | Střední | Brzy |
| 10 | Odolnost dodavatelského řetězce: Optimalizace pomocí OR-Tools | Moderní | Brzy |
FSMA Rule 204: Nejvýznamnější regulační změna od roku 2011
Zákon o modernizaci bezpečnosti potravin (FSMA) z roku 2011 změnil přístup FDA k bezpečnosti potravin z reaktivního (reagujícího na kontaminaci) na preventivní (zabraňující jí). Oddíl 204 delegoval na FDA pravomoc identifikovat vysoce rizikové potraviny a ukládat další požadavky na sledovatelnost. Výsledkem je Konečné pravidlo sledovatelnosti potravin, zveřejněné v listopadu 2022.
Cíl je radikální: v případě propuknutí nebo kontaminace musí být operátoři schopni identifikovat celý distribuční řetězec šarže – od pole až po vidličku – a dodat záznamy FDA v elektronickém formátu, který si lze objednat 24 hodin od požadavku. Před FSMA 204 tento proces trval v průměru 7-10 dní. Dopad na snížení počtu nemocí a úmrtí způsobených kontaminovanými potravinami odhaduje FDA na stovky případů, kterým se ročně zabrání.
Seznam sledovatelnosti potravin (FTL): Zapojené potraviny
FTL zahrnuje potraviny historicky spojené s vážnými ohnisky. Hlavní kategorie jsou:
| Kategorie | Konkrétní příklady | Primární riziko |
|---|---|---|
| Čerstvé sýry | Měkká/poloměkká nevyzrálá, čerstvá mozzarella, ricotta | Listeria monocytogenes |
| Vejce ve skořápce | Nepasterizovaná slepičí vejce | Salmonella Enteritidis |
| Mořské plody - ryby | Tuňák, losos, treska, mečoun (čerstvý/mražený) | Scombroid, Listeria |
| Korýši | Krevety, krab, humr | Vibrio, Norovirus |
| Mlži | Ústřice, škeble, mušle | Norovirus, Vibrio |
| RTE saláty | Bramborový salát, vejce, mořské plody | Listeria, Salmonella |
| Čerstvé ovoce a zelenina | Okurky, bylinky, listová zelenina, melouny, papriky, klíčky, rajčata, tropické ovoce | E. coli O157:H7, Salmonella |
| Ořechové máslo | Arašídové máslo, mandle, kešu | Salmonella |
Důležitá poznámka pro italské exportéry
I čerstvé sýry (mozzarella, burrata, ricotta, stracchino) jsou výslovně v FTL. Italské společnosti, které vyvážejí tyto produkty do USA, musí zavést sledovatelnost FSMA 204 pro všechny šarže určené pro americký trh. Tvrdé zrající sýry (Parmigiano Reggiano, Grana Padano, Pecorino Romano) v současné FTL nejsou.
Kritické události sledování (CTE)
CTE jsou momenty v dodavatelském řetězci, ve kterých musí být zaznamenána sledovatelnost. FSMA 204 definuje šest hlavních, každý se specifickými KDE:
| CTE | Popis | Použitelné na |
|---|---|---|
| Rostoucí | Sběr čerstvých produktů přímo z terénu | Ovoce a zelenina, bylinky |
| Sklizeň | Sklizeň nepěstovaných potravin (měkkýši, volně žijící ryby) | Divoké mořské plody |
| Chlazení | První operace chlazení po sklizni | Ovoce a zelenina, mořské plody |
| Počáteční balení | Poprvé je produkt zabalen v konečné podobě | Všechny položky FTL |
| Doprava | Jakýkoli převod vlastnictví nebo úschovy | Všechny položky FTL |
| Příjem | Příjem FTL položek od jiného operátora | Všechny položky FTL |
| Transformace | Když je položka FTL začleněna do nového produktu | Transformátory/výrobci |
Klíčové datové prvky (KDE) pro CTE
Pro každé CTE pravidlo definuje povinné KDE. Příklad pro Doprava CTE:
- Kód šarže sledovatelnosti (TLC) – jedinečný identifikátor šarže
- Dodané množství a měrná jednotka
- Datum odeslání
- Popis umístění místa odeslání (odkaz na zdroj TLC)
- Referenční číslo přepravního dokladu
- Jméno a adresa příjemce
- Popis potraviny (včetně registračního čísla potravinářského zařízení FDA)
Pro Příjem CTE:
- TLC přijato (z předchozí zásilky)
- Přijaté množství
- Datum přijetí
- Umístění místa příjmu
- Odkaz na přepravní doklad
- Zdroj TLC (původ šarže)
Architektura FSMA 204 Compliance System
Systém shody FSMA 204 ve výrobě musí řídit tři hlavní toky: zachytit data v reálném čase během operací, zpracování a skladování událostí neměnným způsobem a rychlá odezva v případě žádosti FDA nebo odvolání. Architektura, kterou navrhujeme, je řízená událostmi a cloudová:
Zásobník technologií
- Vrstva API: FastAPI (Python 3.11+) — koncový bod REST pro registraci CTE
- Primární databáze: PostgreSQL 16 — relační schéma pro sledovatelnost
- Streamování události: Apache Kafka — události CTE jako neměnné zprávy
- Procházení grafu: NetworkX nebo Neo4j — traceback/traceforward dodavatelského řetězce
- Výstražný modul: Python + Celery + Redis — vícekanálová asynchronní pravidla
- Správa odvolání: Uložené procedury FastAPI + PostgreSQL
- Hlášení: automatické generování upozornění FDA na stažení v JSON/XML
Tok end-to-end:
- Sběr dat: operátoři zaznamenávají CTE přes mobilní/webové API nebo integraci ERP
- Zpracování události: každý CTE je ověřen, obohacen a zveřejněn na Kafka
- Graf sledovatelnosti: genealogický graf partií je aktualizován v reálném čase
- Výstražný modul: pravidla událostí detekují anomálie a posílají upozornění
- Správa odvolání: v případě kritické výstrahy automaticky zahájí pracovní postup odvolání
- Hlášení FDA: vygenerování tříditelného seznamu v elektronické podobě do 24 hodin
Návrh databáze PostgreSQL pro sledovatelnost FSMA 204
Schéma musí splňovat dva protichůdné požadavky: výkon pro každodenní provoz (vložení CTE, obchodní dotaz) e extrémní rychlost pro dotazy zpětného sledování v případě odvolání. Pro proměnné KDE pro CTE používáme PostgreSQL s kombinací relačních tabulek a indexů JSONB.
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- Estensione per UUID generation
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- Per path-based queries su genealogia
-- ============================================================
-- TABELLA: locations
-- Tutti i luoghi della filiera (aziende, magazzini, porti)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon per geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: food_items
-- Catalogo alimenti FTL con classificazione FSMA
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: traceability_lots
-- Core: ogni lotto tracciabile con TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantità e dimensione lotto
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Date critiche
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogia: da quali lotti parent e stato derivato
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- Es: "LOT_A.LOT_B.LOT_C"
-- Stato del lotto
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- Attributi FDA specifici
tlc_source_reference TEXT, -- Descrizione provenienza TLC
-- Metadati extra (variabili per tipo alimento)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indice per query di genealogia rapida
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABELLA: cte_events
-- Registro immutabile di tutti i Critical Tracking Events
-- Questa tabella NON deve mai avere UPDATE o DELETE
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lotti coinvolti
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- per transformation
-- Localizzazione evento
location_id UUID NOT NULL REFERENCES locations(id),
-- Timestamp evento (quando e accaduto, non quando e stato registrato)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system che ha registrato
-- KDE come JSONB (struttura varia per tipo CTE)
kde JSONB NOT NULL,
-- Per spedizioni: destinatario
destination_location_id UUID REFERENCES locations(id),
-- Documento di riferimento (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantità movimentata
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash per immutabilita (SHA-256 del payload)
event_hash CHAR(64) NOT NULL,
-- Firma digitale opzionale
digital_signature TEXT,
-- Metadati
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: questo record e immutabile
);
-- Gli eventi NON si modificano: trigger di protezione
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABELLA: recall_events
-- Gestione recall con workflow completo
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- Es: "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lotti che hanno scatenato il recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Scope calcolato automaticamente
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow stato
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Timestamp workflow
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (entro 24h da richiesta)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsabile
initiated_by VARCHAR(255) NOT NULL,
-- Metriche drill
traceback_seconds INTEGER, -- Tempo per completare traceback
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alert_rules
-- Configurazione regole per alert engine
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL, -- Regola in formato JSON
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alerts
-- Alert generati dall'engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
REST API s FastAPI: Pydantické modely a CTE koncové body
API je vstupním bodem pro všechny operátory v dodavatelském řetězci. FastAPI používáme pro jeho rychlost, automatické ověřování pomocí Pydantic a automatické generování dokumentace OpenAPI — užitečné pro audity FDA. Pydantické šablony odrážejí prostředí KDE vyžadovaná pravidlem.
# fsma204/models.py
# Pydantic v2 models per FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements per Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="Descrizione provenienza TLC")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="FDA registration del destinatario")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements per Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC come apparso nel documento di spedizione")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements per Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Identificativo campo/parcella
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements per Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements per Transformation CTE"""
new_tlc: str # TLC del nuovo prodotto
input_tlcs: List[str] # TLC degli ingredienti usati
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
@field_validator('tlc')
@classmethod
def validate_tlc(cls, v: str) -> str:
"""TLC non deve contenere caratteri ambigui"""
forbidden = set('IO0l') # caratteri ambigui
if any(c in forbidden for c in v.upper()):
raise ValueError("TLC contiene caratteri ambigui (I, O, 0, l)")
return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDE specifici per il tipo di CTE")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # lotti upstream
descendants: List[Dict[str, Any]] # lotti downstream
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Formato sortable list richiesto da FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # TLC richiesti
records: List[Dict[str, Any]] # Record ordinabili
total_records: int
format_version: str = "FSMA204_v1"
# fsma204/api.py
# FastAPI application per FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System per FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash del payload CTE per immutabilita"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Crea un nuovo lotto tracciabile.
Il TLC deve essere unico nel sistema.
"""
# Verifica unicita TLC
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' già esistente nel sistema"
)
# Costruisce genealogy_path
genealogy_path = request.tlc
if request.parent_lot_ids:
# Recupera path del parent principale
parent_result = await db.execute(
text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.parent_lot_ids[0])}
)
parent = parent_result.fetchone()
if parent and parent.genealogy_path:
genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lotto creato", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Registra un Critical Tracking Event.
Il record e IMMUTABILE una volta creato (no update/delete).
"""
# Verifica che il lotto esista e sia attivo
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lotto non trovato")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Impossibile registrare CTE su lotto con stato '{lot.status}'"
)
# Calcola hash per immutabilita
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
# Inserisce CTE (immutabile)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
row = result.fetchone()
await db.commit()
# Aggiorna remaining_quantity per spedizioni
if request.event_type == "shipping" and request.quantity:
await db.execute(
text("""
UPDATE traceability_lots
SET remaining_quantity = remaining_quantity - :qty
WHERE id = :id
"""),
{"qty": request.quantity, "id": str(request.lot_id)}
)
await db.commit()
# Background: pubblica su Kafka e verifica alert rules
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: risale la filiera upstream.
Requisito FDA: completare entro 24h da richiesta.
"""
start_ms = time.monotonic()
# Recupera lotto root
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lotto non trovato")
# Usa ltree per trovare tutti gli antenati
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
# Trova tutti i discendenti (traceforward)
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
# Recupera tutti gli eventi CTE per il lotto
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Genera il sortable list in formato elettronico richiesto da FDA.
Deve essere producibile entro 24h da richiesta FDA (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
# Recupera tutti i record CTE per i TLC richiesti
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
return SortableListResponse(
generated_at=__import__('datetime').datetime.utcnow(),
requesting_lots=tlc_list,
records=records,
total_records=len(records)
)
Immutable Event Sourcing s Apache Kafka
Neměnná povaha CTE dokonale zapadá do vzoru Event Sourcing. Každá událost je neměnným faktem o tom, co se stalo v dodavatelském řetězci. Kafka zajišťuje, aby tyto události byly:
- Odolný: konfigurovatelné uchování (např. 7 let pro FSMA 204)
- Znovu hratelné: k obnově státu v jakémkoli historickém bodě
- Seřazeno podle oddílu: garance objednávky dávek (rozdělení podle TLC)
- Distribuováno: škálovatelné na miliony událostí denně
# fsma204/kafka_producer.py
# Kafka producer per CTE events
import json
import logging
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime
logger = logging.getLogger(__name__)
# Schema Avro per CTE event (per Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
# Configurazione Kafka
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Garanzia durabilita
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Pubblica un CTE event su Kafka.
Usa il TLC come partition key per garantire ordine per lotto.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
# Partition key = TLC: tutti gli eventi dello stesso lotto
# vanno alla stessa partizione, garantendo ordinamento
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
producer.poll(0) # Flush asincrono
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer per Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import asyncio
async def start_cte_consumer():
"""
Consumer Kafka per il real-time alert processing.
Eseguito come background service separato dall'API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Commit manuale per at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer avviato, in ascolto su fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Non committiamo: l'evento verrà riprocessato
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Pipeline di processing per ogni CTE event"""
event_type = event['event_type']
kde = json.loads(event.get('kde', '{}'))
# 1. Controlla cold chain per eventi di spedizione/ricezione
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verifica completezza KDE per il tipo di evento
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Controlla se il fornitore e in blacklist
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Aggiorna grafo di tracciabilita in memoria (NetworkX)
await update_traceability_graph(event)
Alert Engine: Detekce anomálií v reálném čase
Výstražný modul monitoruje nepřetržitý tok událostí CTE a aplikuje konfigurovatelná pravidla. Oznámení jsou odesílána na více kanálech podle závažnosti. Upozornění critical může automaticky spustit pracovní postup odvolání.
# fsma204/alert_engine.py
# Alert engine per FSMA 204 compliance monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class per le regole di alert"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Ritorna un dict con i dettagli dell'alert, o None se ok"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Rileva interruzioni nella catena del freddo.
Regola: se la temperatura registrata nei KDE supera la soglia
prevista per la categoria di alimento.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
# Recupera categoria alimento del lotto
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sotto minimo {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sopra massimo {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} per categoria {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifica che tutti i KDE obbligatori siano presenti per il tipo CTE.
Mancanza di KDE = non-compliance FSMA 204.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"KDE Mancanti - CTE {event_type}",
"description": (
f"CTE {event_type} per TLC {event.get('tlc')} "
f"manca dei KDE obbligatori: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
async def _send_email(self, alert: dict, config: dict):
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
# In produzione usare asyncio-friendly SMTP
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Aggiungere: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Chiamato come background task dopo ogni registrazione CTE"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
# Salva alert su DB
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall se critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generato: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
Správa stažení: Dokončete pracovní postup za méně než 24 hodin
Nejpřísnějším požadavkem FSMA 204 je schopnost dokončit traceback a traceforward a doručit seřaditelný seznam FDA do 24 hodin od požadavku. Automatizovaný pracovní postup odvolání je jediný způsob, jak zaručit tuto smlouvu SLA ve velkém měřítku. V roce 2024 zaznamenal FDA 241 stažení potravin z trhu s průměrnou dobou dokončení 73 hodin – což je výrazně nad rámec požadavků nového pravidla.
# fsma204/recall_manager.py
# Recall Management con workflow automatizzato
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Gestisce il workflow completo di un recall FSMA 204.
Fasi:
1. IDENTIFIED - Identificazione del problema
2. SCOPE_DETERMINED - Calcolo scope (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifiche a tutti gli operatori coinvolti
4. REMOVAL_IN_PROGRESS - Rimozione dal mercato
5. EFFECTIVENESS_CHECK - Verifica efficacia
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Avvia un recall. Questo metodo deve completare
traceback + traceforward + scope in < 30 minuti
per rispettare il requisito 24h FDA complessivo.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} avviato da {initiated_by} "
f"per {len(trigger_lot_ids)} lotti"
)
# Fase 1: Crea record recall
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Fase 2: Determina scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Fase 3: Aggiorna recall con scope e stati
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Fase 4: Genera FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Fase 5: Marca lotti come recalled
await self._mark_lots_recalled(affected_lots)
# Fase 6: Invia notifiche a tutti gli operatori
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
# Aggiorna timestamp workflow
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0 # aggiornare con verifica reale
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determinato in {scope_seconds}s, "
f"notifiche in {notify_seconds}s, totale {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determina scope usando graph traversal.
Usa NetworkX per BFS su grafo di genealogia.
"""
# Costruisce grafo da DB
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
# Traceback: tutti gli antenati (upstream)
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
# Traceforward: tutti i discendenti (downstream)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Costruisce un grafo orientato della genealogia lotti
usando gli eventi CTE di tipo shipping/receiving/transformation.
"""
G = nx.DiGraph()
# Nodi: tutti i lotti attivi
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
# Archi: da shipping events
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
# Archi: da transformation events
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Genera il recall notice in formato FDA.
Struttura basata su FDA 21 CFR 7.46.
"""
# Recupera dettagli lotti coinvolti
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
WHERE tl.id = ANY(:affected)
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notifica tutti gli operatori che hanno movimentato i lotti coinvolti"""
# Recupera contatti di tutti i location coinvolti
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"I lotti che avete ricevuto/spedito sono coinvolti nel recall {recall_number}. "
f"Motivo: {description}. "
"Sospendere immediatamente la distribuzione e contattare il responsabile."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
Mock Recall Drill: Periodické testování výkonu
FDA důrazně doporučuje i falešný vzpomínací vrták — pravidelné simulace procesu stahování, aby se ověřilo, že systém splňuje 24hodinový požadavek. Cvičení by se mělo provádět alespoň každých 6 měsíců a výsledky by měly být dokumentovány pro audity. Klíčové metriky jsou: celkový čas dokončení, procento sledovaných dávek, čas určení rozsahu.
# fsma204/mock_recall_drill.py
# Mock recall drill con metriche performance
import asyncio
import random
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simula un recall scenario per testare le performance del sistema.
Il drill usa lotti MOCK (non production) con dati sintetici.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Esegue il drill completo e ritorna le metriche.
"""
logger.info(f"Avvio mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Fase 1: Seed dati mock
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
# Fase 2: Seleziona lotto trigger casuale
trigger_lot = random.choice(mock_lots[10:40]) # Dal mezzo della supply chain
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Fase 3: Esegue recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Fase 4: Verifica compliance SLA
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Fase 5: Cleanup dati mock
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
async def _seed_mock_data(self) -> list:
"""Crea lotti sintetici con CTE events per il drill"""
from sqlalchemy import text
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {
"id": None,
"tlc": tlc,
"is_mock": True
}
# In produzione: inserire tramite lot creation API
# Per semplicità usiamo insert diretto nel drill
mock_lots.append(lot_data)
logger.info(f"Creati {len(mock_lots)} lotti mock per drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Rimuove dati mock dopo il drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
# I mock TLC iniziano con "MOCK-" per identificazione sicura
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Cleanup drill {self.drill_id} completato")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"FASI:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS ✓" if sla.get('compliant') else "FAIL ✗"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
Regulatorní srovnání: FSMA 204 vs EU Reg. 178/2002 vs britský zákon o bezpečnosti potravin
Pro operátory vyvážející na více trhů je zásadní porozumět rozdílům mezi regulačními rámci. I když sdílejí zásadu „jeden krok nahoru, jeden krok dolů“, výrazně se liší v granularitě a sankcích.
| Velikost | FSMA 204 (USA) | EU Reg. 178/2002 | UK Food Safety Act 1990 + Reg. |
|---|---|---|---|
| Košťata | Pouze potraviny FTL (konkrétní seznam) | Veškeré jídlo a krmivo | Všechny potraviny ve Spojeném království |
| Zrnitost | CTE + KDE specifické pro událost | Obecný krok nahoru / jeden krok dolů | Jeden krok nahoru / jeden krok dolů |
| Doba odezvy FDA/úřadu | 24 hodin (elektronický seznam) | Žádné konkrétní podmínky | Žádné konkrétní podmínky |
| Formát záznamu | Elektronické, objednatelné, interoperabilní | Jakýkoli dokumentovaný formát | Jakýkoli dokumentovaný formát |
| Udržení | 24 měsíců | Variabilní podle kategorie | Variabilní podle kategorie |
| Sankce | Až 10 000 $/den za porušení | Liší se v jednotlivých členských státech | Až 20 000 GBP + trest vězení |
| Aplikace pro dovozce | Ano: platí pro každého, kdo dováží do USA | Ano: pro operátory na trhu EU | Ano: pro operátory na britském trhu |
| Technologické standardy | Žádný mandát (ale doporučeno GS1) | Žádný příkaz | Žádný příkaz |
Praktické důsledky pro exportéry
Italská společnost, která vyváží čerstvou mozzarellu do USA, musí vyhovět je EU Reg. 178/2002 než FSMA 204. Protože je FSMA 204 přísnější (specifické CTE/KDE, 24h odpověď, elektronický formát), je systém vyhovující FSMA 204 automaticky také v souladu s nařízením EU. Optimální strategií je implementovat FSMA 204 jako základ a přizpůsobit jej požadavkům EU/UK.
Dopad na italské vývozce do USA
Itálie je třetím největším vývozcem potravin a nápojů do USA (po Kanadě a Mexiku) s hodnotou přibližně 8 miliard eur v roce 2024. Hlavní kategorie ovlivněné FSMA 204:
| Kategorie | Export IT-USA 2024 (odhad) | Pokrytí FSMA 204 | Je vyžadována akce |
|---|---|---|---|
| Víno | ~2,1 miliardy dolarů | Ne (ne v FTL) | Pouze registrace FDA |
| Zralé sýry (Parmezán, Grana, Pecorino) | ~350 milionů dolarů | Ne (kromě zkušených lidí) | Pouze registrace FDA + předchozí upozornění |
| Čerstvé sýry (mozzarella, burrata, ricotta) | ~120 milionů dolarů | ANO — kritická priorita | Plná implementace CTE/KDE |
| Olivový olej | ~706 milionů dolarů | No | Pouze registrace FDA |
| Zeleninové konzervy (rajčata, protlak) | ~200 milionů dolarů | Částečná (čerstvá rajčata v FTL) | Zkontrolujte, zda přísady zůstávají čerstvé |
Odhad nákladů na dodržování předpisů pro italský potravinářský MSP
| Komponent | Nastavení (jednorázové) | Roční provozní náklady |
|---|---|---|
| Sledovatelnost softwaru (SaaS nebo vlastní) | 15 000 – 80 000 EUR | 8 000 – 20 000 EUR/rok |
| Stávající integrace ERP/MES | 10 000 – 40 000 EUR | 2 000 – 5 000 EUR/rok |
| Hardware (skenery, čtečky RFID, senzory) | 5 000 – 25 000 EUR | 1 000 – 3 000 EUR/rok (údržba) |
| Osobní trénink | 3 000 – 10 000 EUR | 1 500 – 3 000 EUR/rok |
| Právní poradenství / soulad s FDA | 5 000 – 20 000 EUR | 3 000 – 8 000 EUR/rok |
| Odhadovaný součet | 38 000 – 175 000 EUR | 15 500 – 39 000 EUR/rok |
Dostupné financování: PNRR Transition 5.0
Italské potravinářské malé a střední podniky mají přístup k fondům PNRR Transition 5.0 (přiděleno 12,7 miliardy EUR, daňový kredit až 45 %) na náklady na zavedení systémů digitální sledovatelnosti. Systémy FSMA 204 se kvalifikují jako „digitalizace výrobních procesů“, pokud integrují IoT senzory pro monitorování chladicího řetězce. Ověřte si platnost u svého účetního.
Plán shody Doporučený pro italské malé a střední podniky
- 1.–2. měsíc: Gap analysis — mapování stávajících toků, identifikace FTL produktů, hodnocení současných IT systémů
- 3.–4. měsíc: Návrh databáze + vývoj API (jako základ použijte kód z tohoto článku)
- 5.–6. měsíc: Integrace se stávajícím ERP (SAP, Dynamics, Sage), školení operátorů
- 7.–8. měsíc: Otestujte produkt/řadu, ověřte úplnost KDE, nejprve simulujte cvičení
- 9.–10. měsíc: Kompletní zavedení, aktivní výstražný modul, dokumentace FDA
- 11.–12. měsíc: Druhý model cvičení, doladění, příprava na audit FDA
Osvědčené postupy a anti-vzorce
Nejlepší postupy
- Jedinečné a jednoznačné TLC: vyhněte se vizuálně podobným znakům (I/l/1, O/0). Použijte GS1-128 nebo SSCC jako standard pro interoperabilitu s americkým dodavatelským řetězcem
- Neměnnost zaručena na úrovni DB: používejte spouštěče PostgreSQL, které zabraňují UPDATE/DELETE na cte_events, nejen kódové konvence
- Hash události pro auditní stopu: SHA-256 užitečného zatížení v době vytvoření – umožňuje ověřit, že záznamy nebyly změněny
- 24hodinový ALS test pravidelně: každých 6 měsíců provádějte simulovaná cvičení se syntetickými daty, zdokumentujte výsledky
- Synchronizace s GS1: použijte GLN (Global Location Number) pro location_ids — kompatibilní s EDI a partnerskými systémy v USA
- Verze KDE: FSMA 204 se může vyvíjet; mějte ve svých záznamech pole kde_schema_version, abyste usnadnili migraci
Anti-vzory, kterým je třeba se vyhnout
- KDE pouze na papíře: mnoho společností udržuje KDE ve formátu PDF nebo Excel. U těchto systémů není možné splnit požadavek „elektronický, tříditelný formát“ do 24 hodin
- Nestandardní TLC: použití interních čísel šarží, která nejsou sdělena následným partnerům, znemožňuje zpětné sledování napříč společnostmi
- Upozornění bez eskalace: výstražný systém, který posílá e-maily, ale nemá eskalační protokol pro kritické problémy (žádná odpověď do 2 hodin → automatické volání) nerespektuje ducha FSMA
- Pouze interní sledovatelnost: FSMA 204 vyžaduje u partnerů jeden krok nahoru A jeden krok dolů. Samotný interní systém nestačí
- Chyba synchronizace UTC: Časová razítka CTE musí být v UTC nebo explicitně posunutá. Nejednoznačná časová razítka znemožňují rekonstrukci časové osy v případě mezinárodního odvolání
Závěry a další kroky
FSMA Rule 204 představuje posun paradigmatu: od sledovatelnosti jako papírového dokumentu k sledovatelné jako softwarová infrastruktura v reálném čase. Systém, který jsme vytvořili v tomto článku — PostgreSQL pro úložiště, FastAPI pro vrstvu API, Kafka pro streamování událostí, NetworkX pro procházení grafů — je schopen uspokojit nejkritičtější požadavek: kompletní seznam seřazený pro FDA do 24 hodin od jakékoli žádosti.
Dobrá zpráva pro italské společnosti: prodloužení termínu do července 2028 nabízí dostatek času na postupnou implementaci. Špatná zpráva: Společnosti, které čekají na začátek do roku 2027, najdou trh poradců a prodejců softwaru za přemrštěné ceny a s dlouhými frontami.
Údaje z roku 2024 hovoří jasně: 241 stažení potravin FDA z trhu, 1 392 nemocných lidí, počet hospitalizací se ve srovnání s rokem 2023 více než zdvojnásobil. Granulární sledovatelnost není byrokracie – je to prevence měřitelná na lidských životech.
FoodTech Series: Další článek
Další článek v seriálu je 07 — Vertikální zemědělství: Dashboard and Automated Control, kde vybudujeme řídicí systém v reálném čase pro hydroponické farmy: IoT senzory (světlo, CO2, pH, EC), Streamlit dashboardy a optimalizační algoritmy pro maximalizaci výnosu na metr čtvereční. Zůstaňte naladěni na federicocalo.dev.
Cross-Series: Související statistiky
- MLOps (řada 12): jak nasadit modely detekce anomálií výstražného enginu v produkci s MLflow
- AI Engineering (řada 13): RAG na dokumentaci FDA – LLM, která odpovídá na otázky shody citováním správného regulačního textu
- PostgreSQL AI (řada 22): pgvector pro vyhledávání podobnosti na profilech kontaminace – najde šarže s podobnými vlastnostmi jako ty, které se účastnily historických ohnisek
- Frontend DevOps (řada 9): CI/CD kanál pro nepřetržité nasazení FSMA 204 API na Kubernetes







