FSMA 204 Otomasyon: Python ile Takip, Uyarı ve Geri Çağırma
20 Ocak 2026, Amerikan gıda güvenliği tarihinde bir dönüm noktası olmalıydı. Orada FSMA Kural 204 — Gıda İzlenebilirliği Nihai Kuralı FDA, gıda tedarik zincirindeki tüm operatörlerin düzinelerce yüksek riskli gıda kategorisi için ayrıntılı izlenebilirlik sistemleri uygulamasını zorunlu kıldı. 2011 Gıda Güvenliği Modernizasyon Yasası'ndan küresel yankı uyandıran benzeri görülmemiş bir düzenleyici güncelleme: İtalyan şarap, peynir ve zeytinyağı şirketleri de dahil olmak üzere ABD pazarına ihracat yapan herkes hedef tahtasındaydı.
Mart 2025'te FDA, son tarihi 30 ay (20 Temmuz 2028'e kadar) uzattı, ancak yön net: yüksek riskli gıdalar için ayrıntılı izlenebilirlik artık isteğe bağlı değil. Bugün uygulamaya başlayan şirketler, uyumluluk zorunlu hale geldiğinde büyük bir rekabet avantajına sahip olacak. Bu makalede Python'da veritabanı tasarımından REST API'ye, uyarı motorundan geri çağırma yönetimine kadar FSMA 204 gereksinimlerini karşılayan eksiksiz bir sistem oluşturuyoruz.
Ne Öğreneceksiniz
- FSMA 204'ün tam yapısı: Gıda İzlenebilirlik Listesi, CTE ve KDE
- Çok düzeyli izlenebilirlik için PostgreSQL veritabanı tasarımı
- CTE kaydı ve bir yukarı/bir aşağı sorgular için Pydantic modellerine sahip FastAPI REST API
- Tam denetim takibi için Apache Kafka ile Değişmez Olay Kaynağı Kullanımı
- Çok kanallı bildirimlerle anormalliklere (soğuk zincir, şüpheli partiler) karşı uyarı motoru
- Geri çağırma yönetimi iş akışı: 24 saatten kısa sürede geri izleme/ileri izleme (FDA gerekliliği)
- Performans ölçümleriyle sahte geri çağırma alıştırması
- Karşılaştırma FSMA 204 ile AB Reg. 178/2002 ve İtalyan ihracatçılara etkisi
FoodTech Serisi: Neredeyiz
Bu, federicocalo.dev'deki FoodTech serisinin altıncı makalesidir. İşte tam harita:
| # | Başlık | Seviye | Durum |
|---|---|---|---|
| 01 | Hassas Tarım için IoT Boru Hattı | Orta seviye | Yayınlandı |
| 02 | Gıda Kalite Kontrolü için Bilgisayarlı Görü | Orta seviye | Yayınlandı |
| 03 | Mahsul Tahmini için ML ve Edge Computing | Gelişmiş | Yayınlandı |
| 04 | Blockchain ve Şeffaf Gıda Tedarik Zinciri | Gelişmiş | Yayınlandı |
| 05 | Prophet ve LSTM ile büyük ölçekli perakende ticaret için Talep Tahmini | Gelişmiş | Yayınlandı |
| 06 | FSMA 204 Otomasyon: Python ile Takip, Uyarı ve Geri Çağırma | Gelişmiş | BURADASINIZ |
| 07 | Dikey Tarım: Kontrol Paneli ve Otomatik Kontrol | Orta seviye | Yakında |
| 08 | Mahsul İzleme için Uydu API'si | Gelişmiş | Yakında |
| 09 | Streamlit'li Tarladan Çatala Kontrol Paneli | Orta seviye | Yakında |
| 10 | Tedarik Zinciri Esnekliği: OR-Tools ile Optimizasyon | Gelişmiş | Yakında |
FSMA Kural 204: 2011'den Bu Yana En Önemli Mevzuat Değişikliği
2011 Gıda Güvenliği Modernizasyon Yasası (FSMA), FDA'nın gıda güvenliğine yönelik yaklaşımını reaktif (kontaminasyona tepki vermek) yaklaşımından önleyici (kirlenmeyi önleyen) yaklaşımına dönüştürdü. Bölüm 204, FDA'ya yüksek riskli gıdaları belirleme ve ek izlenebilirlik gereklilikleri uygulama yetkisi vermiştir. Sonuç şudur: Gıda İzlenebilirliği Nihai Kuralı, Kasım 2022'de yayınlandı.
Amaç radikaldir: Bir salgın veya kontaminasyon durumunda, operatörler bir partinin tüm dağıtım zincirini (tarladan çatala) tanımlayabilmeli ve kayıtları FDA'ya, sipariş edilebilecek elektronik bir formatta teslim edebilmelidir. Talepten 24 saat sonra. FSMA 204 öncesinde bu süreç ortalama 7-10 gün sürüyordu. Kirlenmiş gıdalardan kaynaklanan hastalıkların ve ölümlerin azaltılması üzerindeki etkisi, FDA tarafından her yıl kaçınılan yüzlerce vaka olarak tahmin edilmektedir.
Gıda İzlenebilirlik Listesi (FTL): İlgili Gıdalar
FTL, tarihsel olarak şiddetli salgınlarla ilişkilendirilen gıdaları içerir. Ana kategoriler şunlardır:
| Kategori | Spesifik örnekler | Birincil risk |
|---|---|---|
| Taze peynirler | Yumuşak/yarı yumuşak olgunlaşmamış, taze mozzarella peyniri, ricotta | Listeria monositogenleri |
| Kabuklu yumurta | Pastörize edilmemiş tavuk yumurtası | Salmonella Enteritidis |
| Deniz ürünleri – balık | Ton balığı, somon, morina, kılıç balığı (taze/dondurulmuş) | Scombroid, Listeria |
| Kabuklular | Karides, yengeç, ıstakoz | Vibrio, Norovirüs |
| Çift kabuklu yumuşakçalar | İstiridye, istiridye, midye | Norovirüs, Vibrio |
| RTE salataları | Patates salatası, yumurta, deniz ürünleri | Listeria, Salmonella |
| Taze meyve ve sebzeler | Salatalık, otlar, yapraklı yeşillikler, kavunlar, biberler, filizler, domatesler, tropik meyveler | E. coli O157:H7, Salmonella |
| Fındık ezmesi | Fıstık ezmesi, badem, kaju fıstığı | Salmonella |
İtalyan İhracatçılara Önemli Not
I taze peynirler (mozzarella, burrata, ricotta, stracchino) açıkça FTL'dedir. Bu ürünleri ABD'ye ihraç eden İtalyan şirketlerinin, Amerika pazarına yönelik tüm partiler için FSMA 204 izlenebilirliğini uygulaması gerekiyor. Sert olgun peynirler (Parmigiano Reggiano, Grana Padano, Pecorino Romano) mevcut FTL'de yer almamaktadır.
Kritik İzleme Olayları (CTE)
CTE'ler tedarik zincirinde izlenebilirliğin kaydedilmesi gereken anlardır. FSMA 204, her biri belirli KDE'lere sahip olan altı ana KDE'yi tanımlar:
| CTE | Tanım | Uygulanabilir |
|---|---|---|
| Büyüyor | Taze ürünlerin doğrudan tarladan toplanması | Meyve ve sebzeler, otlar |
| Hasat | Ekilmemiş gıdaların hasadı (kabuklu deniz ürünleri, yabani balıklar) | Yabani deniz ürünleri |
| Soğutma | Hasat sonrası ilk soğutma işlemi | Meyve ve sebzeler, deniz ürünleri |
| İlk Paketleme | Ürün ilk kez son haliyle paketleniyor | Tüm FTL öğeleri |
| Nakliye | Herhangi bir mülkiyet veya velayet devri | Tüm FTL öğeleri |
| Alma | FTL öğelerini başka bir operatörden alma | Tüm FTL öğeleri |
| Dönüşüm | Bir FTL öğesi yeni bir ürüne dahil edildiğinde | Transformatörler/üreticiler |
CTE için Anahtar Veri Öğeleri (KDE)
Her CTE için kural zorunlu KDE'leri tanımlar. Örnek: Nakliye CTE'si:
- İzlenebilirlik lot kodu (TLC) — benzersiz parti tanımlayıcı
- Sevk edilen miktar ve ölçü birimi
- Gönderim tarihi
- Gönderim noktasının konum açıklaması (TLC kaynak referansı)
- Taşıma belgesi referans numarası
- Alıcının adı ve adresi
- Gıdanın tanımı (FDA Gıda Tesisi Kayıt numarası dahil)
için CTE alma:
- Alınan TLC (önceki sevkiyattan)
- Alınan miktar
- Alındığı tarih
- Kabul noktasının konumu
- Gönderim belgesine referans
- TLC kaynağı (toplu kaynak)
FSMA 204 Uyumluluk Sisteminin Mimarisi
Üretimdeki bir FSMA 204 uyumluluk sistemi üç ana akışı yönetmelidir: verileri gerçek zamanlı olarak yakalayın operasyonlar sırasında, işleme ve depolama olayların değişmez bir şekilde ve hızlı yanıt FDA'nın talebi veya geri çekilmesi durumunda. Önerdiğimiz mimari olay odaklı ve bulutta yereldir:
Teknoloji Yığını
- API Katmanı: FastAPI (Python 3.11+) — CTE kaydı için REST uç noktası
- Birincil veritabanı: PostgreSQL 16 — izlenebilirlik için ilişkisel şema
- Etkinlik akışı: Apache Kafka - Değişmez mesajlar olarak CTE olayları
- Grafik geçişi: NetworkX veya Neo4j — tedarik zinciri geri izleme/ileri izleme
- Uyarı motoru: Python + Kereviz + Redis — çok kanallı eşzamansız kurallar
- Geri çağırma yönetimi: FastAPI + PostgreSQL saklı prosedürleri
- Raporlama: JSON/XML'de otomatik FDA geri çağırma bildirimi oluşturulması
Uçtan uca akış:
- Veri Yakalama: operatörler CTE'yi mobil/web API veya ERP entegrasyonu aracılığıyla kaydeder
- Olay İşleme: her CTE doğrulanır, zenginleştirilir ve Kafka'da yayınlanır
- İzlenebilirlik Grafiği: partilerin şecere grafiği gerçek zamanlı olarak güncellenir
- Uyarı Motoru: Etkinlik kuralları anormallikleri tespit eder ve bildirim gönderir
- Geri Çağırma Yönetimi: Kritik bir uyarı durumunda otomatik olarak geri çağırma iş akışını başlatır
- FDA Raporlaması: sıralanabilir listenin 24 saat içinde elektronik formatta oluşturulması
FSMA 204 İzlenebilirlik için PostgreSQL Veritabanı Tasarımı
Plan iki zıt gereksinimi karşılamalıdır: günlük operasyonlar için performans (CTE eki, iş sorgusu) e Geri çağırma durumunda geri izleme sorguları için olağanüstü hız. CTE'ye yönelik KDE değişkenleri için PostgreSQL'i ilişkisel tablolar ve JSONB dizinlerinin bir kombinasyonuyla kullanıyoruz.
-- ============================================================
-- FSMA 204 Compliance Database Schema
-- PostgreSQL 16
-- ============================================================
-- Estensione per UUID generation
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE EXTENSION IF NOT EXISTS "ltree"; -- Per path-based queries su genealogia
-- ============================================================
-- TABELLA: locations
-- Tutti i luoghi della filiera (aziende, magazzini, porti)
-- ============================================================
CREATE TABLE locations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
fda_facility_id VARCHAR(12) UNIQUE, -- FDA Food Facility Registration Number
name VARCHAR(255) NOT NULL,
address_line1 VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state_province VARCHAR(100),
country_code CHAR(2) NOT NULL, -- ISO 3166-1 alpha-2
postal_code VARCHAR(20),
location_type VARCHAR(50) NOT NULL
CHECK (location_type IN (
'farm', 'packing_house', 'processor',
'distributor', 'retailer', 'port', 'cold_storage'
)),
contact_email VARCHAR(255),
contact_phone VARCHAR(50),
coordinates POINT, -- lat/lon per geo-queries
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: food_items
-- Catalogo alimenti FTL con classificazione FSMA
-- ============================================================
CREATE TABLE food_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
ftl_category VARCHAR(50) NOT NULL
CHECK (ftl_category IN (
'fresh_cheese', 'shell_eggs', 'finfish',
'crustaceans', 'bivalve_mollusks', 'nut_butter',
'rte_deli_salads', 'fresh_produce'
)),
name VARCHAR(255) NOT NULL,
description TEXT,
fda_product_code VARCHAR(7), -- FDA product code
unit_of_measure VARCHAR(20) NOT NULL DEFAULT 'kg',
storage_temp_min DECIMAL(5,2), -- Celsius
storage_temp_max DECIMAL(5,2),
shelf_life_days INTEGER,
is_ftl_covered BOOLEAN DEFAULT true,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: traceability_lots
-- Core: ogni lotto tracciabile con TLC (Traceability Lot Code)
-- ============================================================
CREATE TABLE traceability_lots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tlc VARCHAR(50) UNIQUE NOT NULL, -- Traceability Lot Code
food_item_id UUID NOT NULL REFERENCES food_items(id),
origin_location_id UUID NOT NULL REFERENCES locations(id),
-- Quantità e dimensione lotto
initial_quantity DECIMAL(12,4) NOT NULL,
remaining_quantity DECIMAL(12,4) NOT NULL,
unit_of_measure VARCHAR(20) NOT NULL,
-- Date critiche
production_date DATE,
harvest_date DATE,
best_before_date DATE,
expiry_date DATE,
-- Genealogia: da quali lotti parent e stato derivato
parent_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
genealogy_path ltree, -- Es: "LOT_A.LOT_B.LOT_C"
-- Stato del lotto
status VARCHAR(20) NOT NULL DEFAULT 'active'
CHECK (status IN (
'active', 'recalled', 'quarantined',
'consumed', 'disposed', 'archived'
)),
-- Attributi FDA specifici
tlc_source_reference TEXT, -- Descrizione provenienza TLC
-- Metadati extra (variabili per tipo alimento)
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indice per query di genealogia rapida
CREATE INDEX idx_lots_genealogy ON traceability_lots USING GIST (genealogy_path);
CREATE INDEX idx_lots_status ON traceability_lots(status);
CREATE INDEX idx_lots_tlc ON traceability_lots(tlc);
CREATE INDEX idx_lots_food_item ON traceability_lots(food_item_id);
CREATE INDEX idx_lots_origin ON traceability_lots(origin_location_id);
CREATE INDEX idx_lots_attributes ON traceability_lots USING GIN (attributes);
-- ============================================================
-- TABELLA: cte_events
-- Registro immutabile di tutti i Critical Tracking Events
-- Questa tabella NON deve mai avere UPDATE o DELETE
-- ============================================================
CREATE TABLE cte_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'growing', 'harvesting', 'cooling',
'initial_packing', 'shipping', 'receiving',
'transformation'
)),
-- Lotti coinvolti
lot_id UUID NOT NULL REFERENCES traceability_lots(id),
related_lot_ids UUID[] DEFAULT ARRAY[]::UUID[], -- per transformation
-- Localizzazione evento
location_id UUID NOT NULL REFERENCES locations(id),
-- Timestamp evento (quando e accaduto, non quando e stato registrato)
event_timestamp TIMESTAMPTZ NOT NULL,
recorded_at TIMESTAMPTZ DEFAULT NOW(),
recorded_by VARCHAR(255) NOT NULL, -- user/system che ha registrato
-- KDE come JSONB (struttura varia per tipo CTE)
kde JSONB NOT NULL,
-- Per spedizioni: destinatario
destination_location_id UUID REFERENCES locations(id),
-- Documento di riferimento (bill of lading, PO, etc.)
reference_document_number VARCHAR(100),
reference_document_type VARCHAR(50),
-- Quantità movimentata
quantity DECIMAL(12,4),
unit_of_measure VARCHAR(20),
-- Hash per immutabilita (SHA-256 del payload)
event_hash CHAR(64) NOT NULL,
-- Firma digitale opzionale
digital_signature TEXT,
-- Metadati
notes TEXT,
metadata JSONB DEFAULT '{}'
-- NO updated_at: questo record e immutabile
);
-- Gli eventi NON si modificano: trigger di protezione
CREATE RULE no_update_cte AS ON UPDATE TO cte_events DO INSTEAD NOTHING;
CREATE RULE no_delete_cte AS ON DELETE TO cte_events DO INSTEAD NOTHING;
CREATE INDEX idx_cte_lot ON cte_events(lot_id);
CREATE INDEX idx_cte_type ON cte_events(event_type);
CREATE INDEX idx_cte_timestamp ON cte_events(event_timestamp DESC);
CREATE INDEX idx_cte_location ON cte_events(location_id);
CREATE INDEX idx_cte_destination ON cte_events(destination_location_id);
CREATE INDEX idx_cte_kde ON cte_events USING GIN (kde);
-- ============================================================
-- TABELLA: recall_events
-- Gestione recall con workflow completo
-- ============================================================
CREATE TABLE recall_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
recall_number VARCHAR(20) UNIQUE NOT NULL, -- Es: "RECALL-2026-001"
recall_type VARCHAR(20) NOT NULL
CHECK (recall_type IN ('class_i', 'class_ii', 'class_iii', 'market_withdrawal')),
-- Root cause
trigger_lot_ids UUID[] NOT NULL, -- Lotti che hanno scatenato il recall
trigger_reason VARCHAR(50) NOT NULL
CHECK (trigger_reason IN (
'pathogen_contamination', 'allergen_undeclared',
'foreign_material', 'chemical_contamination',
'mislabeling', 'temperature_abuse', 'other'
)),
trigger_description TEXT NOT NULL,
-- Scope calcolato automaticamente
affected_lot_ids UUID[] DEFAULT ARRAY[]::UUID[],
affected_quantity DECIMAL(12,4),
affected_unit VARCHAR(20),
-- Workflow stato
status VARCHAR(30) NOT NULL DEFAULT 'identified'
CHECK (status IN (
'identified', 'scope_determined', 'notifications_sent',
'removal_in_progress', 'effectiveness_check',
'closed', 'fda_reported'
)),
-- Timestamp workflow
identified_at TIMESTAMPTZ DEFAULT NOW(),
scope_determined_at TIMESTAMPTZ,
notifications_sent_at TIMESTAMPTZ,
fda_reported_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ,
-- FDA notification (entro 24h da richiesta)
fda_notified BOOLEAN DEFAULT false,
fda_report_json JSONB,
-- Responsabile
initiated_by VARCHAR(255) NOT NULL,
-- Metriche drill
traceback_seconds INTEGER, -- Tempo per completare traceback
traceforward_seconds INTEGER,
lots_traced_pct DECIMAL(5,2),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alert_rules
-- Configurazione regole per alert engine
-- ============================================================
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
description TEXT,
rule_type VARCHAR(30) NOT NULL
CHECK (rule_type IN (
'temperature_breach', 'lot_age', 'missing_cte',
'quantity_discrepancy', 'supplier_blacklist',
'pathogen_alert', 'custom'
)),
condition_json JSONB NOT NULL, -- Regola in formato JSON
severity VARCHAR(10) NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
channels TEXT[] NOT NULL, -- ['email', 'sms', 'webhook', 'slack']
recipients JSONB NOT NULL, -- {"email": [...], "phone": [...]}
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ============================================================
-- TABELLA: alerts
-- Alert generati dall'engine
-- ============================================================
CREATE TABLE alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
rule_id UUID REFERENCES alert_rules(id),
lot_id UUID REFERENCES traceability_lots(id),
cte_event_id UUID REFERENCES cte_events(id),
severity VARCHAR(10) NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
data JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'open'
CHECK (status IN ('open', 'acknowledged', 'resolved', 'escalated')),
acknowledged_by VARCHAR(255),
acknowledged_at TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
auto_recall_triggered BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_alerts_lot ON alerts(lot_id);
CREATE INDEX idx_alerts_severity ON alerts(severity);
CREATE INDEX idx_alerts_status ON alerts(status);
CREATE INDEX idx_alerts_created ON alerts(created_at DESC);
FastAPI ile REST API: Pydantic Modelleri ve CTE Uç Noktaları
API, tedarik zincirindeki tüm operatörler için giriş noktasıdır. Hızı, Pydantic ile otomatik doğrulaması ve FDA denetimleri için yararlı olan OpenAPI belgelerinin otomatik oluşturulması nedeniyle FastAPI'yi kullanıyoruz. Pydantic şablonları kuralın gerektirdiği KDE'leri yansıtır.
# fsma204/models.py
# Pydantic v2 models per FSMA 204 API
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from uuid import UUID
from enum import Enum
class CTEEventType(str, Enum):
GROWING = "growing"
HARVESTING = "harvesting"
COOLING = "cooling"
INITIAL_PACKING = "initial_packing"
SHIPPING = "shipping"
RECEIVING = "receiving"
TRANSFORMATION = "transformation"
class FTLCategory(str, Enum):
FRESH_CHEESE = "fresh_cheese"
SHELL_EGGS = "shell_eggs"
FINFISH = "finfish"
CRUSTACEANS = "crustaceans"
BIVALVE_MOLLUSKS = "bivalve_mollusks"
NUT_BUTTER = "nut_butter"
RTE_DELI_SALADS = "rte_deli_salads"
FRESH_PRODUCE = "fresh_produce"
class LotStatus(str, Enum):
ACTIVE = "active"
RECALLED = "recalled"
QUARANTINED = "quarantined"
CONSUMED = "consumed"
DISPOSED = "disposed"
# ── KDE Models per CTE type ──────────────────────────────────
class ShippingKDE(BaseModel):
"""Key Data Elements per Shipping CTE - FSMA 204 §1.1330"""
tlc: str = Field(..., description="Traceability Lot Code")
quantity_shipped: float = Field(..., gt=0)
unit_of_measure: str
ship_date: date
tlc_source_reference: str = Field(..., description="Descrizione provenienza TLC")
transport_document_number: str
transport_document_type: str = Field(default="bill_of_lading")
receiver_fda_facility_id: str = Field(..., description="FDA registration del destinatario")
receiver_name: str
receiver_address: str
class ReceivingKDE(BaseModel):
"""Key Data Elements per Receiving CTE - FSMA 204 §1.1335"""
tlc_received: str = Field(..., description="TLC come apparso nel documento di spedizione")
quantity_received: float = Field(..., gt=0)
unit_of_measure: str
receive_date: date
location_description: str
reference_document_number: str
reference_document_type: str
lot_code_generator_location: Optional[str] = None
class GrowingKDE(BaseModel):
"""Key Data Elements per Growing CTE"""
tlc: str
harvest_date: date
field_id: str # Identificativo campo/parcella
grower_fda_facility_id: str
commodity: str
growing_method: str = "conventional" # conventional, organic, hydroponic
class InitialPackingKDE(BaseModel):
"""Key Data Elements per Initial Packing CTE"""
tlc: str
pack_date: date
quantity_packed: float
unit_of_measure: str
packaging_type: str
facility_fda_id: str
lot_code_generator_location: str
class TransformationKDE(BaseModel):
"""Key Data Elements per Transformation CTE"""
new_tlc: str # TLC del nuovo prodotto
input_tlcs: List[str] # TLC degli ingredienti usati
transformation_date: date
facility_fda_id: str
product_description: str
quantity_produced: float
unit_of_measure: str
# ── Request/Response Models ──────────────────────────────────
class CreateLotRequest(BaseModel):
food_item_id: UUID
origin_location_id: UUID
tlc: str = Field(..., min_length=3, max_length=50)
initial_quantity: float = Field(..., gt=0)
unit_of_measure: str = Field(default="kg")
production_date: Optional[date] = None
harvest_date: Optional[date] = None
best_before_date: Optional[date] = None
expiry_date: Optional[date] = None
parent_lot_ids: List[UUID] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
@field_validator('tlc')
@classmethod
def validate_tlc(cls, v: str) -> str:
"""TLC non deve contenere caratteri ambigui"""
forbidden = set('IO0l') # caratteri ambigui
if any(c in forbidden for c in v.upper()):
raise ValueError("TLC contiene caratteri ambigui (I, O, 0, l)")
return v.upper()
class RegisterCTERequest(BaseModel):
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_by: str = Field(..., min_length=3)
kde: Dict[str, Any] = Field(..., description="KDE specifici per il tipo di CTE")
destination_location_id: Optional[UUID] = None
reference_document_number: Optional[str] = None
reference_document_type: Optional[str] = None
quantity: Optional[float] = None
unit_of_measure: Optional[str] = None
notes: Optional[str] = None
class CTEEventResponse(BaseModel):
id: UUID
event_type: CTEEventType
lot_id: UUID
location_id: UUID
event_timestamp: datetime
recorded_at: datetime
recorded_by: str
kde: Dict[str, Any]
event_hash: str
class TracebackRequest(BaseModel):
lot_id: UUID
depth: int = Field(default=10, ge=1, le=50)
class TracebackResult(BaseModel):
lot_id: UUID
tlc: str
ancestors: List[Dict[str, Any]] # lotti upstream
descendants: List[Dict[str, Any]] # lotti downstream
events: List[Dict[str, Any]]
total_ancestors: int
total_descendants: int
trace_depth: int
computation_ms: int
class SortableListResponse(BaseModel):
"""Formato sortable list richiesto da FDA (§1.1375)"""
generated_at: datetime
requesting_lots: List[str] # TLC richiesti
records: List[Dict[str, Any]] # Record ordinabili
total_records: int
format_version: str = "FSMA204_v1"
# fsma204/api.py
# FastAPI application per FSMA 204 compliance
import hashlib
import json
import time
from uuid import UUID
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .models import (
CreateLotRequest, RegisterCTERequest,
CTEEventResponse, TracebackRequest,
TracebackResult, SortableListResponse
)
from .database import get_db
from .kafka_producer import publish_cte_event
from .alert_engine import check_alert_rules
app = FastAPI(
title="FSMA 204 Traceability API",
description="Food Traceability Compliance System per FDA Rule 204",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def compute_event_hash(payload: dict) -> str:
"""SHA-256 hash del payload CTE per immutabilita"""
canonical = json.dumps(payload, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
# ── Lot Management ──────────────────────────────────────────
@app.post("/api/v1/lots", status_code=201)
async def create_lot(
request: CreateLotRequest,
db: AsyncSession = Depends(get_db)
):
"""
Crea un nuovo lotto tracciabile.
Il TLC deve essere unico nel sistema.
"""
# Verifica unicita TLC
result = await db.execute(
text("SELECT id FROM traceability_lots WHERE tlc = :tlc"),
{"tlc": request.tlc}
)
if result.fetchone():
raise HTTPException(
status_code=409,
detail=f"TLC '{request.tlc}' già esistente nel sistema"
)
# Costruisce genealogy_path
genealogy_path = request.tlc
if request.parent_lot_ids:
# Recupera path del parent principale
parent_result = await db.execute(
text("SELECT genealogy_path, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.parent_lot_ids[0])}
)
parent = parent_result.fetchone()
if parent and parent.genealogy_path:
genealogy_path = f"{parent.genealogy_path}.{request.tlc}"
await db.execute(
text("""
INSERT INTO traceability_lots (
food_item_id, origin_location_id, tlc,
initial_quantity, remaining_quantity, unit_of_measure,
production_date, harvest_date, best_before_date, expiry_date,
parent_lot_ids, genealogy_path, attributes
) VALUES (
:food_item_id, :origin_location_id, :tlc,
:initial_quantity, :initial_quantity, :unit_of_measure,
:production_date, :harvest_date, :best_before_date, :expiry_date,
:parent_lot_ids::uuid[], :genealogy_path::ltree, :attributes::jsonb
)
"""),
{
"food_item_id": str(request.food_item_id),
"origin_location_id": str(request.origin_location_id),
"tlc": request.tlc,
"initial_quantity": request.initial_quantity,
"unit_of_measure": request.unit_of_measure,
"production_date": request.production_date,
"harvest_date": request.harvest_date,
"best_before_date": request.best_before_date,
"expiry_date": request.expiry_date,
"parent_lot_ids": [str(x) for x in request.parent_lot_ids],
"genealogy_path": genealogy_path,
"attributes": json.dumps(request.attributes)
}
)
await db.commit()
return {"message": "Lotto creato", "tlc": request.tlc}
# ── CTE Registration ────────────────────────────────────────
@app.post("/api/v1/cte-events", response_model=CTEEventResponse, status_code=201)
async def register_cte_event(
request: RegisterCTERequest,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""
Registra un Critical Tracking Event.
Il record e IMMUTABILE una volta creato (no update/delete).
"""
# Verifica che il lotto esista e sia attivo
lot_result = await db.execute(
text("SELECT id, status, tlc FROM traceability_lots WHERE id = :id"),
{"id": str(request.lot_id)}
)
lot = lot_result.fetchone()
if not lot:
raise HTTPException(status_code=404, detail="Lotto non trovato")
if lot.status in ('recalled', 'disposed'):
raise HTTPException(
status_code=422,
detail=f"Impossibile registrare CTE su lotto con stato '{lot.status}'"
)
# Calcola hash per immutabilita
payload = {
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp.isoformat(),
"kde": request.kde
}
event_hash = compute_event_hash(payload)
# Inserisce CTE (immutabile)
result = await db.execute(
text("""
INSERT INTO cte_events (
event_type, lot_id, location_id,
event_timestamp, recorded_by, kde,
destination_location_id, reference_document_number,
reference_document_type, quantity, unit_of_measure,
event_hash, notes
) VALUES (
:event_type, :lot_id, :location_id,
:event_timestamp, :recorded_by, :kde::jsonb,
:destination_location_id, :reference_document_number,
:reference_document_type, :quantity, :unit_of_measure,
:event_hash, :notes
)
RETURNING id, recorded_at
"""),
{
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"location_id": str(request.location_id),
"event_timestamp": request.event_timestamp,
"recorded_by": request.recorded_by,
"kde": json.dumps(request.kde),
"destination_location_id": (
str(request.destination_location_id)
if request.destination_location_id else None
),
"reference_document_number": request.reference_document_number,
"reference_document_type": request.reference_document_type,
"quantity": request.quantity,
"unit_of_measure": request.unit_of_measure,
"event_hash": event_hash,
"notes": request.notes
}
)
row = result.fetchone()
await db.commit()
# Aggiorna remaining_quantity per spedizioni
if request.event_type == "shipping" and request.quantity:
await db.execute(
text("""
UPDATE traceability_lots
SET remaining_quantity = remaining_quantity - :qty
WHERE id = :id
"""),
{"qty": request.quantity, "id": str(request.lot_id)}
)
await db.commit()
# Background: pubblica su Kafka e verifica alert rules
event_data = {
"id": str(row.id),
"event_type": request.event_type,
"lot_id": str(request.lot_id),
"tlc": lot.tlc,
"kde": request.kde,
"event_timestamp": request.event_timestamp.isoformat()
}
background_tasks.add_task(publish_cte_event, event_data)
background_tasks.add_task(check_alert_rules, event_data, str(request.lot_id))
return CTEEventResponse(
id=row.id,
event_type=request.event_type,
lot_id=request.lot_id,
location_id=request.location_id,
event_timestamp=request.event_timestamp,
recorded_at=row.recorded_at,
recorded_by=request.recorded_by,
kde=request.kde,
event_hash=event_hash
)
# ── Traceback / Traceforward ────────────────────────────────
@app.get("/api/v1/lots/{lot_id}/traceback")
async def traceback(lot_id: UUID, depth: int = 10, db: AsyncSession = Depends(get_db)):
"""
One-step-up AND full traceback: risale la filiera upstream.
Requisito FDA: completare entro 24h da richiesta.
"""
start_ms = time.monotonic()
# Recupera lotto root
root_result = await db.execute(
text("SELECT id, tlc, genealogy_path FROM traceability_lots WHERE id = :id"),
{"id": str(lot_id)}
)
root = root_result.fetchone()
if not root:
raise HTTPException(status_code=404, detail="Lotto non trovato")
# Usa ltree per trovare tutti gli antenati
ancestors_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id, initial_quantity
FROM traceability_lots
WHERE genealogy_path @> :path::ltree
AND id != :lot_id
ORDER BY nlevel(genealogy_path) ASC
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
ancestors = [dict(r._mapping) for r in ancestors_result.fetchall()]
# Trova tutti i discendenti (traceforward)
descendants_result = await db.execute(
text("""
SELECT id, tlc, status, genealogy_path,
food_item_id, origin_location_id
FROM traceability_lots
WHERE genealogy_path <@ :path::ltree
AND id != :lot_id
LIMIT :depth
"""),
{
"path": str(root.genealogy_path),
"lot_id": str(lot_id),
"depth": depth
}
)
descendants = [dict(r._mapping) for r in descendants_result.fetchall()]
# Recupera tutti gli eventi CTE per il lotto
events_result = await db.execute(
text("""
SELECT event_type, event_timestamp, recorded_by,
kde, location_id, event_hash
FROM cte_events
WHERE lot_id = :lot_id
ORDER BY event_timestamp ASC
"""),
{"lot_id": str(lot_id)}
)
events = [dict(r._mapping) for r in events_result.fetchall()]
elapsed_ms = int((time.monotonic() - start_ms) * 1000)
return TracebackResult(
lot_id=lot_id,
tlc=root.tlc,
ancestors=ancestors,
descendants=descendants,
events=events,
total_ancestors=len(ancestors),
total_descendants=len(descendants),
trace_depth=depth,
computation_ms=elapsed_ms
)
# ── FDA Sortable List ───────────────────────────────────────
@app.get("/api/v1/lots/sortable-list", response_model=SortableListResponse)
async def fda_sortable_list(
tlcs: str, # comma-separated TLC list
db: AsyncSession = Depends(get_db)
):
"""
Genera il sortable list in formato elettronico richiesto da FDA.
Deve essere producibile entro 24h da richiesta FDA (§1.1375).
"""
tlc_list = [t.strip() for t in tlcs.split(",")]
# Recupera tutti i record CTE per i TLC richiesti
records_result = await db.execute(
text("""
SELECT
tl.tlc,
ce.event_type,
ce.event_timestamp,
ce.recorded_by,
ce.kde,
ce.quantity,
ce.unit_of_measure,
l.name AS location_name,
l.fda_facility_id,
ce.reference_document_number,
ce.event_hash
FROM cte_events ce
JOIN traceability_lots tl ON ce.lot_id = tl.id
JOIN locations l ON ce.location_id = l.id
WHERE tl.tlc = ANY(:tlcs)
ORDER BY tl.tlc, ce.event_timestamp ASC
"""),
{"tlcs": tlc_list}
)
records = []
for row in records_result.fetchall():
r = dict(row._mapping)
r['event_timestamp'] = r['event_timestamp'].isoformat()
records.append(r)
return SortableListResponse(
generated_at=__import__('datetime').datetime.utcnow(),
requesting_lots=tlc_list,
records=records,
total_records=len(records)
)
Apache Kafka ile Değişmez Olay Kaynak Kullanımı
CTE'lerin değişmez doğası, Event Sourcing modeliyle mükemmel bir şekilde uyum sağlar. Her olay, tedarik zincirinde olup bitenlerle ilgili değişmez bir gerçektir. Kafka bu olayların:
- Dayanıklı: yapılandırılabilir saklama (ör. FSMA 204 için 7 yıl)
- Tekrar oynanabilir: herhangi bir tarihi noktada devleti yeniden inşa etmek
- Bölüme göre sıralanmış: Toplu sipariş garantisi (TLC ile bölümlendirme)
- Dağıtılmış: günde milyonlarca etkinliğe ölçeklenebilir
# fsma204/kafka_producer.py
# Kafka producer per CTE events
import json
import logging
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from datetime import datetime
logger = logging.getLogger(__name__)
# Schema Avro per CTE event (per Schema Registry)
CTE_EVENT_SCHEMA = """
{
"type": "record",
"name": "CTEEvent",
"namespace": "com.fsma204.traceability",
"fields": [
{"name": "id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "lot_id", "type": "string"},
{"name": "tlc", "type": "string"},
{"name": "event_timestamp", "type": "string"},
{"name": "kde", "type": "string"},
{"name": "event_hash", "type": "string"}
]
}
"""
# Configurazione Kafka
kafka_config = {
'bootstrap.servers': 'kafka:9092',
'client.id': 'fsma204-producer',
'acks': 'all', # Garanzia durabilita
'retries': 5,
'retry.backoff.ms': 200,
'compression.type': 'snappy'
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
logger.error(f"Kafka delivery failed: {err}")
else:
logger.debug(
f"CTE event delivered to {msg.topic()} "
f"partition {msg.partition()} offset {msg.offset()}"
)
async def publish_cte_event(event_data: dict):
"""
Pubblica un CTE event su Kafka.
Usa il TLC come partition key per garantire ordine per lotto.
"""
try:
message = json.dumps(event_data, default=str).encode('utf-8')
# Partition key = TLC: tutti gli eventi dello stesso lotto
# vanno alla stessa partizione, garantendo ordinamento
partition_key = event_data.get('tlc', event_data['lot_id'])
producer.produce(
topic='fsma204.cte.events',
key=partition_key.encode('utf-8'),
value=message,
callback=delivery_callback
)
producer.poll(0) # Flush asincrono
except Exception as e:
logger.error(f"Failed to publish CTE event: {e}", exc_info=True)
# ── Consumer per Alert Engine ───────────────────────────────
# fsma204/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import asyncio
async def start_cte_consumer():
"""
Consumer Kafka per il real-time alert processing.
Eseguito come background service separato dall'API.
"""
consumer_config = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'fsma204-alert-engine',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Commit manuale per at-least-once
}
consumer = Consumer(consumer_config)
consumer.subscribe(['fsma204.cte.events'])
logger.info("CTE Consumer avviato, in ascolto su fsma204.cte.events")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka consumer error: {msg.error()}")
continue
try:
event = json.loads(msg.value().decode('utf-8'))
await process_cte_for_alerts(event)
consumer.commit(msg)
except Exception as e:
logger.error(f"Error processing CTE event: {e}", exc_info=True)
# Non committiamo: l'evento verrà riprocessato
finally:
consumer.close()
async def process_cte_for_alerts(event: dict):
"""Pipeline di processing per ogni CTE event"""
event_type = event['event_type']
kde = json.loads(event.get('kde', '{}'))
# 1. Controlla cold chain per eventi di spedizione/ricezione
if event_type in ('shipping', 'receiving'):
await check_cold_chain_continuity(event, kde)
# 2. Verifica completezza KDE per il tipo di evento
await validate_kde_completeness(event_type, kde, event['lot_id'])
# 3. Controlla se il fornitore e in blacklist
if 'supplier_fda_id' in kde:
await check_supplier_blacklist(kde['supplier_fda_id'], event['lot_id'])
# 4. Aggiorna grafo di tracciabilita in memoria (NetworkX)
await update_traceability_graph(event)
Uyarı Motoru: Gerçek Zamanlı Anormallik Tespiti
Uyarı motoru, CTE olaylarının sürekli akışını izler ve yapılandırılabilir kuralları uygular. Bildirimler önem derecesine göre birden fazla kanal üzerinden gönderilir. Bir uyarı critical geri çağırma iş akışını otomatik olarak tetikleyebilir.
# fsma204/alert_engine.py
# Alert engine per FSMA 204 compliance monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
import httpx
import smtplib
from email.mime.text import MIMEText
from twilio.rest import Client as TwilioClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from .database import get_db_session
logger = logging.getLogger(__name__)
class AlertSeverity:
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertRule:
"""Base class per le regole di alert"""
def __init__(self, rule_id: str, name: str, severity: str):
self.rule_id = rule_id
self.name = name
self.severity = severity
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
"""Ritorna un dict con i dettagli dell'alert, o None se ok"""
raise NotImplementedError
class ColdChainBreachRule(AlertRule):
"""
Rileva interruzioni nella catena del freddo.
Regola: se la temperatura registrata nei KDE supera la soglia
prevista per la categoria di alimento.
"""
TEMP_THRESHOLDS = {
'fresh_cheese': {'min': 1.0, 'max': 7.0},
'finfish': {'min': -2.0, 'max': 4.0},
'crustaceans': {'min': 0.0, 'max': 4.0},
'shell_eggs': {'min': 5.0, 'max': 7.2}, # FDA: < 45°F
'fresh_produce': {'min': 1.0, 'max': 10.0},
}
def __init__(self):
super().__init__("cold_chain_breach", "Cold Chain Breach", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
temp_recorded = kde.get('temperature_celsius')
if temp_recorded is None:
return None
# Recupera categoria alimento del lotto
result = await db.execute(
text("""
SELECT fi.ftl_category
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
WHERE tl.id = :lot_id
"""),
{"lot_id": event['lot_id']}
)
row = result.fetchone()
if not row:
return None
category = row.ftl_category
threshold = self.TEMP_THRESHOLDS.get(category)
if not threshold:
return None
breach = False
breach_detail = ""
if temp_recorded < threshold['min']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sotto minimo {threshold['min']}°C"
)
elif temp_recorded > threshold['max']:
breach = True
breach_detail = (
f"Temperatura {temp_recorded}°C sopra massimo {threshold['max']}°C"
)
if breach:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"Cold Chain Breach - TLC {event.get('tlc', 'N/A')}",
"description": (
f"{breach_detail} per categoria {category}. "
f"CTE: {event['event_type']}, "
f"timestamp: {event['event_timestamp']}"
),
"lot_id": event['lot_id'],
"data": {
"temperature": temp_recorded,
"threshold": threshold,
"category": category,
"event_type": event['event_type']
}
}
return None
class MissingKDERule(AlertRule):
"""
Verifica che tutti i KDE obbligatori siano presenti per il tipo CTE.
Mancanza di KDE = non-compliance FSMA 204.
"""
REQUIRED_KDE = {
'shipping': ['tlc', 'quantity_shipped', 'ship_date', 'receiver_name'],
'receiving': ['tlc_received', 'quantity_received', 'receive_date'],
'initial_packing': ['tlc', 'pack_date', 'quantity_packed', 'facility_fda_id'],
'growing': ['tlc', 'harvest_date', 'field_id', 'grower_fda_facility_id'],
'transformation': ['new_tlc', 'input_tlcs', 'transformation_date'],
}
def __init__(self):
super().__init__("missing_kde", "Missing KDE Fields", AlertSeverity.HIGH)
async def evaluate(self, event: dict, db: AsyncSession) -> Optional[dict]:
import json
event_type = event['event_type']
required = self.REQUIRED_KDE.get(event_type, [])
if not required:
return None
kde = event.get('kde') if isinstance(event.get('kde'), dict) else json.loads(event.get('kde', '{}'))
missing = [f for f in required if f not in kde or kde[f] is None]
if missing:
return {
"rule_id": self.rule_id,
"severity": self.severity,
"title": f"KDE Mancanti - CTE {event_type}",
"description": (
f"CTE {event_type} per TLC {event.get('tlc')} "
f"manca dei KDE obbligatori: {', '.join(missing)}"
),
"lot_id": event['lot_id'],
"data": {"missing_fields": missing, "event_type": event_type}
}
return None
# ── Notification System ──────────────────────────────────────
class MultiChannelNotifier:
def __init__(self, config: dict):
self.config = config
self.twilio = TwilioClient(
config['twilio_account_sid'],
config['twilio_auth_token']
)
async def send_alert(self, alert_data: dict, rule_config: dict):
channels = rule_config.get('channels', ['email'])
severity = alert_data['severity']
tasks = []
if 'email' in channels:
tasks.append(self._send_email(alert_data, rule_config))
if 'sms' in channels and severity in ('high', 'critical'):
tasks.append(self._send_sms(alert_data, rule_config))
if 'webhook' in channels:
tasks.append(self._send_webhook(alert_data, rule_config))
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
logger.error(f"Notification failed: {r}")
async def _send_email(self, alert: dict, config: dict):
recipients = config.get('recipients', {}).get('email', [])
msg = MIMEText(
f"FSMA 204 Alert\n\n"
f"Severity: {alert['severity'].upper()}\n"
f"Title: {alert['title']}\n"
f"Description: {alert['description']}\n"
f"Lot ID: {alert['lot_id']}\n"
f"Timestamp: {datetime.utcnow().isoformat()}"
)
msg['Subject'] = f"[FSMA204-{alert['severity'].upper()}] {alert['title']}"
msg['From'] = self.config['smtp_from']
msg['To'] = ', '.join(recipients)
# In produzione usare asyncio-friendly SMTP
with smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port']) as server:
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.sendmail(self.config['smtp_from'], recipients, msg.as_string())
async def _send_sms(self, alert: dict, config: dict):
phones = config.get('recipients', {}).get('phone', [])
message_body = (
f"FSMA204 ALERT {alert['severity'].upper()}: "
f"{alert['title']} - Lot: {alert['lot_id']}"
)
for phone in phones:
self.twilio.messages.create(
body=message_body,
from_=self.config['twilio_from'],
to=phone
)
async def _send_webhook(self, alert: dict, config: dict):
webhook_url = config.get('recipients', {}).get('webhook_url')
if not webhook_url:
return
async with httpx.AsyncClient() as client:
await client.post(
webhook_url,
json={
"event": "fsma204_alert",
"severity": alert['severity'],
"data": alert
},
timeout=10.0
)
# ── Main check function ──────────────────────────────────────
ACTIVE_RULES = [
ColdChainBreachRule(),
MissingKDERule(),
# Aggiungere: SupplierBlacklistRule(), LotAgeRule(), etc.
]
async def check_alert_rules(event: dict, lot_id: str):
"""Chiamato come background task dopo ogni registrazione CTE"""
async with get_db_session() as db:
for rule in ACTIVE_RULES:
try:
alert_data = await rule.evaluate(event, db)
if alert_data:
# Salva alert su DB
await db.execute(
text("""
INSERT INTO alerts (rule_id, lot_id, severity, title, description, data)
SELECT ar.id, :lot_id, :severity, :title, :description, :data::jsonb
FROM alert_rules ar WHERE ar.name = :rule_name
"""),
{
"lot_id": lot_id,
"severity": alert_data['severity'],
"title": alert_data['title'],
"description": alert_data['description'],
"data": json.dumps(alert_data.get('data', {})),
"rule_name": rule.name
}
)
await db.commit()
# Auto-trigger recall se critical
if alert_data['severity'] == AlertSeverity.CRITICAL:
await auto_trigger_recall(lot_id, alert_data, db)
logger.warning(
f"Alert generato: {alert_data['severity']} - {alert_data['title']}"
)
except Exception as e:
logger.error(f"Error evaluating rule {rule.rule_id}: {e}", exc_info=True)
Geri Çağırma Yönetimi: İş Akışını 24 Saatten Kısa Sürede Tamamlayın
FSMA 204'ün en sıkı gereksinimi, geri izleme ve ileri izlemeyi tamamlama ve sıralanabilir listeyi FDA'ya iletme yeteneğidir. Talepten 24 saat sonra. Otomatik geri çağırma iş akışı, bu SLA'nın ölçekli operasyonlarını garanti etmenin tek yoludur. 2024 yılında FDA, ortalama tamamlanma süresi 73 saat olan ve yeni kuralın gerekliliklerinin çok ötesinde 241 gıda geri çağırma işlemi gerçekleştirdi.
# fsma204/recall_manager.py
# Recall Management con workflow automatizzato
import asyncio
import json
import logging
from datetime import datetime
from uuid import UUID, uuid4
from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
import networkx as nx
logger = logging.getLogger(__name__)
class RecallWorkflow:
"""
Gestisce il workflow completo di un recall FSMA 204.
Fasi:
1. IDENTIFIED - Identificazione del problema
2. SCOPE_DETERMINED - Calcolo scope (traceback + traceforward)
3. NOTIFICATIONS_SENT - Notifiche a tutti gli operatori coinvolti
4. REMOVAL_IN_PROGRESS - Rimozione dal mercato
5. EFFECTIVENESS_CHECK - Verifica efficacia
6. CLOSED / FDA_REPORTED
"""
def __init__(self, db: AsyncSession, notifier):
self.db = db
self.notifier = notifier
async def initiate_recall(
self,
trigger_lot_ids: List[UUID],
trigger_reason: str,
trigger_description: str,
initiated_by: str,
recall_type: str = "class_i"
) -> dict:
"""
Avvia un recall. Questo metodo deve completare
traceback + traceforward + scope in < 30 minuti
per rispettare il requisito 24h FDA complessivo.
"""
recall_number = self._generate_recall_number()
start_time = datetime.utcnow()
logger.info(
f"Recall {recall_number} avviato da {initiated_by} "
f"per {len(trigger_lot_ids)} lotti"
)
# Fase 1: Crea record recall
recall_id = await self._create_recall_record(
recall_number, trigger_lot_ids, trigger_reason,
trigger_description, initiated_by, recall_type
)
# Fase 2: Determina scope (traceback + traceforward)
scope_start = datetime.utcnow()
affected_lots = await self._determine_scope(trigger_lot_ids)
scope_seconds = int((datetime.utcnow() - scope_start).total_seconds())
# Fase 3: Aggiorna recall con scope e stati
await self._update_recall_scope(recall_id, affected_lots, scope_seconds)
# Fase 4: Genera FDA Recall Notice
fda_report = await self._generate_fda_report(
recall_number, trigger_lot_ids, affected_lots,
trigger_reason, trigger_description
)
# Fase 5: Marca lotti come recalled
await self._mark_lots_recalled(affected_lots)
# Fase 6: Invia notifiche a tutti gli operatori
notify_start = datetime.utcnow()
await self._send_recall_notifications(affected_lots, recall_number, trigger_description)
notify_seconds = int((datetime.utcnow() - notify_start).total_seconds())
# Aggiorna timestamp workflow
await self.db.execute(
text("""
UPDATE recall_events SET
scope_determined_at = NOW(),
notifications_sent_at = NOW(),
status = 'notifications_sent',
fda_report_json = :fda_report::jsonb,
traceback_seconds = :traceback_seconds,
lots_traced_pct = :lots_pct
WHERE id = :id
"""),
{
"id": str(recall_id),
"fda_report": json.dumps(fda_report),
"traceback_seconds": scope_seconds,
"lots_pct": 100.0 # aggiornare con verifica reale
}
)
await self.db.commit()
total_seconds = int((datetime.utcnow() - start_time).total_seconds())
logger.info(
f"Recall {recall_number} scope determinato in {scope_seconds}s, "
f"notifiche in {notify_seconds}s, totale {total_seconds}s"
)
return {
"recall_number": recall_number,
"recall_id": str(recall_id),
"affected_lots_count": len(affected_lots),
"scope_determination_seconds": scope_seconds,
"notification_seconds": notify_seconds,
"total_seconds": total_seconds,
"fda_report": fda_report
}
async def _determine_scope(self, trigger_lot_ids: List[UUID]) -> List[str]:
"""
Determina scope usando graph traversal.
Usa NetworkX per BFS su grafo di genealogia.
"""
# Costruisce grafo da DB
G = await self._build_traceability_graph()
affected = set()
for lot_id in trigger_lot_ids:
lot_str = str(lot_id)
if lot_str not in G:
continue
# Traceback: tutti gli antenati (upstream)
ancestors = nx.ancestors(G, lot_str)
affected.update(ancestors)
# Traceforward: tutti i discendenti (downstream)
descendants = nx.descendants(G, lot_str)
affected.update(descendants)
affected.add(lot_str)
return list(affected)
async def _build_traceability_graph(self) -> nx.DiGraph:
"""
Costruisce un grafo orientato della genealogia lotti
usando gli eventi CTE di tipo shipping/receiving/transformation.
"""
G = nx.DiGraph()
# Nodi: tutti i lotti attivi
lots_result = await self.db.execute(
text("SELECT id, tlc, status FROM traceability_lots WHERE status != 'archived'")
)
for row in lots_result.fetchall():
G.add_node(str(row.id), tlc=row.tlc, status=row.status)
# Archi: da shipping events
edges_result = await self.db.execute(
text("""
SELECT
s.lot_id AS from_lot,
r.lot_id AS to_lot
FROM cte_events s
JOIN cte_events r ON s.reference_document_number = r.reference_document_number
WHERE s.event_type = 'shipping'
AND r.event_type = 'receiving'
""")
)
for row in edges_result.fetchall():
G.add_edge(str(row.from_lot), str(row.to_lot))
# Archi: da transformation events
transform_result = await self.db.execute(
text("""
SELECT lot_id, related_lot_ids
FROM cte_events
WHERE event_type = 'transformation'
""")
)
for row in transform_result.fetchall():
for parent_id in (row.related_lot_ids or []):
G.add_edge(str(parent_id), str(row.lot_id))
return G
async def _generate_fda_report(
self,
recall_number: str,
trigger_lots: List[UUID],
affected_lots: List[str],
reason: str,
description: str
) -> dict:
"""
Genera il recall notice in formato FDA.
Struttura basata su FDA 21 CFR 7.46.
"""
# Recupera dettagli lotti coinvolti
lots_result = await self.db.execute(
text("""
SELECT tl.tlc, tl.initial_quantity, tl.unit_of_measure,
fi.name AS food_name, fi.ftl_category,
l.name AS origin_name, l.fda_facility_id,
tl.production_date, tl.best_before_date
FROM traceability_lots tl
JOIN food_items fi ON tl.food_item_id = fi.id
JOIN locations l ON tl.origin_location_id = l.id
WHERE tl.id = ANY(:affected)
"""),
{"affected": affected_lots}
)
affected_details = [dict(r._mapping) for r in lots_result.fetchall()]
return {
"recall_number": recall_number,
"report_date": datetime.utcnow().isoformat(),
"regulation": "FSMA Section 204 - 21 CFR Part 1 Subpart S",
"recall_type": "Voluntary",
"reason": reason,
"description": description,
"affected_products": affected_details,
"total_affected_lots": len(affected_lots),
"fsma_compliance": {
"rule": "Food Traceability Final Rule",
"response_format": "Electronic Sortable",
"24h_compliance": True
}
}
def _generate_recall_number(self) -> str:
year = datetime.utcnow().year
unique = str(uuid4())[:8].upper()
return f"RECALL-{year}-{unique}"
async def _create_recall_record(
self, recall_number, trigger_lot_ids, reason,
description, initiated_by, recall_type
) -> UUID:
result = await self.db.execute(
text("""
INSERT INTO recall_events (
recall_number, recall_type, trigger_lot_ids,
trigger_reason, trigger_description, initiated_by
) VALUES (
:number, :type, :lots::uuid[],
:reason, :desc, :by
)
RETURNING id
"""),
{
"number": recall_number,
"type": recall_type,
"lots": [str(l) for l in trigger_lot_ids],
"reason": reason,
"desc": description,
"by": initiated_by
}
)
row = result.fetchone()
await self.db.commit()
return row.id
async def _mark_lots_recalled(self, affected_lot_ids: List[str]):
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'recalled', updated_at = NOW()
WHERE id = ANY(:ids::uuid[])
"""),
{"ids": affected_lot_ids}
)
await self.db.commit()
async def _update_recall_scope(self, recall_id, affected_lots, scope_seconds):
await self.db.execute(
text("""
UPDATE recall_events SET
affected_lot_ids = :lots::uuid[],
status = 'scope_determined',
traceback_seconds = :seconds
WHERE id = :id
"""),
{
"lots": affected_lots,
"seconds": scope_seconds,
"id": str(recall_id)
}
)
await self.db.commit()
async def _send_recall_notifications(
self,
affected_lot_ids: List[str],
recall_number: str,
description: str
):
"""Notifica tutti gli operatori che hanno movimentato i lotti coinvolti"""
# Recupera contatti di tutti i location coinvolti
contacts_result = await self.db.execute(
text("""
SELECT DISTINCT l.contact_email, l.contact_phone, l.name
FROM cte_events ce
JOIN locations l ON ce.location_id = l.id
WHERE ce.lot_id = ANY(:lots::uuid[])
AND l.contact_email IS NOT NULL
"""),
{"lots": affected_lot_ids}
)
for contact in contacts_result.fetchall():
alert_payload = {
"severity": "critical",
"title": f"RECALL NOTICE {recall_number}",
"description": (
f"I lotti che avete ricevuto/spedito sono coinvolti nel recall {recall_number}. "
f"Motivo: {description}. "
"Sospendere immediatamente la distribuzione e contattare il responsabile."
),
"lot_id": "multiple"
}
rule_config = {
"channels": ["email"],
"recipients": {
"email": [contact.contact_email],
"phone": [contact.contact_phone] if contact.contact_phone else []
}
}
try:
await self.notifier.send_alert(alert_payload, rule_config)
except Exception as e:
logger.error(f"Failed to notify {contact.name}: {e}")
Sahte Geri Çağırma Alıştırması: Periyodik Performans Testi
FDA şiddetle tavsiye ediyor sahte hatırlama tatbikatı - Sistemin 24 saat gerekliliğine uygunluğunu doğrulamak için geri çağırma sürecinin periyodik simülasyonları. En az 6 ayda bir tatbikat yapılmalı ve sonuçları denetim için belgelenmelidir. Temel ölçümler şunlardır: tamamlanması gereken toplam süre, takip edilen partilerin yüzdesi, kapsamın belirlenmesi için geçen süre.
# fsma204/mock_recall_drill.py
# Mock recall drill con metriche performance
import asyncio
import random
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
logger = logging.getLogger(__name__)
class MockRecallDrill:
"""
Simula un recall scenario per testare le performance del sistema.
Il drill usa lotti MOCK (non production) con dati sintetici.
"""
def __init__(self, db, recall_workflow, lot_count: int = 100):
self.db = db
self.workflow = recall_workflow
self.lot_count = lot_count
self.drill_id = f"DRILL-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
async def run_drill(self) -> Dict[str, Any]:
"""
Esegue il drill completo e ritorna le metriche.
"""
logger.info(f"Avvio mock recall drill {self.drill_id}")
metrics = {
"drill_id": self.drill_id,
"started_at": datetime.utcnow().isoformat(),
"phases": {}
}
# Fase 1: Seed dati mock
phase_start = time.monotonic()
mock_lots = await self._seed_mock_data()
metrics['phases']['data_seeding'] = {
"duration_seconds": round(time.monotonic() - phase_start, 2),
"lots_created": len(mock_lots)
}
# Fase 2: Seleziona lotto trigger casuale
trigger_lot = random.choice(mock_lots[10:40]) # Dal mezzo della supply chain
logger.info(f"Drill trigger lot: {trigger_lot['tlc']}")
# Fase 3: Esegue recall
phase_start = time.monotonic()
recall_result = await self.workflow.initiate_recall(
trigger_lot_ids=[trigger_lot['id']],
trigger_reason="pathogen_contamination",
trigger_description=f"[DRILL] Mock Listeria contamination test - {self.drill_id}",
initiated_by="recall_drill_system",
recall_type="class_i"
)
recall_duration = time.monotonic() - phase_start
metrics['phases']['recall_execution'] = {
"duration_seconds": round(recall_duration, 2),
"scope_seconds": recall_result['scope_determination_seconds'],
"notification_seconds": recall_result['notification_seconds'],
"affected_lots": recall_result['affected_lots_count'],
"recall_number": recall_result['recall_number']
}
# Fase 4: Verifica compliance SLA
sla_24h_seconds = 24 * 3600
compliance_ok = recall_duration < sla_24h_seconds
metrics['sla_compliance'] = {
"target_seconds": sla_24h_seconds,
"actual_seconds": round(recall_duration, 2),
"compliant": compliance_ok,
"margin_hours": round((sla_24h_seconds - recall_duration) / 3600, 1)
}
# Fase 5: Cleanup dati mock
await self._cleanup_mock_data(mock_lots)
metrics['completed_at'] = datetime.utcnow().isoformat()
metrics['overall_pass'] = compliance_ok
self._log_drill_report(metrics)
return metrics
async def _seed_mock_data(self) -> list:
"""Crea lotti sintetici con CTE events per il drill"""
from sqlalchemy import text
mock_lots = []
for i in range(self.lot_count):
tlc = f"MOCK-{self.drill_id}-LOT-{i:04d}"
lot_data = {
"id": None,
"tlc": tlc,
"is_mock": True
}
# In produzione: inserire tramite lot creation API
# Per semplicità usiamo insert diretto nel drill
mock_lots.append(lot_data)
logger.info(f"Creati {len(mock_lots)} lotti mock per drill {self.drill_id}")
return mock_lots
async def _cleanup_mock_data(self, mock_lots: list):
"""Rimuove dati mock dopo il drill"""
from sqlalchemy import text
tlcs = [l['tlc'] for l in mock_lots]
# I mock TLC iniziano con "MOCK-" per identificazione sicura
await self.db.execute(
text("""
UPDATE traceability_lots
SET status = 'archived'
WHERE tlc LIKE 'MOCK-%' AND tlc = ANY(:tlcs)
"""),
{"tlcs": tlcs}
)
await self.db.commit()
logger.info(f"Cleanup drill {self.drill_id} completato")
def _log_drill_report(self, metrics: dict):
report_lines = [
f"",
f"=== MOCK RECALL DRILL REPORT ===",
f"Drill ID: {metrics['drill_id']}",
f"Started: {metrics['started_at']}",
f"",
f"FASI:",
]
for phase, data in metrics.get('phases', {}).items():
report_lines.append(f" {phase}: {json.dumps(data)}")
sla = metrics.get('sla_compliance', {})
status = "PASS ✓" if sla.get('compliant') else "FAIL ✗"
report_lines.extend([
f"",
f"SLA 24H COMPLIANCE: {status}",
f" Target: {sla.get('target_seconds')}s",
f" Actual: {sla.get('actual_seconds')}s",
f" Margin: {sla.get('margin_hours')}h",
f"",
f"OVERALL: {'PASS' if metrics.get('overall_pass') else 'FAIL'}",
f"================================",
])
logger.info('\n'.join(report_lines))
Mevzuat Karşılaştırması: FSMA 204 ile AB Reg. 178/2002 ve Birleşik Krallık Gıda Güvenliği Yasası
Birden fazla pazara ihracat yapan operatörler için düzenleyici çerçeveler arasındaki farkları anlamak kritik öneme sahiptir. "Bir adım yukarı, bir adım aşağı" prensibini paylaşsalar da ayrıntı düzeyi ve yaptırımlar açısından önemli ölçüde farklılaşıyorlar.
| Boyut | FSMA 204 (ABD) | AB Reg. 178/2002 | Birleşik Krallık Gıda Güvenliği Yasası 1990 + Reg. |
|---|---|---|---|
| Süpürgeler | Yalnızca FTL yiyecekleri (özel liste) | Tüm yiyecek ve yemler | Birleşik Krallık'taki tüm yiyecekler |
| Parçalılık | Etkinliğe özel CTE + KDE | Genel bir adım yukarı / bir adım aşağı | Bir adım yukarı/bir adım aşağı |
| FDA/yetkili yanıt süresi | 24 saat (elektronik sıralanabilir liste) | Belirli bir şart yok | Belirli bir şart yok |
| Kayıt formatı | Elektronik, sipariş edilebilir, birlikte çalışabilir | Belgelenmiş herhangi bir format | Belgelenmiş herhangi bir format |
| Tutulma | 24 ay | Kategoriye göre değişken | Kategoriye göre değişken |
| Yaptırımlar | İhlal başına günde 10.000 ABD dolarına kadar | Üye Devlete göre değişir | 20.000 GBP'ye kadar + hapis cezası |
| İthalatçılara başvuru | Evet: ABD'ye ithalat yapan herkes için geçerlidir | Evet: AB pazarındaki operatörler için | Evet: Birleşik Krallık pazarındaki operatörler için |
| Teknolojik standartlar | Yetki yok (ancak GS1 önerilir) | Arama emri yok | Arama emri yok |
İhracatçılar İçin Pratik Uygulamalar
ABD'ye taze mozzarella peyniri ihraç eden bir İtalyan şirketinin uyması gerekiyor öyle AB Reg. FSMA 204'e göre 178/2002. FSMA 204 daha katı olduğundan (özel CTE/KDE, 24 saat yanıt, elektronik format), FSMA 204 ile uyumlu bir sistem otomatik olarak AB Yönetmeliğiyle de uyumludur. En uygun strateji, FSMA 204'ü temel olarak uygulamak ve bunu AB/İngiltere gerekliliklerine uyarlamaktır.
ABD'ye İtalyan ihracatçıları üzerindeki etkisi
İtalya, ABD'ye (Kanada ve Meksika'dan sonra) yaklaşık 200 milyon dolarlık değerle üçüncü büyük gıda ve içecek ürünleri ihracatçısı konumundadır. 2024'te 8 milyar euro. FSMA 204'ten etkilenen ana kategoriler:
| Kategori | İhracat IT-ABD 2024 (tahmini) | FSMA 204 kapsamı | İşlem gerekli |
|---|---|---|---|
| Şarap | ~2,1 milyar dolar | Hayır (FTL'de değil) | Yalnızca FDA kaydı |
| Olgun peynirler (Parmesan, Grana, Pecorino) | ~350 milyon dolar | Hayır (tecrübeli kişiler hariç) | Yalnızca FDA kaydı + Ön Bildirim |
| Taze peynirler (mozzarella, burrata, ricotta) | ~120 milyon dolar | EVET — kritik öncelik | Tam CTE/KDE uygulaması |
| Zeytinyağı | ~706 milyon dolar | No | Yalnızca FDA kaydı |
| Sebze konserveleri (domates, püre) | ~200 milyon dolar | Kısmi (FTL'de taze domates) | Malzemelerin taze kalıp kalmadığını kontrol edin |
İtalyan Gıda KOBİ'si için Uyum Maliyetlerinin Tahmini
| Bileşen | Kurulum (tek seferlik) | Yıllık işletme maliyeti |
|---|---|---|
| Yazılım izlenebilirliği (SaaS veya özel) | 15.000 € – 80.000 € | 8.000 € – 20.000 €/yıl |
| Mevcut ERP/MES entegrasyonu | 10.000 € – 40.000 € | 2.000 € – 5.000 €/yıl |
| Donanım (tarayıcılar, RFID okuyucular, sensörler) | 5.000 € – 25.000 € | 1.000 € – 3.000 €/yıl (bakım) |
| Kişisel eğitim | 3.000 € – 10.000 € | 1.500 € – 3.000 €/yıl |
| Yasal tavsiye/FDA uyumluluğu | 5.000 € – 20.000 € | 3.000 € – 8.000 €/yıl |
| Tahmini toplam | 38.000 € – 175.000 € | 15.500 € – 39.000 €/yıl |
Mevcut Finansman: PNRR Geçiş 5.0
İtalyan gıda KOBİ'leri, dijital izlenebilirlik sistemlerini uygulama maliyetleri için PNRR Geçiş 5.0 fonlarına (12,7 milyar Avro tahsis edildi, %45'e kadar vergi kredisi) erişebilir. FSMA 204 sistemleri, soğuk zincir izleme için IoT sensörlerini entegre etmeleri durumunda "üretim süreçlerinin dijitalleştirilmesi" olarak nitelendirilir. Muhasebecinizle uygulanabilirliğini kontrol edin.
İtalyan KOBİ'leri için Önerilen Uyum Yol Haritası
- 1-2. Ay: Boşluk analizi — mevcut akışların haritasını çıkarın, FTL ürünlerini belirleyin, mevcut BT sistemlerini değerlendirin
- 3-4. Ay: Veritabanı tasarımı + API geliştirme (temel olarak bu makaledeki kodu kullanın)
- 5-6. Ay: Mevcut ERP (SAP, Dynamics, Sage) ile entegrasyon, operatör eğitimi
- 7-8. Ay: Bir ürün/hat üzerinde pilot çalışma yapın, KDE'nin eksiksizliğini doğrulayın, ilk deneme alıştırmasını yapın
- 9-10 Ay: Eksiksiz kullanıma sunma, aktif uyarı motoru, FDA belgeleri
- 11-12 Ay: İkinci deneme tatbikatı, ince ayar, FDA denetimine hazırlık
En İyi Uygulamalar ve Anti-Kalıplar
En İyi Uygulamalar
- Benzersiz ve net TLC: görsel olarak benzer karakterlerden kaçının (I/l/1, O/0). ABD tedarik zinciriyle birlikte çalışabilirlik için standart olarak GS1-128 veya SSCC'yi kullanın
- DB düzeyinde garanti edilen değişmezlik: yalnızca kod kurallarını değil, cte_events'te UPDATE/DELETE'i önleyen PostgreSQL tetikleyicilerini kullanın
- Denetim takibi için olay karması: Oluşturulma sırasındaki yükün SHA-256'sı — kayıtların değiştirilmediğini doğrulamanıza olanak tanır
- Düzenli olarak 24 saatlik ALS testi: Sentetik verilerle her 6 ayda bir deneme tatbikatları yapın, sonuçları belgeleyin
- GS1 ile senkronizasyon: konum_kimlikleri için GLN'yi (Küresel Konum Numarası) kullanın — EDI ve ABD ortak sistemleriyle uyumludur
- KDE Sürümü Oluşturma: FSMA 204 gelişebilir; Geçişleri kolaylaştırmak için kayıtlarınızda bir kde_schema_version alanı bulundurun
Kaçınılması Gereken Anti-Desenler
- KDE yalnızca kağıt üzerinde: birçok şirket KDE'leri PDF veya Excel sayfalarında saklar. Bu sistemlerle "elektronik, sıralanabilir format" ihtiyacının 24 saatte karşılanması mümkün değil
- Standart olmayan TLC: Alt ortaklara iletilmeyen dahili parti numaralarının kullanılması, şirketler arası geri izlemeyi imkansız hale getirir
- Yükselmeden uyarı: e-posta gönderen ancak kritik sorunlar için bir yükseltme protokolüne sahip olmayan bir uyarı sistemi (2 saat içinde yanıt yok → otomatik çağrı) FSMA ruhuna saygı göstermez
- Yalnızca dahili izlenebilirlik: FSMA 204 ortaklara bir adım yukarı VE bir adım aşağı gitmeyi gerektirir. İç sistem tek başına yeterli değil
- UTC senkronizasyon hatası: CTE zaman damgaları UTC cinsinden olmalı veya açıkça farklı olmalıdır. Belirsiz zaman damgaları, uluslararası geri çağırma durumunda zaman çizelgesinin yeniden oluşturulmasını imkansız hale getiriyor
Sonuçlar ve Sonraki Adımlar
FSMA Kuralı 204, bir paradigma değişimini temsil ediyor: kağıt belge olarak izlenebilirlikten, gerçek zamanlı yazılım altyapısı olarak izlenebilir. Bu makalede oluşturduğumuz sistem (depolama için PostgreSQL, API katmanı için FastAPI, olay akışı için Kafka, grafik geçişi için NetworkX) en kritik gereksinimi karşılama kapasitesine sahiptir: herhangi bir talepten sonra 24 saat içinde FDA'ya gönderilecek tam sıralanabilir liste.
İtalyan şirketleri için iyi haber: Son tarihin Temmuz 2028'e uzatılması, kademeli uygulama için yeterli süre sunuyor. Kötü haber: Başlamak için 2027'ye kadar bekleyen şirketler, şişirilmiş fiyatlarla ve uzun bekleme kuyruklarıyla danışmanlardan ve yazılım satıcılarından oluşan bir pazar bulacak.
2024 verileri açıkça konuşuyor: 241 FDA gıda geri çağırma, 1.392 hasta, 2023'e kıyasla hastaneye kaldırılan vakalar iki kattan fazla arttı. Parçalı izlenebilirlik bürokrasi değil, insan yaşamında ölçülebilir bir önlemedir.
FoodTech Serisi: Sonraki Makale
Serinin bir sonraki makalesi 07 — Dikey Tarım: Kontrol Paneli ve Otomatik KontrolHidroponik çiftlikler için gerçek zamanlı bir kontrol sistemi kuracağımız yer: IoT sensörleri (ışık, CO2, pH, EC), Streamlit kontrol panelleri ve metrekare başına verimi en üst düzeye çıkarmak için optimizasyon algoritmaları. Federicocalo.dev'i takip etmeye devam edin.
Seriler Arası: İlgili Bilgiler
- MLOps (Seri 12): MLflow ile üretimde uyarı motoru anormallik algılama modellerinin nasıl dağıtılacağı
- Yapay Zeka Mühendisliği (Seri 13): FDA belgelerine ilişkin RAG - doğru düzenleyici metinden alıntı yaparak uyumluluk sorularını yanıtlayan bir LLM
- PostgreSQL AI (Seri 22): Kontaminasyon profillerinde benzerlik araştırması için pgvector — geçmişteki salgınlarda yer alanlarla benzer özelliklere sahip partiler bulur
- DevOps Ön Ucu (Seri 9): FSMA 204 API'nin Kubernetes'te sürekli dağıtımı için CI/CD hattı







