Potravinový řetězec: ETL vzor od farmy k maloobchodníkovi
Každý rok přibližně třetina všech potravin vyrobených pro lidskou spotřebu je ztracena nebo vyplýtvána v potravinovém řetězci: hodnota, která jen v USA přesahuje 473 miliard dolarů a která se promítá do v miliardě jídel vyhozených každý den na celém světě. Paradoxně významný podíl z tento odpad nezávisí na klimatických nebo biologických faktorech, ale na datovém problému: viditelnosti nedostatečné, starší systémy nejsou integrované, KPI vypočítané pozdě a rozhodnutí učiněná na základě informací neúplné nebo nesprávné.
Globální trh řízení potravinového řetězce má v roce 2025 hodnotu 182,81 miliardy dolarů a do roku 2034 vzroste na 359,39 miliard (CAGR 7,8 %). Přesto 48 % dodavatelů potravin stále používá tabulkové procesory pro každodenní operace a 60 % uvádí opakující se manuální úlohy, které spotřebovávají drahocenný čas. Propast mezi tím, co technologie umožňuje a co zemědělsko-potravinářské společnosti skutečně implementují a obrovské.
Tento článek – nejnovější ze série FoodTech – se zabývá problémem u jeho kořenů: jak postavit a Robustní a škálovatelné potrubí ETL/ELT integrovat heterogenní data z farmy k maloobchodníkovi, organizujte toky pomocí Apache Airflow, transformujte data pomocí dbt, zajistěte kvalitu pomocí Great Očekávání a měření výkonu pomocí klíčových ukazatelů výkonu dodavatelského řetězce specifických pro potravinářský sektor.
Co se naučíte
- Kompletní datová architektura potravinového dodavatelského řetězce (farma → zpracování → distribuce → maloobchod)
- Heterogenní zdroje dat: ERP (SAP/Oracle), MES, LIMS, WMS, TMS, POS, IoT, EDI
- ETL vs ELT: kdy zvolit, jaký přístup k údajům o potravinách
- Kompletní Apache Airflow DAG pro potravinové potrubí s chybovým zpracováním a opakováním
- Modely DBT pro KPI dodavatelského řetězce: míra plýtvání, OTIF, míra plnění, dny dodávky
- Kvalita dat s velkými očekáváními pro chladící řetězce a data expirace
- Integrace starších systémů: SAP RFC/BAPI, EDI EDIFACT, CDC s Debezium
- Případová studie: zemědělské družstvo 500 farem, 3 zpracovatelská střediska, 200 prodejních míst
- Kompletní shrnutí celé série FoodTech (10 článků)
Řada FoodTech: Kde jsme
Toto je desátá a poslední epizoda seriálu FoodTech, která prozkoumala ty hlavní digitální technologie použitelné v celém potravinovém řetězci: od sběru dat v terénu až po maloobchod. Zde je celý obrázek:
| # | Titul | Technologie | Úroveň |
|---|---|---|---|
| 00 | IoT Pipeline pro přesné zemědělství s Pythonem a MQTT | IoT, MQTT, Python, Data Lake | Moderní |
| 01 | ML Edge for Crop Disease Detection: TensorFlow Lite na Raspberry Pi | TensorFlow Lite, Edge ML, Raspberry Pi | Moderní |
| 02 | Satelitní a meteorologická rozhraní API pro AgriTech: Prediktivní data | Sentinel, Planet, Weather API, NDVI | Moderní |
| 03 | Systém sledování potravin: Blockchain, RFID a IoT | Blockchain, RFID, IoT, Compliance | Moderní |
| 04 | Počítačová vize pro kontrolu kvality potravin s PyTorch YOLO | YOLO, PyTorch, počítačové vidění | Moderní |
| 05 | FSMA 204 Automation: Tracking, Alert and Recall přes Python | FSMA, Compliance, Python, Recall | Moderní |
| 06 | Automatizace vertikálního zemědělství: Robotické ovládání přes API | Robotika, API, automatizace | Moderní |
| 07 | Prognóza poptávky po redukci odpadu: ML Time-Series | LSTM, Prophet, Time-Series ML | Moderní |
| 08 | Dashboard v reálném čase pro Farm IoT s Angular a Grafana | Angular, Grafana, InfluxDB, Real-Time | Moderní |
| 09 | Potravinový řetězec: ETL vzor od farmy k maloobchodníkovi <-- Jste tady | Průtok vzduchu, dbt, velká očekávání, ETL | Moderní |
protože tento článek uzavírá sérii
Předchozí články se zabývaly jednotlivými uzly v řetězci: sběr dat IoT v terénu, ML edge modely, sledovatelnost blockchainu, počítačová vize pro kvalitu, soulad s FSMA, vertikální zemědělství, prognózování poptávky a řídicí panely v reálném čase. Tento článek je všechny spojuje: potrubí ETL a páteř, která integruje každý zdroj dat, transformuje nezpracované informace do použitelných KPI a přináší komplexní viditelnost z farmy do regálu supermarketu.
Datová architektura potravinového dodavatelského řetězce
Potravinový řetězec se skládá z odlišných uzlů, z nichž každý má své vlastní informační systémy, formáty data a frekvence aktualizace. ETL potrubí musí tuto heterogenitu robustně zvládnout.
# Architettura logica della supply chain alimentare
# (dati generati ad ogni step)
FARM SYSTEMS
|-- Sensori IoT (MQTT): temp suolo, umidita, pH, EC
|-- Sistema ERP agricolo: colture pianificate, costi input
|-- Registro trattamenti fitosanitari (LIMS)
|-- Geo-dati GPS macchinari agricoli
|-- API meteo (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS (Centri di Lavorazione)
|-- MES (Manufacturing Execution System): ordini produzione, rese
|-- LIMS: analisi chimiche, microbiologiche, allergeni
|-- ERP SAP/Oracle: distinta base, lotti, tracciabilita
|-- WMS (Warehouse Management): magazzino materie prime/finiti
|-- Sensori cold chain: temperatura/umidita durante lavorazione
|
v
DISTRIBUTION CENTERS
|-- WMS: gestione stock, picking, slotting
|-- TMS (Transport Management): spedizioni, veicoli, routing
|-- Sensori cold chain: logger temperatura nei camion frigoriferi
|-- EDI: ordini da retailer (EDIFACT ORDERS, DESADV)
|-- Tracker GPS: posizione veicoli in tempo reale
|
v
RETAIL POS
|-- POS data: vendite per SKU, store, ora
|-- Gestione shelf life: prodotti prossimi scadenza
|-- EDI: conferme ricezione (RECADV), fatture (INVOIC)
|-- Inventario store: stock disponibile per punto vendita
|
v
CONSUMER (segnali indiretti)
|-- App loyalty: acquisti per cliente, frequenza
|-- Reviews prodotto: qualità percepita
|-- Social sentiment: trend di consumo
Každý uzel generuje data v různých formátech (relační SQL, CSV/EDI plochý soubor, JSON z REST API, binární zprávy MQTT, starší soubory SFTP) s frekvencemi od milisekund (senzory IoT) až týdně (ERP report). Úkolem inženýra potravinových dat je toto vše sjednotit do jediného koherentního analytického modelu.
Heterogenní zdroje dat: Referenční tabulka
Před projektováním jakéhokoli potrubí je nezbytné prozkoumat zdroje. Zde jsou ty hlavní pro středně složitý potravinový dodavatelský řetězec:
| Systém | Typ | Formát | Objem/den | Frekvence | Přijatelná latence |
|---|---|---|---|---|---|
| SAP ERP (farma/závod) | ERP | RFC/BAPI, IDoc, SQL | 50 000–500 000 záznamů | Jednodenní dávka / CDC | T+1h |
| Oracle EBS | ERP | SQL, REST API | Rekord 100 000–1 M | Dávka / CDC | T+1h |
| MES (výroba) | MES | OPC-UA, SQL, REST | 1M–10M událostí | Téměř v reálném čase (1 min) | 5 min |
| LIMS (laboratoř) | LIMS | HL7, CSV, REST | Analýza 1K–10K | Denní | T+4h |
| WMS (sklad) | WMS | SQL, EDI, REST | 10–100 tisíc pohybů | Každých 15 min | 15 min |
| TMS (doprava) | TMS | REST API, EDI | Zásilky 5-50 tisíc | Každých 5 minut (GPS) | 5 min |
| Maloobchodní POS | POS | CSV, REST, SQL | 1 až 50 milionů transakcí | Hodinová/večerní dávka | T+2h |
| IoT senzory studeného řetězce | IoT | MQTT, JSON, Protobuf | 100M+ čtení | Každých 30 sekund – 5 min | V reálném čase (<1 min) |
| EDI partner (EDIFACT) | EDI | EDIFACT, X12, GS1 XML | 100–10 tisíc zpráv | Událostí/dávka | T+30 min |
| Počasí API (OpenMeteo) | Externí API | JSON REST | 1 000–10 000 hovorů | Hodinově | T+1h |
| AGEA (dotace SZP) | Veřejný orgán | XML, CSV SFTP | 1–100 souborů | Měsíční/sezónní | T+24h |
| GPS sledovač vozidel | IoT/telematika | REST, WebSockets | 10M+ GPS bodů | Každých 30 sec | <2 min |
Pozor: Starší ERP a "Extrakční Windows"
Mnoho zemědělských a zemědělsko-potravinářských ERP nepodporuje nativní CDC. Často jediný způsob, jak extrahovat dat a prostřednictvím nočních dávkových úloh, které zamykají tabulky, nebo prostřednictvím rozhraní SAP RFC/BAPI pevná údržbová okna. Naplánujte si okna těžby předem a předvídejte mechanismy obnovy v případě selhání noční dávky.
ETL vs ELT pro potravinový dodavatelský řetězec
Debata o ETL vs ELT je zvláště důležitá v potravinářském sektoru, kde data koexistují vysokofrekvenční (IoT cold chain) s tradičními dávkovými daty (SAP ERP) a regulačními požadavky přísné (sledovatelnost, FSMA 204).
| Kritérium | Tradiční ETL | Moderní ELT (dbt + Data Lake) |
|---|---|---|
| Transformace | Před nahráním (pracovní server) | V datovém skladu (nativní SQL) |
| Škálovatelnost | Omezeno serverem ETL | Stupnice s DWH (Snowflake, BigQuery, DuckDB) |
| Uchovávání nezpracovaných dat | Často ne (data již byla transformována) | Ano: Bronzová vrstva zachovává syrovou barvu |
| Auditovatelnost (FSMA) | Tvrdý: Rozmazaný rodokmen | Vynikající: kompletní graf linie dbt |
| Latence | Dávka (T+1h, T+1d) | Mikrodávka nebo streamování (Flink/Spark) |
| Chyby studeného řetězu | Ztráta dat, pokud selže ETL | Raw vždy uložen; pouze opakovat transformaci |
| Vyžaduje týmové dovednosti | Vývojář Java/Informatica/SSIS | SQL + Python (dbt + proudění vzduchu) |
| Náklady na infrastrukturu | Vždy zapnutý vyhrazený server | Platba za dotaz (Sněhová vločka/BigQuery) |
Pro moderní potravinový dodavatelský řetězec je doporučením architektura Medailon ELT:
# Architettura Medallion per Food Supply Chain
BRONZE LAYER (Raw, immutabile)
|-- Tutti i dati sorgente caricati as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 anni (compliance FSMA/EU)
|-- Formato: Parquet su S3/GCS o Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicato)
|-- Dbt models: normalizzazione schemi eterogenei
|-- Deduplicazione lotti, SKU, location codes
|-- Type casting: date, temperature (C/F), pesi (kg/lb)
|-- NULL handling: sensori offline, ERP null fields
|-- PII masking: dati operatori, conducenti
|
v
GOLD LAYER (KPI pronti per business)
|-- Aggregazioni: fill rate, waste rate, OTIF per SKU
|-- Metriche cold chain: violazioni temperatura per lotto
|-- Dashboard BI: Power BI, Tableau, Grafana
|-- API ML: feature store per demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: OEE impianti, rendimenti produzione
|-- Finance mart: costo per lotto, margine per SKU
|-- Compliance mart: tracciabilita FSMA, recall readiness
Apache Airflow pro Food Pipeline Orchestration
Apache Airflow je de facto standardem pro orchestraci komplexních pracovních postupů ETL. V dodavatelském řetězci energie, jeden DAG musí koordinovat desítky zdrojů s latencemi, závislostmi a politikami různých pokusů. Podívejme se na kompletní DAG pro denní potrubí.
Hlavní DAG: Food Supply Chain Daily ETL
# food_supply_chain_dag.py
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
with DAG(
dag_id="food_supply_chain_daily_etl",
description="Pipeline ETL giornaliera: farm -> processing -> distribution -> retail",
default_args=DEFAULT_ARGS,
schedule_interval="0 2 * * *", # ogni notte alle 02:00 UTC
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["foodtech", "supply-chain", "etl", "production"],
max_active_runs=1,
doc_md="""
## Food Supply Chain Daily ETL
Estrae dati da: SAP ERP, MES impianti, WMS, TMS, POS retail.
Carica su Bronze S3, poi lancia dbt per Silver e Gold.
""",
) as dag:
# ----------------------------------------------------------------
# STEP 1: Health checks sulle sorgenti
# ----------------------------------------------------------------
check_sap_available = HttpSensor(
task_id="check_sap_api_available",
http_conn_id="sap_erp_api",
endpoint="/api/health",
timeout=30,
poke_interval=60,
mode="reschedule",
soft_fail=False,
)
check_mes_available = HttpSensor(
task_id="check_mes_api_available",
http_conn_id="mes_plant_api",
endpoint="/health",
timeout=30,
poke_interval=60,
mode="reschedule",
)
# ----------------------------------------------------------------
# STEP 2: Estrazione SAP ERP (lotti, ordini, inventario)
# ----------------------------------------------------------------
@task(task_id="extract_sap_erp", retries=3)
def extract_sap_erp(**context: Any) -> dict:
"""Estrae dati da SAP ERP via RFC/BAPI wrapper REST."""
from src.connectors.sap_connector import SAPConnector
execution_date = context["ds"] # 'YYYY-MM-DD'
sap = SAPConnector(conn_id="sap_erp_api")
# Estrai: lotti produzione del giorno precedente
batches = sap.get_production_batches(
date=execution_date,
plants=["PL001", "PL002", "PL003"],
)
# Estrai: movimenti magazzino
stock_movements = sap.get_stock_movements(
date=execution_date,
movement_types=["101", "261", "501", "601"],
)
# Estrai: ordini di vendita confermati
sales_orders = sap.get_sales_orders(
date=execution_date,
status=["CONF", "DLVR"],
)
# Salva su S3 Bronze layer
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/sap_erp/dt={execution_date}"
s3.load_string(
string_data=batches.to_json(orient="records", lines=True),
key=f"{prefix}/batches.jsonl",
bucket_name="food-data-lake",
replace=True,
)
s3.load_string(
string_data=stock_movements.to_json(orient="records", lines=True),
key=f"{prefix}/stock_movements.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"SAP extraction OK: %d batches, %d movements, %d orders",
len(batches),
len(stock_movements),
len(sales_orders),
)
return {
"batches_count": len(batches),
"movements_count": len(stock_movements),
"orders_count": len(sales_orders),
}
# ----------------------------------------------------------------
# STEP 3: Estrazione dati cold chain (IoT sensori temperatura)
# ----------------------------------------------------------------
@task(task_id="extract_cold_chain_iot")
def extract_cold_chain_iot(**context: Any) -> dict:
"""Estrae letture temperatura da InfluxDB (IoT cold chain)."""
from influxdb_client import InfluxDBClient
execution_date = context["ds"]
client = InfluxDBClient(
url="{{ var.value.influxdb_url }}",
token="{{ var.value.influxdb_token }}",
org="foodtech-org",
)
query_api = client.query_api()
# Query Flux: temperature readings per lotto di trasporto
flux_query = f"""
from(bucket: "cold-chain")
|> range(start: {execution_date}T00:00:00Z,
stop: {execution_date}T23:59:59Z)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "celsius")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
"""
result = query_api.query_data_frame(flux_query)
# Calcola violazioni: temperatura fuori range per categoria prodotto
violations = result[
(result["celsius"] < result["min_temp_required"]) |
(result["celsius"] > result["max_temp_required"])
]
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/cold_chain/dt={execution_date}"
s3.load_string(
string_data=result.to_json(orient="records", lines=True),
key=f"{prefix}/temperature_readings.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"Cold chain extraction OK: %d readings, %d violations",
len(result),
len(violations),
)
if len(violations) > 0:
logger.warning(
"COLD CHAIN VIOLATIONS: %d eventi fuori temperatura!",
len(violations),
)
return {
"readings_count": len(result),
"violations_count": len(violations),
}
# ----------------------------------------------------------------
# STEP 4: Estrazione EDI dai partner retailer
# ----------------------------------------------------------------
@task(task_id="extract_edi_partners")
def extract_edi_partners(**context: Any) -> dict:
"""Processa file EDI EDIFACT da SFTP partners."""
from src.connectors.edi_connector import EDIFACTConnector
execution_date = context["ds"]
edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")
# Download e parsing ORDERS (EDIFACT ORDERS D.01B)
orders = edi.process_orders(date=execution_date)
# Download e parsing DESADV (Despatch Advice)
despatch_advices = edi.process_desadv(date=execution_date)
# Download e parsing RECADV (Receiving Advice)
receiving_advices = edi.process_recadv(date=execution_date)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/edi_partners/dt={execution_date}"
s3.load_string(
string_data=orders.to_json(orient="records", lines=True),
key=f"{prefix}/orders.jsonl",
bucket_name="food-data-lake",
replace=True,
)
return {
"orders_count": len(orders),
"desadv_count": len(despatch_advices),
"recadv_count": len(receiving_advices),
}
# ----------------------------------------------------------------
# STEP 5: Estrazione POS retail
# ----------------------------------------------------------------
@task(task_id="extract_pos_data")
def extract_pos_data(**context: Any) -> dict:
"""Estrae dati vendite POS da database retailer partner."""
from src.connectors.retail_connector import RetailPOSConnector
execution_date = context["ds"]
pos = RetailPOSConnector(conn_id="retail_pos_db")
# Vendite per SKU/store con shelf life info
sales = pos.get_daily_sales(
date=execution_date,
include_promotions=True,
include_markdowns=True,
)
# Near-expiry: prodotti a <3 giorni scadenza ancora in shelf
near_expiry = pos.get_near_expiry_stock(
date=execution_date,
days_threshold=3,
)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/pos_retail/dt={execution_date}"
s3.load_string(
string_data=sales.to_json(orient="records", lines=True),
key=f"{prefix}/sales.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info("POS extraction OK: %d transazioni", len(sales))
return {
"sales_count": len(sales),
"near_expiry_skus": len(near_expiry),
}
# ----------------------------------------------------------------
# STEP 6: dbt run (Silver + Gold transformations)
# ----------------------------------------------------------------
dbt_run_silver = BashOperator(
task_id="dbt_run_silver_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:silver --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
dbt_run_gold = BashOperator(
task_id="dbt_run_gold_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:gold --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
# ----------------------------------------------------------------
# STEP 7: dbt test (data quality checks)
# ----------------------------------------------------------------
dbt_test = BashOperator(
task_id="dbt_test_all",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt test --select tag:silver tag:gold --target prod"
),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ----------------------------------------------------------------
# STEP 8: Great Expectations validation
# ----------------------------------------------------------------
@task(task_id="run_great_expectations_validation")
def run_ge_validation(**context: Any) -> dict:
"""Lancia Great Expectations per validare data quality cold chain."""
from great_expectations.data_context import get_context
context_ge = get_context(project_root_dir="/opt/great_expectations")
execution_date = context["ds"]
results = {}
suites_to_run = [
("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
("lot_traceability", "silver_lots_checkpoint"),
("kpi_supply_chain", "gold_kpi_checkpoint"),
]
for suite_name, checkpoint_name in suites_to_run:
result = context_ge.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"runtime_parameters": {"path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"},
"batch_identifiers": {"execution_date": execution_date},
},
)
results[suite_name] = result.success
failed = [name for name, ok in results.items() if not ok]
if failed:
raise ValueError(f"GE validation FAILED per suite: {failed}")
logger.info("Great Expectations: tutte le suite OK")
return results
# ----------------------------------------------------------------
# STEP 9: KPI alert (email se waste rate > soglia)
# ----------------------------------------------------------------
@task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
def check_kpi_alerts(**context: Any) -> None:
"""Controlla KPI critici e invia alert se fuori soglia."""
from src.services.kpi_alert_service import KPIAlertService
execution_date = context["ds"]
alert_service = KPIAlertService(conn_id="snowflake_dw")
kpis = alert_service.get_daily_kpis(date=execution_date)
alerts_sent = []
# Waste rate > 5%: alert critico
if kpis["waste_rate_pct"] > 5.0:
alert_service.send_alert(
severity="HIGH",
message=f"Waste rate {kpis['waste_rate_pct']:.1f}% supera soglia 5%",
kpi="waste_rate",
value=kpis["waste_rate_pct"],
)
alerts_sent.append("waste_rate")
# OTIF < 95%: alert warning
if kpis["otif_pct"] < 95.0:
alert_service.send_alert(
severity="MEDIUM",
message=f"OTIF {kpis['otif_pct']:.1f}% sotto soglia 95%",
kpi="otif",
value=kpis["otif_pct"],
)
alerts_sent.append("otif")
# Cold chain violations > 0: alert immediato
if kpis["cold_chain_violations"] > 0:
alert_service.send_alert(
severity="CRITICAL",
message=f"{kpis['cold_chain_violations']} violazioni temperatura cold chain!",
kpi="cold_chain_violations",
value=kpis["cold_chain_violations"],
)
alerts_sent.append("cold_chain")
logger.info("KPI check completato. Alert inviati: %s", alerts_sent)
# ----------------------------------------------------------------
# DAG wiring: dipendenze tra task
# ----------------------------------------------------------------
extract_sap = extract_sap_erp()
extract_cold = extract_cold_chain_iot()
extract_edi = extract_edi_partners()
extract_pos = extract_pos_data()
ge_validation = run_ge_validation()
kpi_alerts = check_kpi_alerts()
# Health checks prima di tutto
[check_sap_available, check_mes_available] >> extract_sap
# Estrazione parallela di tutte le sorgenti
[extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
# Silver prima di Gold
dbt_run_silver >> dbt_run_gold
# Test dopo Gold
dbt_run_gold >> dbt_test
# GE validation e KPI alerts dopo i test
dbt_test >> ge_validation >> kpi_alerts
dbt for Transformations: SQL Models for Food KPIs
dbt (data build tool) je ideální nástroj pro transformaci dat ve stříbrné a zlaté vrstvě. Podívejme se na hlavní modely pro potravinový dodavatelský řetězec s integrovaným testováním a dokumentací.
Schema.yml: Dokumentace a test dbt
# models/silver/schema.yml
version: 2
models:
- name: silver_lots
description: "Lotti di produzione normalizzati da SAP ERP"
config:
tags: ["silver", "lots", "traceability"]
materialized: incremental
incremental_strategy: merge
unique_key: lot_id
columns:
- name: lot_id
description: "Identificativo univoco lotto (GS1 SGTIN)"
tests:
- not_null
- unique
- name: production_date
tests:
- not_null
- name: expiry_date
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "expiry_date > production_date"
- name: plant_code
tests:
- not_null
- accepted_values:
values: ["PL001", "PL002", "PL003", "PL004"]
- name: temperature_class
tests:
- accepted_values:
values: ["ambient", "chilled", "frozen", "ultra_frozen"]
- name: net_weight_kg
tests:
- dbt_utils.expression_is_true:
expression: "net_weight_kg > 0 AND net_weight_kg < 50000"
- name: silver_shipments
description: "Spedizioni normalizzate da TMS e EDI DESADV"
config:
tags: ["silver", "logistics"]
materialized: incremental
unique_key: shipment_id
columns:
- name: shipment_id
tests: [not_null, unique]
- name: planned_delivery_date
tests: [not_null]
- name: actual_delivery_date
description: "NULL se non ancora consegnato"
- name: otif_flag
description: "True se consegnato On-Time and In-Full"
tests:
- accepted_values:
values: [true, false, null]
quote: false
- name: gold_kpi_supply_chain
description: "KPI giornalieri supply chain aggregati per SKU/plant/customer"
config:
tags: ["gold", "kpi", "dashboard"]
materialized: table
columns:
- name: kpi_date
tests: [not_null]
- name: waste_rate_pct
description: "% prodotto sprecato su prodotto totale"
tests:
- dbt_utils.expression_is_true:
expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
- name: otif_pct
tests:
- dbt_utils.expression_is_true:
expression: "otif_pct >= 0 AND otif_pct <= 100"
Stříbrný model: Výrobní dávky
-- models/silver/silver_lots.sql
-- {{ config(tags=["silver", "lots"], materialized="incremental") }}
WITH source_sap AS (
SELECT
lot_number AS lot_id,
material_number AS sku_code,
plant AS plant_code,
production_date AS production_date,
-- Normalizza data scadenza (SAP usa formato YYYYMMDD)
TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
quantity_kg AS net_weight_kg,
-- Classi temperatura da tabella materiali SAP
CASE
WHEN temp_cond_sap = '0001' THEN 'ambient'
WHEN temp_cond_sap = '0002' THEN 'chilled'
WHEN temp_cond_sap = '0003' THEN 'frozen'
WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
ELSE 'unknown'
END AS temperature_class,
-- Range temperatura richiesto (gradi Celsius)
CASE
WHEN temp_cond_sap = '0001' THEN STRUCT(15.0 AS min_c, 25.0 AS max_c)
WHEN temp_cond_sap = '0002' THEN STRUCT(2.0 AS min_c, 8.0 AS max_c)
WHEN temp_cond_sap = '0003' THEN STRUCT(-25.0 AS min_c, -15.0 AS max_c)
WHEN temp_cond_sap = '0004' THEN STRUCT(-60.0 AS min_c, -30.0 AS max_c)
END AS temp_range,
batch_status AS lot_status,
_ingestion_timestamp AS ingested_at
FROM {{ source('bronze', 'sap_erp_batches') }}
WHERE lot_number IS NOT NULL
AND quantity_kg > 0
{% if is_incremental() %}
AND _ingestion_timestamp > (
SELECT MAX(ingested_at) FROM {{ this }}
)
{% endif %}
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY lot_id
ORDER BY ingested_at DESC
) AS row_num
FROM source_sap
),
final AS (
SELECT
lot_id,
sku_code,
plant_code,
production_date,
expiry_date,
net_weight_kg,
temperature_class,
temp_range,
lot_status,
-- Shelf life totale in giorni
DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
-- Shelf life residua rispetto a oggi
DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
ingested_at
FROM deduplicated
WHERE row_num = 1
AND expiry_date > production_date
)
SELECT * FROM final
Zlatý model: KPI Supply Chain
-- models/gold/gold_kpi_supply_chain.sql
-- {{ config(tags=["gold", "kpi"], materialized="table") }}
WITH lots AS (
SELECT * FROM {{ ref('silver_lots') }}
),
shipments AS (
SELECT * FROM {{ ref('silver_shipments') }}
),
pos_sales AS (
SELECT * FROM {{ ref('silver_pos_sales') }}
),
inventory AS (
SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),
-- KPI 1: Waste Rate (% prodotto sprecato)
waste_kpi AS (
SELECT
production_date AS kpi_date,
plant_code,
-- Prodotto scaduto o smaltito come spreco
SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END) AS wasted_kg,
SUM(net_weight_kg) AS total_produced_kg,
ROUND(
100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END)
/ NULLIF(SUM(net_weight_kg), 0),
2
) AS waste_rate_pct
FROM lots
GROUP BY 1, 2
),
-- KPI 2: OTIF (On-Time In-Full)
otif_kpi AS (
SELECT
DATE(planned_delivery_date) AS kpi_date,
customer_code,
COUNT(*) AS total_orders,
SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
ROUND(
100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
/ NULLIF(COUNT(*), 0),
2
) AS otif_pct
FROM shipments
WHERE actual_delivery_date IS NOT NULL
GROUP BY 1, 2
),
-- KPI 3: Days of Supply (scorta in giorni)
dos_kpi AS (
SELECT
inv.snapshot_date AS kpi_date,
inv.sku_code,
inv.location_code,
inv.stock_qty_units,
COALESCE(avg_daily_sales.avg_daily_units, 0) AS avg_daily_demand,
ROUND(
inv.stock_qty_units
/ NULLIF(avg_daily_sales.avg_daily_units, 0),
1
) AS days_of_supply
FROM inventory inv
LEFT JOIN (
SELECT sku_code, location_code,
AVG(units_sold) AS avg_daily_units
FROM pos_sales
WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
GROUP BY 1, 2
) avg_daily_sales
ON inv.sku_code = avg_daily_sales.sku_code
AND inv.location_code = avg_daily_sales.location_code
),
-- KPI 4: Shelf Life at Receipt (qualità al ricevimento)
slr_kpi AS (
SELECT
DATE(s.actual_delivery_date) AS kpi_date,
l.sku_code,
-- % shelf life residua al momento della consegna
ROUND(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0),
1
) AS shelf_life_pct_at_receipt,
-- Media per SKU
AVG(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0)
) OVER (PARTITION BY DATE(s.actual_delivery_date), l.sku_code)
AS avg_slr_pct
FROM shipments s
INNER JOIN lots l ON s.lot_id = l.lot_id
WHERE s.actual_delivery_date IS NOT NULL
)
-- Join finale per dashboard KPI unificato
SELECT
w.kpi_date,
w.plant_code,
w.waste_rate_pct,
w.wasted_kg,
w.total_produced_kg,
o.otif_pct,
o.total_orders,
d.days_of_supply,
d.avg_daily_demand,
s.avg_slr_pct AS shelf_life_at_receipt_pct,
-- Score complessivo supply chain (0-100)
ROUND(
(
(100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
COALESCE(o.otif_pct, 0) * 0.40 +
LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
COALESCE(s.avg_slr_pct, 0) * 0.15
),
1
) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
Kvalita dat pro potravinová data: Velká očekávání
V potravinářském průmyslu není kvalita dat jen otázkou čistoty: je to požadavek regulační. Výsledkem mohou být nesprávné údaje o teplotách, datech expirace nebo kódech šarží Pokuty FSMA, velmi drahé stažení a poškození pověsti. Velká očekávání vám to umožňují definovat deklarativní pravidla ověřování a integrovat je do potrubí Airflow.
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Crea o aggiorna la suite di aspettative
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# REGOLA 1: Temperatura mai NULL per prodotti refrigerati/surgelati
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # tolleranza 1% per sensori offline temporanei
meta={
"notes": "Sensori IoT cold chain: max 1% di letture mancanti",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# REGOLA 2: Range temperatura prodotti refrigerati (2-8°C)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Range assoluto: include frozen (-25°C) e ambient (25°C)",
}
)
# REGOLA 3: Lot ID sempre presente e formato GS1 SGTIN valido
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard per tracciabilita alimentare"}
)
# REGOLA 4: Timestamp monotonicamente crescente per sensore
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (es. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verifico che i timestamp non siano retroattivi"}
)
# REGOLA 5: Numero di letture per lotto (almeno 1 ogni 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 letture/ora = 48 minimo
max_value=10000, # max ragionevole per 24h
meta={"notes": "Minimo 1 lettura ogni 30 min per cold chain attiva"}
)
# REGOLA 6: Distribuzione temperatura per classe prodotto
# Prodotti refrigerati: mediana deve essere 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Solo per lotti con temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Salva le aspettative
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint per Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
KPI a metriky potravinového řetězce
Definujte správné KPI a základ pro rozhodování na základě dat. Zde jsou metriky zásadní pro potravinový dodavatelský řetězec se vzorci, odvětvovými měřítky a realistickými cíli.
| KPI | Vzorec | Průmyslový benchmark | Cíl nejlepší ve své třídě | Práh výstrahy |
|---|---|---|---|---|
| Perfektní sazba objednávky | Perfektní objednávky / Celkový počet objednávek * 100 | 85–92 % | >95 % | <90 % |
| OTIF (On-Time In-Full) | (Rychlé a kompletní dodávky) / Celkem * 100 | 88–94 % | >97 % | <92 % |
| Míra odpadu % | Kg odpadu / kg celkových produktů * 100 | 3–8 % | <2 % | >5 % |
| Obrat zásob | Roční COGS / Průměrná inventura | 12-20x (čerstvé) | >25x (čerstvé) | <10x |
| Dny zásobování (DOS) | Dostupné zásoby / Průměrná denní poptávka | 3-7 dní (čerstvé) | 2-4 dny | > 10 dní |
| Míra plnění | Dodané jednotky / objednané jednotky * 100 | 92–96 % | >98 % | <94 % |
| Doba použitelnosti při převzetí (SLR) | Zbývající dny do doručení / Celková trvanlivost * 100 | 60–75 % | >80 % | <60 % |
| Shoda s chladícím řetězem | Šarže bez porušení temp / Celkový počet šarží * 100 | 97–99 % | >99,5 % | <98 % |
| Přesnost předpovědi | 1 – (MAE / průměrná poptávka) * 100 | 75–85 % | >90 % | <75 % |
| Cyklus cash-to-cash | DIO + DSO - DPO (dny) | 15-35 dní | <15 dní | > 45 dní |
Jak vypočítat skóre zdraví dodavatelského řetězce
Váha klíčových ukazatelů výkonu v jediném složeném skóre pomáhá vedení monitorovat stav celkový dodavatelský řetězec s jediným číslem:
- OTIF: váha 40 % (přímý dopad na spokojenost zákazníka)
- Míra odpadu: váha 30 % (dopad na marže a udržitelnost)
- Doba použitelnosti při převzetí: hmotnost 15 % (spotřebitelská kvalita)
- Shoda s chladicím řetězcem: hmotnost 15 % (bezpečnost potravin a shoda)
Skóre > 90: vynikající | 80-90: dobrý | 70–80: bude zlepšeno | <70: kritické
Real-Time versus Batch: Kdy použít kterou architekturu
Ne vše musí být v reálném čase. Náklady na streamovací architekturu jsou značné lepší než tradiční dávková architektura. V potravinovém řetězci, výběr právo závisí na přijatelné latenci a provozních důsledcích.
| Use Case | Architektura | Latence | Nástroje | Motivace |
|---|---|---|---|---|
| Monitorování studeného řetězce (teplota) | Streamování v reálném čase | <1 min | Kafka + Flink | Porušení teploty = okamžitá ztráta dávky |
| Řízení odvolání | V reálném čase / řízené událostmi | <5 min | Kafka + upozornění | FSMA: Sledujte dávky za méně než 2 hodiny |
| GPS sledování vozidel | Téměř v reálném čase | <2 min | MQTT + InfluxDB | Aktualizovaný ETA pro zákazníky |
| Denní KPI (OTIF, odpad) | Denní várka | T+2h | Průtok vzduchu + dbt | Ranní provozní zpráva |
| Prognóza poptávky | Denní/týdenní dávka | T+6h | Proud vzduchu + MLflow | Výrobní plán nevyžaduje real-time |
| Snímek inventáře | Mikrodávka (každých 15 minut) | 15 min | Proudění vzduchu + sněhová vločka | Obsluha skladu: dostatečná viditelnost |
| Týdenní analýza odpadu | Týdenní várka | T+24h | dbt + nástroj BI | Strategická, nikoli operativní rozhodnutí |
| Audit shody FSMA | Dávka na vyžádání | NA ZNAMENÍ | dbt + datový trezor | Plánované kontroly, ne každodenní |
# Architettura Lambda per Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processa eventi in milliseconds: cold chain, recall alerts
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filtra violazioni temperatura in tempo reale
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
.map(lambda r: {
**r,
"violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
"violation_detected_at": datetime.utcnow().isoformat(),
})
# Scrivi violations su Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
Integrace se staršími systémy
Většina zemědělsko-potravinářských společností pracuje se staršími systémy: SAP R/3 let '90, soubory EDIFACT EDI přes SFTP, databáze Oracle bez REST API. Integrace těchto systémy vyžadují specifické přístupy.
SAP Connector: RFC a BAPI
# src/connectors/sap_connector.py
"""
Connettore SAP ERP via pyrfc (SAP RFC/BAPI wrapper).
Richiede: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "IT"
class SAPConnector:
"""Wrapper per SAP RFC/BAPI calls per dati supply chain."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Estrae lotti di produzione da SAP tramite BAPI_BATCH_GET_DETAIL.
Args:
date_from: Data inizio formato YYYYMMDD
date_to: Data fine formato YYYYMMDD
plants: Lista codici stabilimento SAP
"""
all_batches = []
for plant in plants:
try:
# Chiama BAPI SAP per lista lotti
result = self._conn.call(
"BAPI_BATCH_GET_DETAIL",
PLANT=plant,
DATE_FROM=date_from,
DATE_TO=date_to,
BATCH_STATUS="", # tutti gli stati
)
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lotti estratti",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
"lot_id": f"{batch['MATNR']}-{batch['CHARG']}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error per plant %s: %s",
plant,
str(e),
)
# Non fallisce tutta l'estrazione per un singolo plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
"""
Estrae movimenti di magazzino SAP via MB51 BAPI equivalent.
movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
"""
result = self._conn.call(
"BAPI_GOODSMVT_GETDETAIL",
GOODSMVT_DATE_FROM=date_from,
GOODSMVT_DATE_TO=date_to,
)
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
CDC s Debezium pro streamování ze starších databází
# debezium-config/oracle-erp-connector.json
# Configurazione Debezium per CDC da Oracle EBS (ERP legacy)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
"database.password": "${ORACLE_CDC_PASSWORD}",
"database.dbname": "FOOD_ERP",
"database.pdb.name": "FOOD_PDB",
"database.server.name": "oracle-erp",
"table.include.list": "FOOD_ERP.INV_TRANSACTIONS,FOOD_ERP.MTL_LOT_NUMBERS,FOOD_ERP.WSH_DELIVERY_DETAILS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.oracle.food_erp",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.oracle.food_erp",
"transforms": "route,addFields",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.addFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addFields.static.field": "_cdc_source",
"transforms.addFields.static.value": "oracle-erp",
"include.schema.changes": "true",
"snapshot.mode": "initial_only",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"topic.prefix": "food-cdc"
}
}
EDIFACT EDI Parser pro maloobchodní objednávky
# src/connectors/edi_connector.py
"""
Parser EDI EDIFACT per messaggi ORDERS, DESADV, RECADV (GS1 subset).
Usa pydifact library per parsing EDIFACT messages.
"""
import pandas as pd
from pydifact.segmentcollection import SegmentCollection
from pydifact.segments import Segment
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def parse_edifact_orders(file_path: str | Path) -> pd.DataFrame:
"""
Parsa un file EDI EDIFACT ORDERS D.01B (GS1 EANCOM).
Restituisce DataFrame con righe ordine normalizzate.
Struttura messaggio ORDERS EDIFACT:
UNB (Interchange header)
UNH (Message header)
BGM (Order message identifier)
DTM (Date/time)
NAD+BY (Buyer = retailer)
NAD+SU (Supplier = nostro sistema)
LIN (Line item)
+QTY (Quantity ordered)
+DTM (Requested delivery date)
+PIA (Product code GTIN)
UNT (Message trailer)
"""
file_content = Path(file_path).read_text(encoding="latin-1")
collection = SegmentCollection.from_str(file_content)
orders = []
current_order: dict = {}
current_line: dict = {}
for segment in collection.segments:
tag = segment.tag
if tag == "BGM":
# Inizio nuovo ordine
current_order = {
"order_number": segment.elements[1] if len(segment.elements) > 1 else None,
"document_type": segment.elements[0][0] if segment.elements else None,
"lines": [],
}
elif tag == "DTM" and current_order:
# Data ordine o data consegna richiesta
qualifier = segment.elements[0][0]
date_str = segment.elements[0][1]
date_fmt = segment.elements[0][2]
if qualifier == "137": # Document date
try:
current_order["order_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
logger.warning("DTM parse error: %s", date_str)
elif qualifier == "2": # Requested delivery date
try:
current_order["requested_delivery_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
pass
elif tag == "NAD":
qualifier = segment.elements[0]
party_id = segment.elements[1][0] if len(segment.elements) > 1 else None
party_name = segment.elements[3] if len(segment.elements) > 3 else None
if qualifier == "BY": # Buyer (retailer)
current_order["buyer_gln"] = party_id
current_order["buyer_name"] = party_name
elif qualifier == "SU": # Supplier
current_order["supplier_gln"] = party_id
elif tag == "LIN":
# Salva riga precedente se esiste
if current_line and current_order:
current_order["lines"].append(current_line)
current_line = {
"line_number": segment.elements[0],
"gtin": None,
"qty_ordered": None,
"delivery_date": None,
}
elif tag == "PIA" and current_line:
# Product identification: GTIN-13 o GTIN-14
if len(segment.elements) > 1:
qualifier = segment.elements[1][1] if len(segment.elements[1]) > 1 else ""
if qualifier in ["SRV", "GE1"]: # GTIN qualifiers
current_line["gtin"] = segment.elements[1][0]
elif tag == "QTY" and current_line:
qualifier = segment.elements[0][0]
if qualifier == "21": # Ordered quantity
current_line["qty_ordered"] = float(segment.elements[0][1])
current_line["qty_unit"] = segment.elements[0][2] if len(segment.elements[0]) > 2 else "PCE"
# Normalizza output
for order in orders:
for line in order.get("lines", []):
orders.append({
"order_number": order.get("order_number"),
"order_date": order.get("order_date"),
"requested_delivery_date": order.get("requested_delivery_date"),
"buyer_gln": order.get("buyer_gln"),
"buyer_name": order.get("buyer_name"),
"supplier_gln": order.get("supplier_gln"),
"line_number": line.get("line_number"),
"gtin": line.get("gtin"),
"qty_ordered": line.get("qty_ordered"),
"qty_unit": line.get("qty_unit"),
"_source": "EDI_EDIFACT_ORDERS",
"_parsed_at": datetime.utcnow().isoformat(),
})
return pd.DataFrame(orders)
Případová studie: Zemědělské družstvo s 500 farmami
Podívejme se, jak tyto vzorce platí v reálném případě: velké zemědělské družstvo severní Itálie s 500 přidruženými zemědělskými společnostmi, 3 zpracovatelskými a zpracovatelskými centry, a 200 prodejních míst mezi velkým maloobchodem a místními trhy.
Profil družstva
- 500 přidružených farem: ovoce a zelenina, obiloviny, luštěniny - IoT MQTT data z 8 000 senzorů
- 3 obráběcí centra: Legacy SAP R/3 (2005), vlastní MES, LIMS
- 200 prodejních míst: Heterogenní POS (3 různí prodejci), EDI EDIFACT s 15 prodejci
- Výchozí situace: Excelové listy, oddělená data, žádná viditelnost od začátku do konce
- cíle: snížit odpad o 40 %, zlepšit OTIF na 95 %, soulad s FSMA
Fáze 1: Vyhodnocení a zdrojový inventář (1.–4. týden)
# Inventario sorgenti dati: output assessment
SOURCE_INVENTORY = {
"farm_iot": {
"count": 8000,
"protocol": "MQTT v3.1.1",
"broker": "Mosquitto on-premise",
"format": "JSON custom",
"issues": [
"Schema non standardizzato tra vendor IoT diversi",
"15% sensori con timestamp errati (clock drift)",
"3 farm su 500 senza connettivita stabile (4G intermittente)",
],
"recommended_fix": "Normalizzazione schema + edge buffer (Mosquitto local)",
},
"sap_erp": {
"version": "SAP R/3 4.7 (anno 2005)",
"tables_relevant": ["MCHA", "MCH1", "MSEG", "MKPF", "VBAK", "VBAP"],
"issues": [
"No REST API: solo RFC/BAPI",
"Batch extraction window: 01:00-03:00 (finestra ristretta)",
"Codifica caratteri: ISO-8859-1 (non UTF-8)",
"Date in formato SAP (YYYYMMDD come intero)",
],
"recommended_fix": "pyrfc + ETL incremental con delta timestamp",
},
"edi_partners": {
"partners": 15,
"standards": ["EANCOM D.01B", "X12 850/856", "GS1 XML 3.1"],
"transport": "SFTP (12 partner), AS2 (3 partner)",
"issues": [
"3 retailer usano EDIFACT non-standard (varianti proprietarie)",
"Mancanza di RECADV da 5 retailer: no conferma ricezione",
],
},
"pos_retail": {
"vendors": ["Cassa Easy", "NCR Aloha", "Custom PHP legacy"],
"issues": [
"3 formati CSV diversi",
"Nessun campo shelf_life nei dati POS",
"Granularità: ora (non transazione)",
],
},
}
Fáze 2: Implementace technického balíčku (5.–16. týden)
# Stack tecnico implementato per la cooperativa
INFRASTRUCTURE = {
"data_lake": "AWS S3 (Medallion: Bronze/Silver/Gold)",
"compute": "AWS EMR Serverless (Spark per bulk load iniziale)",
"warehouse": "Snowflake (pay-per-query, snowpark per ML features)",
"orchestration": "Apache Airflow 2.9 su AWS MWAA (managed)",
"transformation": "dbt Cloud (Team plan: 8 developers)",
"data_quality": "Great Expectations + Slack alerts",
"streaming": "Apache Kafka su MSK (managed) per cold chain",
"monitoring": "Grafana + Prometheus (Airflow metrics, DWH queries)",
"ci_cd": "GitHub Actions (dbt CI: test su ogni PR)",
"secret_management": "AWS Secrets Manager",
}
TIMELINE = [
{"week": "1-4", "milestone": "Assessment sorgenti + architettura design"},
{"week": "5-7", "milestone": "Bronze layer: connettori SAP + IoT MQTT"},
{"week": "8-10", "milestone": "Bronze layer: EDI + POS + LIMS"},
{"week": "11-12","milestone": "Silver layer: dbt models + data quality"},
{"week": "13-14","milestone": "Gold layer: KPI dashboard + alerting"},
{"week": "15-16","milestone": "UAT + go-live + team training"},
}
TEAM = {
"data_engineers": 3,
"dbt_developers": 2,
"sap_consultant": 1, # part-time per RFC/BAPI
"project_manager": 1,
}
Výsledky získané po 6 měsících
| KPI | Před (základní hodnota) | Po 6 měsících | Zlepšení |
|---|---|---|---|
| Míra odpadu | 7,2 % | 4,3 % | -40 % odpadu (-2,9 pp) |
| OTIF | 88 % | 94 % | +6pp |
| Doba použitelnosti při převzetí | 58 % | 74 % | +16 str |
| Shoda s chladícím řetězem | 94 % (odhad) | 99,1 % (měřeno) | +5,1pp se skutečnou viditelností |
| Vyvolat čas rozlišení | 48-72 hodin | 2-4 hodiny | -93% doba odezvy |
| Manuální zpráva ředitele | 3-4 hodiny/týden | Automatická palubní deska | -100% ruční práce |
| Otázka přesnosti předpovědi | 71 % | 86 % | +15 str |
| Dny zásobování (čerstvé) | 8-12 dní | 4-6 dní | -50% přebytečné zásoby |
Poučení z případové studie
- SAP RFC/BAPI je pomalejší, než si myslíte: noční dávkové okno bylo nejkritičtějším úzkým hrdlem. Řešením bylo vyjednat okno s IT širší a pro minimalizaci objemu použijte přírůstkovou extrakci.
- EDI není nikdy standardní: z 15 maloobchodních partnerů mělo 11 varianty vlastníky standardu EANCOM. Dokončení analyzátoru EDIFACT trvalo 2 týdny kalibrace podle specifikací každého partnera.
- Kvalita dat je kulturní problém, nejen technický: 30 % z problémy s kvalitou vyplynuly z nesprávného ručního zadávání operátorů do SAP. Technické řešení (Great Expectations) problém zvýraznilo, ale rozlišení ano vyžaduje školení a řízení změn.
- Bronzová vrstva uložit vše: třikrát za šest měsíců to bylo nutné znovu spusťte Silver transformace, abyste opravili chyby v modelech dbt. Díky Bronzová vrstva je neměnná, žádná data se neztratí.
- dbt CI/CD mění způsob vaší práce: otestujte každý model na každém PR zamezilo odhadem 4 produkčním incidentům v prvním měsíci.
Kompletní shrnutí řady FoodTech
To byla dlouhá cesta přes technologie, které transformují průmysl jídlo. Zde je to, co jsme společně vybudovali v této sérii 10 článků:
| # | Položka | Naučené klíčové pojmy | Zásobník technologií |
|---|---|---|---|
| 00 | IoT Pipeline pro přesné zemědělství | MQTT broker, polní senzory, příjem datového jezera, úrovně QoS | Python, MQTT, Mosquitto, MinIO |
| 01 | ML Edge pro detekci onemocnění plodin | Kvantování TensorFlow Lite, odvození hran, nasazení modelu na ARM | TFLite, Raspberry Pi, Python, OpenCV |
| 02 | Satelitní a meteorologická rozhraní API pro AgriTech | Sentinel-2 NDVI, Planet Labs API, inženýrství funkcí počasí pro ML | Sentinel Hub, OpenMeteo, rasterio, GeoPandas |
| 03 | Systém sledování potravin | Privátní blockchainy (Hyperledger), RFID EPCIS, standardy GS1 | Hyperledger Fabric, RFID, EPCIS, Python |
| 04 | Počítačové vidění pro kontrolu kvality | Doladění YOLO na produktových datových sadách, odvození průmyslového potrubí | PyTorch, YOLO, FastAPI, OpenCV |
| 05 | Automatizace FSMA 204 | KDE, CTE požadované potraviny, sledovatelnost šarže, automatizace stažení | Python, FastAPI, PostgreSQL, upozornění |
| 06 | Automatizace vertikálního zemědělství | Řízení parametrů prostředí, robotické plánování, REST API pro farmy | Python, FastAPI, InfluxDB, MQTT |
| 07 | Prognóza poptávky po snížení odpadu | LSTM, Prophet, inženýrství funkcí sezónnosti potravin, zpětné testování | PyTorch, Prophet, MLflow, pandy |
| 08 | Řídicí panel v reálném čase pro Farm IoT | Úhlové signály, poskytování Grafany, dotazy InfluxDB, upozornění | Angular 21, Grafana, InfluxDB, WebSocket |
| 09 | Dodavatelský řetězec ETL od farmy k maloobchodníkovi | Architektura medailonu, Airflow DAG, modely dbt, velká očekávání, KPI | Airflow, dbt, Snowflake, GE, SAP RFC, EDI |
Plán pro ty, kteří se chtějí dozvědět více
Pokud jste četli celou sérii, máte nyní pevný základ pro řešení skutečných FoodTech projektů. Zde je strukturovaný plán, jak se ponořit hlouběji do každé domény:
Doporučený hloubkový kurz
- IoT a Edge Computing: studie MQTT 5.0 (oproti 3.1.1 použité v této studii článek), Azure IoT Hub nebo AWS IoT Core pro spravovaná řešení a Apache NiFi pro data příjem z heterogenních zdrojů internetu věcí.
- ML ve výrobě: série MLOps tohoto blogu vás naučí, jak na to verze modelů, provádějte A/B testování a monitorujte drift ve výrobě pomocí MLflow a Seldon.
- Architektura dat: zjistěte více o Data Lakehouse (série Data & AI Business), abyste porozuměli srovnání Snowflake, Databricks a Apache Iceberg platformy pro analýzu dodavatelského řetězce.
- LLM a generativní AI pro dodavatelský řetězec: Velké jazykové modely mohou extrahovat strukturované informace z dokumentů o shodě, analyzovat smlouvy s dodavateli a generovat automatické zprávy. Série blogu AI Engineering zahrnuje RAG a jemné ladění.
- Průmyslové standardy ke studiu: Globální standardy GS1 (GTIN, GLN, SSCC, EPCIS), FSMA 204 (FDA), EU Reg. 178/2002 (sledovatelnost potravin), ISO 22000 (HACCP).
- Příslušné certifikace: AWS Certified Data Engineer, dbt Certified Vývojář, Astronomer Certified Apache Airflow Developer, Databricks Data Engineer Associate.
Osvědčené postupy a anti-vzory pro ETL potravin
Nejlepší postupy
- Bronzová vrstva neměnná: nikdy neupravujte nezpracovaná data. Pokud najdete chybu u modelu Silver dbt opravte model a spusťte jej znovu – bronz zůstane nedotčen. To je zásadní pro FSMA a EU Reg. 178/2002 revizní záznamy.
-
Idempotence DAG proudění vzduchu: každý úkol musí být možné znovu provést
bez vedlejších účinků. USA
replace=Truepro nahrávání S3,MERGEmístoINSERTpro databáze a jedinečné klíče v inkrementálních modelech dbt. -
Explicitní časové limity pro každý úkol: starší systémy jako SAP RFC mohou
zablokovat na dobu neurčitou. Vždy nastaveno
execution_timeout=timedelta(hours=2)u každého úkolu proudění vzduchu. - Monitorování prodlevy při požití: potrubí, které začíná ve 02:00 ale ne kompletní do 06:00 (před provozní směnou) a zbytečné. Monitor i časy dokončení a nastavte upozornění SLA na proudění vzduchu.
- Oddělení schématu EDI pro partnery: udržovat vyhrazený analyzátor EDI pro každého maloobchodního partnera. Proprietární varianty znemožňují parser unikátní univerzální.
- Zdokumentované datum původu: dbt automaticky vygeneruje graf linie. Použijte jej k zodpovězení otázek auditu: „Odkud pochází tento OTIF KPI?“.
Anti-vzory, kterým je třeba se vyhnout
- Transformace v extrakční vrstvě (klasické ETL): transformovat data před uložením do bronzové vrstvy znamená ztrátu možnosti dělat doplnit nebo opravit chyby. Vždy načtěte raw, transformujte později pomocí dbt.
- Monolitický DAG pro všechno: stane se jeden DAG s 50+ úkoly nemožné odladit. Rozdělit podle domény (SAP DAG, IoT DAG, EDI DAG) a používat Airflow TriggerDagRunOperator pro cross-DAG závislosti.
- Ignorování časových pásem: farma na Sicílii, distribuční centrum v Miláně a prodejce v Německu používají různá časová pásma. Vždy převádějte na UTC v bronzové vrstvě a všude používá časová razítka s ohledem na časové pásmo.
- Pevně zakódované přihlašovací údaje v DAG: NIKDY nezadávejte hesla SAP nebo tokeny API v kódu proudění vzduchu. Použijte Airflow Connections and Variables nebo AWS Secrets Manager.
- Přeskočit kvalitu dat na bronzu: ověřit pouze zlatou vrstvu a příliš mnoho pozdě. Problémy by měly být identifikovány co nejdříve: nesprávný údaj o teplotě v bronzu se stává falešně pozitivním vyvoláním ve zlatě.
- Airflow Scheduler jako vykonavatel: úkol nesmí být proveden ze samotného plánovače proudění vzduchu. Vždy používejte samostatný exekutor (CeleryExecutor, KubernetesExecutor) k izolaci velkého zatížení.
Závěry: Konec cesty, začátek další
Dosáhli jsme konce série FoodTech. V deseti článcích jsme postavili kus kousek po kousku vizi potravinového dodavatelského řetězce plně řízena daty: senzory v terénu, které přenášejí data v reálném čase, ML modely, které detekují nemoci před lidským okem, blockchain pro neměnnou sledovatelnost, počítačové vidění pro kontrola kvality, automatizované dodržování FSMA, vertikální farmy řízené přes API, prognózování poptávky, které snižuje plýtvání, řídicí panely v reálném čase pro správu a in tento nejnovější článek, potrubí ETL, které to všechno drží pohromadě.
Zemědělsko-potravinářský sektor je jedním z nejsložitějších na digitalizaci: nepředvídatelná sezónnost, rychle se kazící produkty, přísné předpisy, staré systémy konsolidované po desetiletí, dodavatelské řetězce globální se stovkami hráčů. Ale právě z tohoto důvodu příležitosti k vytváření hodnoty s technologií jsou obrovské. Snížit míru odpadu ze 7 % na 4 % v družstvu s 500 farmami to není jen provozní zlepšení: jsou to peníze, jsou to potraviny, je to dopad na životní prostředí.
Potravinový řetězec do roku 2030 bude definován podle data v reálném čase (každá partie vysledována od výsevu až po vidličku), Prediktivní AI (nulové přeplnění, nulové zásoby, nulový odpad), automatizovaný soulad (audit FSMA za 2 hodiny, ne za 2 týdny) e měřitelnou udržitelnost (uhlíková stopa na šarži, spotřeba vody na hektar).
Všechny technologie jsou tam. Vzor ETL, který jste se naučili v tomto článku — Průtok vzduchu pro orchestraci, dbt pro transformace, Velká očekávání pro kvalitu dat, Medailon architektura pro strukturu – a okamžitě použitelná v jakékoli zemědělsko-potravinářské společnosti střední nebo velké velikosti.
Prozkoumejte další související blogové série
- Data & AI Business (řada 14): ponoří se do datového skladu (Snowflake, Databricks, BigQuery), ETL/ELT s dbt a Airbyte, MLOps enterprise a správa dat – přirozený doplněk této řady FoodTech.
- MLOps (řada 5): jak uvést modely ML na vyžádání do výroby předpovědi a počítačové vidění, které jsme zabudovali do článků 01, 04 a 07.
- AI Engineering/RAG (řada 6): jak používat LLM k dotazování dokumenty o shodě, analyzovat certifikáty HACCP a generovat automatické zprávy pro dodavatelský řetězec.
- PostgreSQL AI (10 Series): jak používat pgvector pro sémantické vyhledávání o popisech produktů, recepturách a technických specifikacích potravin — užitečné pro správa katalogu produktů v potravinářském sektoru.
- EnergyTech (řada): mnoho technologií IoT a datových kanálů Řada FoodTech je totožná s těmi, které se používají pro monitorování energie průmyslových potravinářských závodů.
Děkujeme, že jste celou sérii FoodTech sledovali. Příjemné kódování a dobrou chuť.







