Łańcuch dostaw żywności: wzór ETL od gospodarstwa do sprzedawcy
Co roku mniej więcej jedna trzecia całej żywności produkowanej do spożycia przez ludzi jest tracona lub marnowana wzdłuż łańcucha dostaw żywności: wartość, która w samych Stanach Zjednoczonych przekracza 473 miliardy dolarów, co przekłada się na w miliardzie posiłków wyrzucanych każdego dnia na całym świecie. Paradoksalnie, znaczna część odpady te nie zależą od czynników klimatycznych czy biologicznych, ale od problemu danych: widoczności niewystarczające, dotychczasowe systemy niezintegrowane, KPI obliczane z opóźnieniem i decyzje podejmowane na podstawie informacji niekompletne lub nieprawidłowe.
Globalny rynek zarządzanie łańcuchem dostaw żywności będzie wart 182,81 miliardów dolarów w 2025 roku i do 2034 r. wzrośnie do 359,39 mld (CAGR 7,8%). Jednak 48% dostawców żywności nadal z nich korzysta arkusze kalkulacyjne do codziennych operacji, a 60% zgłasza powtarzalne, ręczne zadania, które pochłaniają dużo czasu cenny czas. Rozbieżność między tym, na co pozwala technologia, a tym, na co pozwalają firmy rolno-spożywcze faktycznie wdrażają i są ogromne.
Ten artykuł — najnowszy z serii FoodTech — porusza problem u podstaw: jak zbudować Solidny i skalowalny potok ETL/ELT zintegrować heterogeniczne dane pochodzące z gospodarstwa rolnego i sprzedawcy detalicznego, koordynuj przepływy za pomocą Apache Airflow, przekształcaj dane za pomocą dbt, zapewniaj jakość dzięki Great Oczekiwania i pomiar wydajności za pomocą wskaźników KPI łańcucha dostaw specyficznych dla sektora spożywczego.
Czego się nauczysz
- Kompleksowa architektura danych łańcucha dostaw żywności (gospodarstwo → przetwórstwo → dystrybucja → sprzedaż detaliczna)
- Heterogeniczne źródła danych: ERP (SAP/Oracle), MES, LIMS, WMS, TMS, POS, IoT, EDI
- ETL vs ELT: kiedy wybrać podejście do danych na temat żywności
- Kompletne DAG Apache Airflow dla rurociągów spożywczych z obsługą błędów i ponawianiem prób
- Modele DBT dla KPI łańcucha dostaw: poziom marnotrawstwa, OTIF, współczynnik wypełnienia, dni dostaw
- Jakość danych z dużymi oczekiwaniami dotyczącymi łańcuchów chłodniczych i dat wygaśnięcia
- Integracja starszych systemów: SAP RFC/BAPI, EDI EDIFACT, CDC z Debezium
- Studium przypadku: spółdzielnia rolnicza 500 gospodarstw, 3 zakłady przetwórcze, 200 punktów sprzedaży
- Pełne podsumowanie całej serii FoodTech (10 artykułów)
Seria FoodTech: Gdzie jesteśmy
To dziesiąty i ostatni odcinek serialu Technologia żywności, w którym omówiono najważniejsze z nich technologie cyfrowe mające zastosowanie w całym łańcuchu dostaw żywności: od gromadzenia danych w terenie po sprzedaż detaliczna. Oto pełny obraz:
| # | Tytuł | Technologie | Poziom |
|---|---|---|---|
| 00 | Rurociąg IoT dla rolnictwa precyzyjnego z Pythonem i MQTT | IoT, MQTT, Python, Data Lake | Zaawansowany |
| 01 | ML Edge do wykrywania chorób upraw: TensorFlow Lite na Raspberry Pi | TensorFlow Lite, EdgeML, Raspberry Pi | Zaawansowany |
| 02 | Interfejsy API satelitów i pogody dla AgriTech: dane predykcyjne | Sentinel, Planet, Weather API, NDVI | Zaawansowany |
| 03 | System identyfikowalności żywności: Blockchain, RFID i IoT | Blockchain, RFID, IoT, Compliance | Zaawansowany |
| 04 | Wizja komputerowa w kontroli jakości żywności za pomocą PyTorch YOLO | YOLO, PyTorch, wizja komputerowa | Zaawansowany |
| 05 | Automatyzacja FSMA 204: śledzenie, ostrzeganie i przywoływanie za pośrednictwem Pythona | FSMA, zgodność, Python, wycofanie | Zaawansowany |
| 06 | Pionowa automatyzacja rolnictwa: sterowanie robotyczne za pośrednictwem API | Robotyka, API, Automatyka | Zaawansowany |
| 07 | Prognozowanie popytu na redukcję odpadów: szeregi czasowe ML | LSTM, Prorok, Szereg czasowy ML | Zaawansowany |
| 08 | Pulpit nawigacyjny w czasie rzeczywistym dla Farm IoT z Angular i Grafana | Angular, Grafana, InfluxDB, w czasie rzeczywistym | Zaawansowany |
| 09 | Łańcuch dostaw żywności: wzór ETL od gospodarstwa do sprzedawcy <-- Jesteś tutaj | Przepływ powietrza, dbt, Wielkie oczekiwania, ETL | Zaawansowany |
ponieważ ten artykuł zamyka serię
Poprzednie artykuły dotyczyły poszczególnych węzłów w łańcuchu: zbieranie danych IoT w terenie, Modele brzegowe ML, identyfikowalność blockchain, wizja komputerowa jakości, zgodność z FSMA, rolnictwo pionowe, prognozowanie popytu i pulpity nawigacyjne działające w czasie rzeczywistym. Ten artykuł łączy je wszystkie: potok ETL i szkielet integrujący każde źródło danych przekształcają surowe informacje w praktyczne wskaźniki KPI i zapewnia kompleksową widoczność od gospodarstwa po półkę w supermarkecie.
Architektura danych łańcucha dostaw żywności
Łańcuch dostaw żywności składa się z odrębnych węzłów, każdy z własnymi systemami informacyjnymi i formatami częstotliwości danych i aktualizacji. Potok ETL musi solidnie radzić sobie z tą heterogenicznością.
# Architettura logica della supply chain alimentare
# (dati generati ad ogni step)
FARM SYSTEMS
|-- Sensori IoT (MQTT): temp suolo, umidita, pH, EC
|-- Sistema ERP agricolo: colture pianificate, costi input
|-- Registro trattamenti fitosanitari (LIMS)
|-- Geo-dati GPS macchinari agricoli
|-- API meteo (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS (Centri di Lavorazione)
|-- MES (Manufacturing Execution System): ordini produzione, rese
|-- LIMS: analisi chimiche, microbiologiche, allergeni
|-- ERP SAP/Oracle: distinta base, lotti, tracciabilita
|-- WMS (Warehouse Management): magazzino materie prime/finiti
|-- Sensori cold chain: temperatura/umidita durante lavorazione
|
v
DISTRIBUTION CENTERS
|-- WMS: gestione stock, picking, slotting
|-- TMS (Transport Management): spedizioni, veicoli, routing
|-- Sensori cold chain: logger temperatura nei camion frigoriferi
|-- EDI: ordini da retailer (EDIFACT ORDERS, DESADV)
|-- Tracker GPS: posizione veicoli in tempo reale
|
v
RETAIL POS
|-- POS data: vendite per SKU, store, ora
|-- Gestione shelf life: prodotti prossimi scadenza
|-- EDI: conferme ricezione (RECADV), fatture (INVOIC)
|-- Inventario store: stock disponibile per punto vendita
|
v
CONSUMER (segnali indiretti)
|-- App loyalty: acquisti per cliente, frequenza
|-- Reviews prodotto: qualità percepita
|-- Social sentiment: trend di consumo
Każdy węzeł generuje dane w różnych formatach (relacyjny SQL, płaski plik CSV/EDI, JSON z REST API, binarne komunikaty MQTT, starsze pliki SFTP) z częstotliwościami od milisekund (czujniki IoT) do tygodniowego (raport ERP). Wyzwaniem inżyniera danych dotyczących żywności jest ujednolicenie tego wszystkiego w jeden spójny model analityczny.
Heterogeniczne źródła danych: tabela referencyjna
Przed zaprojektowaniem jakiegokolwiek rurociągu konieczne jest zbadanie źródeł. Oto najważniejsze w przypadku łańcucha dostaw żywności o średniej złożoności:
| System | Typ | Format | Objętość/dzień | Częstotliwość | Akceptowalne opóźnienie |
|---|---|---|---|---|---|
| SAP ERP (gospodarstwo/zakład) | ERP | RFC/BAPI, IDoc, SQL | Rekordy 50–500 tys | Partia nocna / CDC | T+1 godz |
| Oracle EBS | ERP | SQL, API REST | Rekord 100 tys.–1 mln | Partia / CDC | T+1 godz |
| MES (produkcja) | MES | OPC-UA, SQL, REST | Wydarzenia 1–10 mln | Prawie w czasie rzeczywistym (1 min) | 5 minut |
| LIMS (laboratorium) | LIMS | HL7, CSV, ODPOCZYNEK | Analiza 1K–10K | Codziennie | T+4 godz |
| WMS (magazyn) | WMS-a | SQL, EDI, REST | Ruchy 10–100 tys | Co 15 minut | 15 minut |
| TMS (transport) | TMS | API REST, EDI | Przesyłki o wartości 5–50 tys | Co 5 minut (GPS) | 5 minut |
| Punkt sprzedaży detalicznej | punkt sprzedaży | CSV, REST, SQL | Transakcje 1–50 mln | Partia godzinowa/wieczorna | T+2 godz |
| Czujniki łańcucha chłodniczego IoT | IoT | MQTT, JSON, Protobuf | Ponad 100 milionów odczytów | Co 30 sek.–5 min | W czasie rzeczywistym (<1 min) |
| Partner EDI (EDIFACT) | EDI | EDIFACT, X12, GS1 XML | 100–10 tys. wiadomości | Sterowane zdarzeniami / wsadowe | T+30 min |
| Interfejs API pogody (OpenMeteo) | Zewnętrzne API | ODPOCZYNEK JSONA | Połączenia 1–10 tys | Cogodzinny | T+1 godz |
| AGEA (dotacje WPR) | Organ publiczny | XML, CSV-SFTP | 1–100 plików | Miesięczne/sezonowe | T+24h |
| Lokalizator pojazdów GPS | IoT/Telematyka | REST, WebSockety | Ponad 10 milionów punktów GPS | Co 30 sek | <2 minuty |
Uwaga: starsze rozwiązania ERP i „okna ekstrakcyjne”
Wiele systemów ERP dla rolnictwa i branży rolno-spożywczej nie obsługuje natywnego CDC. Często jedyny sposób na ekstrakcję danych oraz poprzez nocne zadania wsadowe, które blokują tabele, lub poprzez interfejsy SAP RFC/BAPI sztywne okna konserwacyjne. Zaplanuj okna ekstrakcji z wyprzedzeniem i przewiduj mechanizmy odzyskiwania w przypadku awarii nocnej partii.
ETL kontra ELT dla łańcucha dostaw żywności
Debata ETL vs ELT jest szczególnie istotna w sektorze spożywczym, gdzie dane współistnieją wysoka częstotliwość (łańcuch chłodniczy IoT) z tradycyjnymi danymi wsadowymi (SAP ERP) i wymogami regulacyjnymi rygorystyczne (identyfikowalność, FSMA 204).
| Kryterium | Tradycyjny ETL | Nowoczesny ELT (dbt + Data Lake) |
|---|---|---|
| Transformacja | Przed przesłaniem (serwer przejściowy) | W hurtowni danych (natywny SQL) |
| Skalowalność | Ograniczone przez serwer ETL | Skaluj z DWH (Snowflake, BigQuery, DuckDB) |
| Przechowywanie surowych danych | Często nie (dane już przekształcone) | Tak: Warstwa brązu konserwuje surowo |
| Audytowalność (FSMA) | Poziom trudny: niewyraźne pochodzenie | Doskonale: kompletny wykres linii dbt |
| Utajenie | Partia (T+1h, T+1d) | Mikro-partie lub przesyłanie strumieniowe (Flink/Spark) |
| Błędy łańcucha chłodniczego | Utrata danych w przypadku awarii ETL | Surowe zawsze zapisywane; ponów próbę tylko transformacji |
| Wymagane umiejętności zespołowe | Programista Java/Informatyka/SSIS | SQL + Python (dbt + przepływ powietrza) |
| Koszt infrastruktury | Zawsze aktywny serwer dedykowany | Płatność za zapytanie (płatek śniegu/BigQuery) |
Dla współczesnego łańcucha dostaw żywności rekomendacją jest architektura Medalion ELT:
# Architettura Medallion per Food Supply Chain
BRONZE LAYER (Raw, immutabile)
|-- Tutti i dati sorgente caricati as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 anni (compliance FSMA/EU)
|-- Formato: Parquet su S3/GCS o Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicato)
|-- Dbt models: normalizzazione schemi eterogenei
|-- Deduplicazione lotti, SKU, location codes
|-- Type casting: date, temperature (C/F), pesi (kg/lb)
|-- NULL handling: sensori offline, ERP null fields
|-- PII masking: dati operatori, conducenti
|
v
GOLD LAYER (KPI pronti per business)
|-- Aggregazioni: fill rate, waste rate, OTIF per SKU
|-- Metriche cold chain: violazioni temperatura per lotto
|-- Dashboard BI: Power BI, Tableau, Grafana
|-- API ML: feature store per demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: OEE impianti, rendimenti produzione
|-- Finance mart: costo per lotto, margine per SKU
|-- Compliance mart: tracciabilita FSMA, recall readiness
Apache Airflow do orkiestracji rurociągów żywnościowych
Apache Airflow jest de facto standardem orkiestracji złożonych przepływów pracy ETL. W łańcuchu dostaw mocy, pojedynczy DAG musi koordynować dziesiątki źródeł z opóźnieniami, zależnościami i politykami różnych ponownych prób. Zobaczmy kompletny DAG dla codziennego potoku.
Główny DAG: Dzienny łańcuch dostaw żywności ETL
# food_supply_chain_dag.py
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
with DAG(
dag_id="food_supply_chain_daily_etl",
description="Pipeline ETL giornaliera: farm -> processing -> distribution -> retail",
default_args=DEFAULT_ARGS,
schedule_interval="0 2 * * *", # ogni notte alle 02:00 UTC
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["foodtech", "supply-chain", "etl", "production"],
max_active_runs=1,
doc_md="""
## Food Supply Chain Daily ETL
Estrae dati da: SAP ERP, MES impianti, WMS, TMS, POS retail.
Carica su Bronze S3, poi lancia dbt per Silver e Gold.
""",
) as dag:
# ----------------------------------------------------------------
# STEP 1: Health checks sulle sorgenti
# ----------------------------------------------------------------
check_sap_available = HttpSensor(
task_id="check_sap_api_available",
http_conn_id="sap_erp_api",
endpoint="/api/health",
timeout=30,
poke_interval=60,
mode="reschedule",
soft_fail=False,
)
check_mes_available = HttpSensor(
task_id="check_mes_api_available",
http_conn_id="mes_plant_api",
endpoint="/health",
timeout=30,
poke_interval=60,
mode="reschedule",
)
# ----------------------------------------------------------------
# STEP 2: Estrazione SAP ERP (lotti, ordini, inventario)
# ----------------------------------------------------------------
@task(task_id="extract_sap_erp", retries=3)
def extract_sap_erp(**context: Any) -> dict:
"""Estrae dati da SAP ERP via RFC/BAPI wrapper REST."""
from src.connectors.sap_connector import SAPConnector
execution_date = context["ds"] # 'YYYY-MM-DD'
sap = SAPConnector(conn_id="sap_erp_api")
# Estrai: lotti produzione del giorno precedente
batches = sap.get_production_batches(
date=execution_date,
plants=["PL001", "PL002", "PL003"],
)
# Estrai: movimenti magazzino
stock_movements = sap.get_stock_movements(
date=execution_date,
movement_types=["101", "261", "501", "601"],
)
# Estrai: ordini di vendita confermati
sales_orders = sap.get_sales_orders(
date=execution_date,
status=["CONF", "DLVR"],
)
# Salva su S3 Bronze layer
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/sap_erp/dt={execution_date}"
s3.load_string(
string_data=batches.to_json(orient="records", lines=True),
key=f"{prefix}/batches.jsonl",
bucket_name="food-data-lake",
replace=True,
)
s3.load_string(
string_data=stock_movements.to_json(orient="records", lines=True),
key=f"{prefix}/stock_movements.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"SAP extraction OK: %d batches, %d movements, %d orders",
len(batches),
len(stock_movements),
len(sales_orders),
)
return {
"batches_count": len(batches),
"movements_count": len(stock_movements),
"orders_count": len(sales_orders),
}
# ----------------------------------------------------------------
# STEP 3: Estrazione dati cold chain (IoT sensori temperatura)
# ----------------------------------------------------------------
@task(task_id="extract_cold_chain_iot")
def extract_cold_chain_iot(**context: Any) -> dict:
"""Estrae letture temperatura da InfluxDB (IoT cold chain)."""
from influxdb_client import InfluxDBClient
execution_date = context["ds"]
client = InfluxDBClient(
url="{{ var.value.influxdb_url }}",
token="{{ var.value.influxdb_token }}",
org="foodtech-org",
)
query_api = client.query_api()
# Query Flux: temperature readings per lotto di trasporto
flux_query = f"""
from(bucket: "cold-chain")
|> range(start: {execution_date}T00:00:00Z,
stop: {execution_date}T23:59:59Z)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "celsius")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
"""
result = query_api.query_data_frame(flux_query)
# Calcola violazioni: temperatura fuori range per categoria prodotto
violations = result[
(result["celsius"] < result["min_temp_required"]) |
(result["celsius"] > result["max_temp_required"])
]
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/cold_chain/dt={execution_date}"
s3.load_string(
string_data=result.to_json(orient="records", lines=True),
key=f"{prefix}/temperature_readings.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"Cold chain extraction OK: %d readings, %d violations",
len(result),
len(violations),
)
if len(violations) > 0:
logger.warning(
"COLD CHAIN VIOLATIONS: %d eventi fuori temperatura!",
len(violations),
)
return {
"readings_count": len(result),
"violations_count": len(violations),
}
# ----------------------------------------------------------------
# STEP 4: Estrazione EDI dai partner retailer
# ----------------------------------------------------------------
@task(task_id="extract_edi_partners")
def extract_edi_partners(**context: Any) -> dict:
"""Processa file EDI EDIFACT da SFTP partners."""
from src.connectors.edi_connector import EDIFACTConnector
execution_date = context["ds"]
edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")
# Download e parsing ORDERS (EDIFACT ORDERS D.01B)
orders = edi.process_orders(date=execution_date)
# Download e parsing DESADV (Despatch Advice)
despatch_advices = edi.process_desadv(date=execution_date)
# Download e parsing RECADV (Receiving Advice)
receiving_advices = edi.process_recadv(date=execution_date)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/edi_partners/dt={execution_date}"
s3.load_string(
string_data=orders.to_json(orient="records", lines=True),
key=f"{prefix}/orders.jsonl",
bucket_name="food-data-lake",
replace=True,
)
return {
"orders_count": len(orders),
"desadv_count": len(despatch_advices),
"recadv_count": len(receiving_advices),
}
# ----------------------------------------------------------------
# STEP 5: Estrazione POS retail
# ----------------------------------------------------------------
@task(task_id="extract_pos_data")
def extract_pos_data(**context: Any) -> dict:
"""Estrae dati vendite POS da database retailer partner."""
from src.connectors.retail_connector import RetailPOSConnector
execution_date = context["ds"]
pos = RetailPOSConnector(conn_id="retail_pos_db")
# Vendite per SKU/store con shelf life info
sales = pos.get_daily_sales(
date=execution_date,
include_promotions=True,
include_markdowns=True,
)
# Near-expiry: prodotti a <3 giorni scadenza ancora in shelf
near_expiry = pos.get_near_expiry_stock(
date=execution_date,
days_threshold=3,
)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/pos_retail/dt={execution_date}"
s3.load_string(
string_data=sales.to_json(orient="records", lines=True),
key=f"{prefix}/sales.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info("POS extraction OK: %d transazioni", len(sales))
return {
"sales_count": len(sales),
"near_expiry_skus": len(near_expiry),
}
# ----------------------------------------------------------------
# STEP 6: dbt run (Silver + Gold transformations)
# ----------------------------------------------------------------
dbt_run_silver = BashOperator(
task_id="dbt_run_silver_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:silver --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
dbt_run_gold = BashOperator(
task_id="dbt_run_gold_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:gold --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
# ----------------------------------------------------------------
# STEP 7: dbt test (data quality checks)
# ----------------------------------------------------------------
dbt_test = BashOperator(
task_id="dbt_test_all",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt test --select tag:silver tag:gold --target prod"
),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ----------------------------------------------------------------
# STEP 8: Great Expectations validation
# ----------------------------------------------------------------
@task(task_id="run_great_expectations_validation")
def run_ge_validation(**context: Any) -> dict:
"""Lancia Great Expectations per validare data quality cold chain."""
from great_expectations.data_context import get_context
context_ge = get_context(project_root_dir="/opt/great_expectations")
execution_date = context["ds"]
results = {}
suites_to_run = [
("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
("lot_traceability", "silver_lots_checkpoint"),
("kpi_supply_chain", "gold_kpi_checkpoint"),
]
for suite_name, checkpoint_name in suites_to_run:
result = context_ge.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"runtime_parameters": {"path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"},
"batch_identifiers": {"execution_date": execution_date},
},
)
results[suite_name] = result.success
failed = [name for name, ok in results.items() if not ok]
if failed:
raise ValueError(f"GE validation FAILED per suite: {failed}")
logger.info("Great Expectations: tutte le suite OK")
return results
# ----------------------------------------------------------------
# STEP 9: KPI alert (email se waste rate > soglia)
# ----------------------------------------------------------------
@task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
def check_kpi_alerts(**context: Any) -> None:
"""Controlla KPI critici e invia alert se fuori soglia."""
from src.services.kpi_alert_service import KPIAlertService
execution_date = context["ds"]
alert_service = KPIAlertService(conn_id="snowflake_dw")
kpis = alert_service.get_daily_kpis(date=execution_date)
alerts_sent = []
# Waste rate > 5%: alert critico
if kpis["waste_rate_pct"] > 5.0:
alert_service.send_alert(
severity="HIGH",
message=f"Waste rate {kpis['waste_rate_pct']:.1f}% supera soglia 5%",
kpi="waste_rate",
value=kpis["waste_rate_pct"],
)
alerts_sent.append("waste_rate")
# OTIF < 95%: alert warning
if kpis["otif_pct"] < 95.0:
alert_service.send_alert(
severity="MEDIUM",
message=f"OTIF {kpis['otif_pct']:.1f}% sotto soglia 95%",
kpi="otif",
value=kpis["otif_pct"],
)
alerts_sent.append("otif")
# Cold chain violations > 0: alert immediato
if kpis["cold_chain_violations"] > 0:
alert_service.send_alert(
severity="CRITICAL",
message=f"{kpis['cold_chain_violations']} violazioni temperatura cold chain!",
kpi="cold_chain_violations",
value=kpis["cold_chain_violations"],
)
alerts_sent.append("cold_chain")
logger.info("KPI check completato. Alert inviati: %s", alerts_sent)
# ----------------------------------------------------------------
# DAG wiring: dipendenze tra task
# ----------------------------------------------------------------
extract_sap = extract_sap_erp()
extract_cold = extract_cold_chain_iot()
extract_edi = extract_edi_partners()
extract_pos = extract_pos_data()
ge_validation = run_ge_validation()
kpi_alerts = check_kpi_alerts()
# Health checks prima di tutto
[check_sap_available, check_mes_available] >> extract_sap
# Estrazione parallela di tutte le sorgenti
[extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
# Silver prima di Gold
dbt_run_silver >> dbt_run_gold
# Test dopo Gold
dbt_run_gold >> dbt_test
# GE validation e KPI alerts dopo i test
dbt_test >> ge_validation >> kpi_alerts
dbt dla Transformacji: Modele SQL dla kluczowych wskaźników wydajności żywności
dbt (narzędzie do budowania danych) to idealne narzędzie do transformacji danych w warstwie Silver i Gold. Przyjrzyjmy się głównym modelom łańcucha dostaw żywności, ze zintegrowanymi testami i dokumentacją.
Schema.yml: Dokumentacja i test dbt
# models/silver/schema.yml
version: 2
models:
- name: silver_lots
description: "Lotti di produzione normalizzati da SAP ERP"
config:
tags: ["silver", "lots", "traceability"]
materialized: incremental
incremental_strategy: merge
unique_key: lot_id
columns:
- name: lot_id
description: "Identificativo univoco lotto (GS1 SGTIN)"
tests:
- not_null
- unique
- name: production_date
tests:
- not_null
- name: expiry_date
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "expiry_date > production_date"
- name: plant_code
tests:
- not_null
- accepted_values:
values: ["PL001", "PL002", "PL003", "PL004"]
- name: temperature_class
tests:
- accepted_values:
values: ["ambient", "chilled", "frozen", "ultra_frozen"]
- name: net_weight_kg
tests:
- dbt_utils.expression_is_true:
expression: "net_weight_kg > 0 AND net_weight_kg < 50000"
- name: silver_shipments
description: "Spedizioni normalizzate da TMS e EDI DESADV"
config:
tags: ["silver", "logistics"]
materialized: incremental
unique_key: shipment_id
columns:
- name: shipment_id
tests: [not_null, unique]
- name: planned_delivery_date
tests: [not_null]
- name: actual_delivery_date
description: "NULL se non ancora consegnato"
- name: otif_flag
description: "True se consegnato On-Time and In-Full"
tests:
- accepted_values:
values: [true, false, null]
quote: false
- name: gold_kpi_supply_chain
description: "KPI giornalieri supply chain aggregati per SKU/plant/customer"
config:
tags: ["gold", "kpi", "dashboard"]
materialized: table
columns:
- name: kpi_date
tests: [not_null]
- name: waste_rate_pct
description: "% prodotto sprecato su prodotto totale"
tests:
- dbt_utils.expression_is_true:
expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
- name: otif_pct
tests:
- dbt_utils.expression_is_true:
expression: "otif_pct >= 0 AND otif_pct <= 100"
Model srebrny: Partie produkcyjne
-- models/silver/silver_lots.sql
-- {{ config(tags=["silver", "lots"], materialized="incremental") }}
WITH source_sap AS (
SELECT
lot_number AS lot_id,
material_number AS sku_code,
plant AS plant_code,
production_date AS production_date,
-- Normalizza data scadenza (SAP usa formato YYYYMMDD)
TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
quantity_kg AS net_weight_kg,
-- Classi temperatura da tabella materiali SAP
CASE
WHEN temp_cond_sap = '0001' THEN 'ambient'
WHEN temp_cond_sap = '0002' THEN 'chilled'
WHEN temp_cond_sap = '0003' THEN 'frozen'
WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
ELSE 'unknown'
END AS temperature_class,
-- Range temperatura richiesto (gradi Celsius)
CASE
WHEN temp_cond_sap = '0001' THEN STRUCT(15.0 AS min_c, 25.0 AS max_c)
WHEN temp_cond_sap = '0002' THEN STRUCT(2.0 AS min_c, 8.0 AS max_c)
WHEN temp_cond_sap = '0003' THEN STRUCT(-25.0 AS min_c, -15.0 AS max_c)
WHEN temp_cond_sap = '0004' THEN STRUCT(-60.0 AS min_c, -30.0 AS max_c)
END AS temp_range,
batch_status AS lot_status,
_ingestion_timestamp AS ingested_at
FROM {{ source('bronze', 'sap_erp_batches') }}
WHERE lot_number IS NOT NULL
AND quantity_kg > 0
{% if is_incremental() %}
AND _ingestion_timestamp > (
SELECT MAX(ingested_at) FROM {{ this }}
)
{% endif %}
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY lot_id
ORDER BY ingested_at DESC
) AS row_num
FROM source_sap
),
final AS (
SELECT
lot_id,
sku_code,
plant_code,
production_date,
expiry_date,
net_weight_kg,
temperature_class,
temp_range,
lot_status,
-- Shelf life totale in giorni
DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
-- Shelf life residua rispetto a oggi
DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
ingested_at
FROM deduplicated
WHERE row_num = 1
AND expiry_date > production_date
)
SELECT * FROM final
Złoty model: Łańcuch dostaw KPI
-- models/gold/gold_kpi_supply_chain.sql
-- {{ config(tags=["gold", "kpi"], materialized="table") }}
WITH lots AS (
SELECT * FROM {{ ref('silver_lots') }}
),
shipments AS (
SELECT * FROM {{ ref('silver_shipments') }}
),
pos_sales AS (
SELECT * FROM {{ ref('silver_pos_sales') }}
),
inventory AS (
SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),
-- KPI 1: Waste Rate (% prodotto sprecato)
waste_kpi AS (
SELECT
production_date AS kpi_date,
plant_code,
-- Prodotto scaduto o smaltito come spreco
SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END) AS wasted_kg,
SUM(net_weight_kg) AS total_produced_kg,
ROUND(
100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END)
/ NULLIF(SUM(net_weight_kg), 0),
2
) AS waste_rate_pct
FROM lots
GROUP BY 1, 2
),
-- KPI 2: OTIF (On-Time In-Full)
otif_kpi AS (
SELECT
DATE(planned_delivery_date) AS kpi_date,
customer_code,
COUNT(*) AS total_orders,
SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
ROUND(
100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
/ NULLIF(COUNT(*), 0),
2
) AS otif_pct
FROM shipments
WHERE actual_delivery_date IS NOT NULL
GROUP BY 1, 2
),
-- KPI 3: Days of Supply (scorta in giorni)
dos_kpi AS (
SELECT
inv.snapshot_date AS kpi_date,
inv.sku_code,
inv.location_code,
inv.stock_qty_units,
COALESCE(avg_daily_sales.avg_daily_units, 0) AS avg_daily_demand,
ROUND(
inv.stock_qty_units
/ NULLIF(avg_daily_sales.avg_daily_units, 0),
1
) AS days_of_supply
FROM inventory inv
LEFT JOIN (
SELECT sku_code, location_code,
AVG(units_sold) AS avg_daily_units
FROM pos_sales
WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
GROUP BY 1, 2
) avg_daily_sales
ON inv.sku_code = avg_daily_sales.sku_code
AND inv.location_code = avg_daily_sales.location_code
),
-- KPI 4: Shelf Life at Receipt (qualità al ricevimento)
slr_kpi AS (
SELECT
DATE(s.actual_delivery_date) AS kpi_date,
l.sku_code,
-- % shelf life residua al momento della consegna
ROUND(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0),
1
) AS shelf_life_pct_at_receipt,
-- Media per SKU
AVG(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0)
) OVER (PARTITION BY DATE(s.actual_delivery_date), l.sku_code)
AS avg_slr_pct
FROM shipments s
INNER JOIN lots l ON s.lot_id = l.lot_id
WHERE s.actual_delivery_date IS NOT NULL
)
-- Join finale per dashboard KPI unificato
SELECT
w.kpi_date,
w.plant_code,
w.waste_rate_pct,
w.wasted_kg,
w.total_produced_kg,
o.otif_pct,
o.total_orders,
d.days_of_supply,
d.avg_daily_demand,
s.avg_slr_pct AS shelf_life_at_receipt_pct,
-- Score complessivo supply chain (0-100)
ROUND(
(
(100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
COALESCE(o.otif_pct, 0) * 0.40 +
LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
COALESCE(s.avg_slr_pct, 0) * 0.15
),
1
) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
Jakość danych dotyczących żywności: wielkie oczekiwania
W przemyśle spożywczym jakość danych to nie tylko kwestia czystości: to wymóg regulacyjne. Może to spowodować nieprawidłowe dane dotyczące temperatur, dat ważności lub kodów partii Kary FSMA, bardzo kosztowne wycofanie produktów i szkoda dla reputacji. Wielkie nadzieje na to pozwalają zdefiniuj deklaratywne zasady walidacji i zintegruj je z potokiem Airflow.
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Crea o aggiorna la suite di aspettative
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# REGOLA 1: Temperatura mai NULL per prodotti refrigerati/surgelati
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # tolleranza 1% per sensori offline temporanei
meta={
"notes": "Sensori IoT cold chain: max 1% di letture mancanti",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# REGOLA 2: Range temperatura prodotti refrigerati (2-8°C)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Range assoluto: include frozen (-25°C) e ambient (25°C)",
}
)
# REGOLA 3: Lot ID sempre presente e formato GS1 SGTIN valido
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard per tracciabilita alimentare"}
)
# REGOLA 4: Timestamp monotonicamente crescente per sensore
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (es. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verifico che i timestamp non siano retroattivi"}
)
# REGOLA 5: Numero di letture per lotto (almeno 1 ogni 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 letture/ora = 48 minimo
max_value=10000, # max ragionevole per 24h
meta={"notes": "Minimo 1 lettura ogni 30 min per cold chain attiva"}
)
# REGOLA 6: Distribuzione temperatura per classe prodotto
# Prodotti refrigerati: mediana deve essere 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Solo per lotti con temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Salva le aspettative
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint per Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
Wskaźniki KPI i wskaźniki łańcucha dostaw żywności
Zdefiniuj właściwe KPI i podstawy do podejmowania decyzji opartych na danych. Oto metryki podstawowe dla łańcucha dostaw żywności, obejmujące receptury, standardy sektorowe i realistyczne cele.
| KPI | Formuła | Punkt odniesienia w branży | Celuj w najlepsze w swojej klasie | Próg alertu |
|---|---|---|---|---|
| Idealny współczynnik zamówień | Idealne zamówienia / Łączna liczba zamówień * 100 | 85-92% | >95% | <90% |
| OTIF (terminowo-w całości) | (Dostawy szybkie i kompletne) / Razem * 100 | 88-94% | >97% | <92% |
| Poziom odpadów% | Kg odpadów / Kg produktów ogółem * 100 | 3-8% | <2% | >5% |
| Obrót zapasami | Roczne koszty sprzedaży / średni zapas | 12-20x (świeże) | >25x (świeży) | <10x |
| Dni dostaw (DOS) | Dostępne zapasy / Średnie dzienne zapotrzebowanie | 3-7 dni (świeże) | 2-4 dni | > 10 dni |
| Współczynnik wypełnienia | Jednostki dostarczone / Jednostki zamówione * 100 | 92-96% | >98% | <94% |
| Okres przydatności do spożycia w momencie odbioru (SLR) | Pozostałe dni dostawy / Całkowity okres przydatności do spożycia * 100 | 60-75% | >80% | <60% |
| Zgodność z łańcuchem chłodniczym | Partie bez naruszeń temp. / Partie ogółem * 100 | 97-99% | >99,5% | <98% |
| Dokładność prognozy | 1 - (MAE / Średnie zapotrzebowanie) * 100 | 75-85% | >90% | <75% |
| Cykl od gotówki do gotówki | DIO + DSO - DPO (dni) | 15-35 dni | <15 dni | > 45 dni |
Jak obliczyć ocenę kondycji łańcucha dostaw
Ważenie wskaźników KPI w jednym złożonym wyniku pomaga kierownictwu monitorować stan firmy cały łańcuch dostaw za pomocą jednej liczby:
- OTIF: waga 40% (bezpośredni wpływ na satysfakcję klienta)
- Poziom odpadów: waga 30% (wpływ na marże i zrównoważony rozwój)
- Okres przydatności do spożycia w momencie odbioru: waga 15% (jakość konsumencka)
- Zgodność z łańcuchem chłodniczym: waga 15% (bezpieczeństwo i zgodność żywności)
Ocena > 90: doskonała | 80-90: dobrze | 70-80: do poprawy | <70: krytyczny
Czas rzeczywisty a praca wsadowa: kiedy używać jakiej architektury
Nie wszystko musi odbywać się w czasie rzeczywistym. Koszt architektury przesyłania strumieniowego jest znaczny lepsza od tradycyjnej architektury wsadowej. W łańcuchu dostaw żywności, wybór zależy od akceptowalnych opóźnień i konsekwencji operacyjnych.
| Przypadek użycia | Architektura | Utajenie | Instrumenty | Motywacja |
|---|---|---|---|---|
| Monitorowanie łańcucha chłodniczego (temperatura) | Przesyłanie strumieniowe w czasie rzeczywistym | <1min | Kafka + Flink | Naruszenie temperatury = natychmiastowa utrata partii |
| Zarządzanie przypomnieniem | W czasie rzeczywistym / sterowany zdarzeniami | <5 min | Kafka + alarmowanie | FSMA: Śledź partie w ciągu <2 godzin |
| Śledzenie pojazdów GPS | Prawie w czasie rzeczywistym | <2 minuty | MQTT + InfluxDB | Zaktualizowano ETA dla klientów |
| Dzienne KPI (OTIF, odpady) | Dzienna partia | T+2 godz | Przepływ powietrza + dbt | Poranny raport operacyjny |
| Prognozowanie popytu | Partia dzienna/tygodniowa | T+6 godz | Przepływ powietrza + MLprzepływ | Plan produkcji nie wymaga czasu rzeczywistego |
| Migawka zapasów | Mikroporcja (co 15 min) | 15 minut | Przepływ powietrza + płatek śniegu | Operatorzy magazynów: wystarczająca widoczność |
| Cotygodniowa analiza odpadów | Cotygodniowa partia | T+24h | dbt + narzędzie BI | Decyzje strategiczne, a nie operacyjne |
| Audyt zgodności z FSMA | Partia na żądanie | Na żądanie | dbt + skarbiec danych | Kontrole zaplanowane, a nie codzienne |
# Architettura Lambda per Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processa eventi in milliseconds: cold chain, recall alerts
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filtra violazioni temperatura in tempo reale
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
.map(lambda r: {
**r,
"violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
"violation_detected_at": datetime.utcnow().isoformat(),
})
# Scrivi violations su Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
Integracja ze starszymi systemami
Większość firm rolno-spożywczych działa na starszych systemach: SAP R/3 z lat '90, pliki EDIFACT EDI przez SFTP, baza danych Oracle bez REST API. Integracja tych systemów wymaga specjalnego podejścia.
Łącznik SAP: RFC i BAPI
# src/connectors/sap_connector.py
"""
Connettore SAP ERP via pyrfc (SAP RFC/BAPI wrapper).
Richiede: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "IT"
class SAPConnector:
"""Wrapper per SAP RFC/BAPI calls per dati supply chain."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Estrae lotti di produzione da SAP tramite BAPI_BATCH_GET_DETAIL.
Args:
date_from: Data inizio formato YYYYMMDD
date_to: Data fine formato YYYYMMDD
plants: Lista codici stabilimento SAP
"""
all_batches = []
for plant in plants:
try:
# Chiama BAPI SAP per lista lotti
result = self._conn.call(
"BAPI_BATCH_GET_DETAIL",
PLANT=plant,
DATE_FROM=date_from,
DATE_TO=date_to,
BATCH_STATUS="", # tutti gli stati
)
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lotti estratti",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
"lot_id": f"{batch['MATNR']}-{batch['CHARG']}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error per plant %s: %s",
plant,
str(e),
)
# Non fallisce tutta l'estrazione per un singolo plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
"""
Estrae movimenti di magazzino SAP via MB51 BAPI equivalent.
movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
"""
result = self._conn.call(
"BAPI_GOODSMVT_GETDETAIL",
GOODSMVT_DATE_FROM=date_from,
GOODSMVT_DATE_TO=date_to,
)
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
CDC z Debezium do przesyłania strumieniowego ze starszych baz danych
# debezium-config/oracle-erp-connector.json
# Configurazione Debezium per CDC da Oracle EBS (ERP legacy)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
"database.password": "${ORACLE_CDC_PASSWORD}",
"database.dbname": "FOOD_ERP",
"database.pdb.name": "FOOD_PDB",
"database.server.name": "oracle-erp",
"table.include.list": "FOOD_ERP.INV_TRANSACTIONS,FOOD_ERP.MTL_LOT_NUMBERS,FOOD_ERP.WSH_DELIVERY_DETAILS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.oracle.food_erp",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.oracle.food_erp",
"transforms": "route,addFields",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.addFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addFields.static.field": "_cdc_source",
"transforms.addFields.static.value": "oracle-erp",
"include.schema.changes": "true",
"snapshot.mode": "initial_only",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"topic.prefix": "food-cdc"
}
}
Parser EDIFACT EDI dla zamówień detalicznych
# src/connectors/edi_connector.py
"""
Parser EDI EDIFACT per messaggi ORDERS, DESADV, RECADV (GS1 subset).
Usa pydifact library per parsing EDIFACT messages.
"""
import pandas as pd
from pydifact.segmentcollection import SegmentCollection
from pydifact.segments import Segment
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def parse_edifact_orders(file_path: str | Path) -> pd.DataFrame:
"""
Parsa un file EDI EDIFACT ORDERS D.01B (GS1 EANCOM).
Restituisce DataFrame con righe ordine normalizzate.
Struttura messaggio ORDERS EDIFACT:
UNB (Interchange header)
UNH (Message header)
BGM (Order message identifier)
DTM (Date/time)
NAD+BY (Buyer = retailer)
NAD+SU (Supplier = nostro sistema)
LIN (Line item)
+QTY (Quantity ordered)
+DTM (Requested delivery date)
+PIA (Product code GTIN)
UNT (Message trailer)
"""
file_content = Path(file_path).read_text(encoding="latin-1")
collection = SegmentCollection.from_str(file_content)
orders = []
current_order: dict = {}
current_line: dict = {}
for segment in collection.segments:
tag = segment.tag
if tag == "BGM":
# Inizio nuovo ordine
current_order = {
"order_number": segment.elements[1] if len(segment.elements) > 1 else None,
"document_type": segment.elements[0][0] if segment.elements else None,
"lines": [],
}
elif tag == "DTM" and current_order:
# Data ordine o data consegna richiesta
qualifier = segment.elements[0][0]
date_str = segment.elements[0][1]
date_fmt = segment.elements[0][2]
if qualifier == "137": # Document date
try:
current_order["order_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
logger.warning("DTM parse error: %s", date_str)
elif qualifier == "2": # Requested delivery date
try:
current_order["requested_delivery_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
pass
elif tag == "NAD":
qualifier = segment.elements[0]
party_id = segment.elements[1][0] if len(segment.elements) > 1 else None
party_name = segment.elements[3] if len(segment.elements) > 3 else None
if qualifier == "BY": # Buyer (retailer)
current_order["buyer_gln"] = party_id
current_order["buyer_name"] = party_name
elif qualifier == "SU": # Supplier
current_order["supplier_gln"] = party_id
elif tag == "LIN":
# Salva riga precedente se esiste
if current_line and current_order:
current_order["lines"].append(current_line)
current_line = {
"line_number": segment.elements[0],
"gtin": None,
"qty_ordered": None,
"delivery_date": None,
}
elif tag == "PIA" and current_line:
# Product identification: GTIN-13 o GTIN-14
if len(segment.elements) > 1:
qualifier = segment.elements[1][1] if len(segment.elements[1]) > 1 else ""
if qualifier in ["SRV", "GE1"]: # GTIN qualifiers
current_line["gtin"] = segment.elements[1][0]
elif tag == "QTY" and current_line:
qualifier = segment.elements[0][0]
if qualifier == "21": # Ordered quantity
current_line["qty_ordered"] = float(segment.elements[0][1])
current_line["qty_unit"] = segment.elements[0][2] if len(segment.elements[0]) > 2 else "PCE"
# Normalizza output
for order in orders:
for line in order.get("lines", []):
orders.append({
"order_number": order.get("order_number"),
"order_date": order.get("order_date"),
"requested_delivery_date": order.get("requested_delivery_date"),
"buyer_gln": order.get("buyer_gln"),
"buyer_name": order.get("buyer_name"),
"supplier_gln": order.get("supplier_gln"),
"line_number": line.get("line_number"),
"gtin": line.get("gtin"),
"qty_ordered": line.get("qty_ordered"),
"qty_unit": line.get("qty_unit"),
"_source": "EDI_EDIFACT_ORDERS",
"_parsed_at": datetime.utcnow().isoformat(),
})
return pd.DataFrame(orders)
Studium przypadku: Spółdzielnia rolnicza licząca 500 gospodarstw
Zobaczmy, jak te schematy mają zastosowanie w prawdziwym przypadku: dużej spółdzielni rolniczej północnych Włoch z 500 stowarzyszonymi przedsiębiorstwami rolniczymi, 3 centrami przetwórstwa i przetwórstwa, oraz 200 punktów sprzedaży pomiędzy wielkopowierzchniowym handlem detalicznym a rynkami lokalnymi.
Profil spółdzielczy
- 500 zrzeszonych gospodarstw: owoce i warzywa, zboża, rośliny strączkowe – dane IoT MQTT z 8000 czujników
- 3 centra obróbcze: Starsza wersja SAP R/3 (2005), niestandardowe MES, LIMS
- 200 punktów sprzedaży: Heterogeniczny POS (3 różnych dostawców), EDI EDIFACT z 15 sprzedawcami detalicznymi
- Sytuacja wyjściowa: Arkusze Excel, izolowane dane, brak kompleksowej widoczności
- Cele: zmniejszyć ilość odpadów o 40%, poprawić OTIF do 95%, zgodność z FSMA
Faza 1: Ocena i inwentaryzacja źródeł (tygodnie 1-4)
# Inventario sorgenti dati: output assessment
SOURCE_INVENTORY = {
"farm_iot": {
"count": 8000,
"protocol": "MQTT v3.1.1",
"broker": "Mosquitto on-premise",
"format": "JSON custom",
"issues": [
"Schema non standardizzato tra vendor IoT diversi",
"15% sensori con timestamp errati (clock drift)",
"3 farm su 500 senza connettivita stabile (4G intermittente)",
],
"recommended_fix": "Normalizzazione schema + edge buffer (Mosquitto local)",
},
"sap_erp": {
"version": "SAP R/3 4.7 (anno 2005)",
"tables_relevant": ["MCHA", "MCH1", "MSEG", "MKPF", "VBAK", "VBAP"],
"issues": [
"No REST API: solo RFC/BAPI",
"Batch extraction window: 01:00-03:00 (finestra ristretta)",
"Codifica caratteri: ISO-8859-1 (non UTF-8)",
"Date in formato SAP (YYYYMMDD come intero)",
],
"recommended_fix": "pyrfc + ETL incremental con delta timestamp",
},
"edi_partners": {
"partners": 15,
"standards": ["EANCOM D.01B", "X12 850/856", "GS1 XML 3.1"],
"transport": "SFTP (12 partner), AS2 (3 partner)",
"issues": [
"3 retailer usano EDIFACT non-standard (varianti proprietarie)",
"Mancanza di RECADV da 5 retailer: no conferma ricezione",
],
},
"pos_retail": {
"vendors": ["Cassa Easy", "NCR Aloha", "Custom PHP legacy"],
"issues": [
"3 formati CSV diversi",
"Nessun campo shelf_life nei dati POS",
"Granularità: ora (non transazione)",
],
},
}
Faza 2: Implementacja stosu technicznego (tygodnie 5-16)
# Stack tecnico implementato per la cooperativa
INFRASTRUCTURE = {
"data_lake": "AWS S3 (Medallion: Bronze/Silver/Gold)",
"compute": "AWS EMR Serverless (Spark per bulk load iniziale)",
"warehouse": "Snowflake (pay-per-query, snowpark per ML features)",
"orchestration": "Apache Airflow 2.9 su AWS MWAA (managed)",
"transformation": "dbt Cloud (Team plan: 8 developers)",
"data_quality": "Great Expectations + Slack alerts",
"streaming": "Apache Kafka su MSK (managed) per cold chain",
"monitoring": "Grafana + Prometheus (Airflow metrics, DWH queries)",
"ci_cd": "GitHub Actions (dbt CI: test su ogni PR)",
"secret_management": "AWS Secrets Manager",
}
TIMELINE = [
{"week": "1-4", "milestone": "Assessment sorgenti + architettura design"},
{"week": "5-7", "milestone": "Bronze layer: connettori SAP + IoT MQTT"},
{"week": "8-10", "milestone": "Bronze layer: EDI + POS + LIMS"},
{"week": "11-12","milestone": "Silver layer: dbt models + data quality"},
{"week": "13-14","milestone": "Gold layer: KPI dashboard + alerting"},
{"week": "15-16","milestone": "UAT + go-live + team training"},
}
TEAM = {
"data_engineers": 3,
"dbt_developers": 2,
"sap_consultant": 1, # part-time per RFC/BAPI
"project_manager": 1,
}
Wyniki uzyskane po 6 miesiącach
| KPI | Przed (wartość bazowa) | Po 6 miesiącach | Poprawa |
|---|---|---|---|
| Poziom odpadów | 7,2% | 4,3% | -40% odpadów (-2,9pp) |
| OTIF | 88% | 94% | +6pp |
| Okres ważności w momencie odbioru | 58% | 74% | +16pp |
| Zgodność z łańcuchem chłodniczym | 94% (szacunkowo) | 99,1% (zmierzone) | +5,1pp przy rzeczywistej widoczności |
| Przypomnij sobie czas rozwiązania | 48-72 godziny | 2-4 godziny | -93% czasu reakcji |
| Raport ręczny dyrektorów | 3-4 godziny/tydzień | Automatyczny pulpit nawigacyjny | -100% praca ręczna |
| Pytanie o dokładność prognozy | 71% | 86% | +15pp |
| Dni dostaw (świeże) | 8-12 dni | 4-6 dni | -50% nadwyżki zapasów |
Wnioski wyciągnięte ze studium przypadku
- SAP RFC/BAPI jest wolniejszy niż myślisz: nocne okno wsadowe było najbardziej krytycznym wąskim gardłem. Rozwiązaniem było wynegocjowanie okna z działem IT szerzej i stosuj ekstrakcję przyrostową, aby zminimalizować objętość.
- EDI nigdy nie jest standardem: z 15 partnerów handlowych 11 miało różnice właściciele standardu EANCOM. Ukończenie parsera EDIFACT zajęło 2 tygodnie kalibracja zgodnie ze specyfikacjami każdego partnera.
- Jakość danych to problem kulturowy, a nie tylko techniczny: 30% z problemy z jakością wynikały z błędnych, ręcznych wpisów do SAP przez operatorów. Rozwiązanie techniczne („Wielkie oczekiwania”) uwydatniło problem, ale uchwała tak wymagane szkolenia i zarządzanie zmianami.
- Warstwa brązu zapisz wszystko: było to konieczne trzy razy w ciągu sześciu miesięcy uruchom ponownie transformacje Silvera, aby naprawić błędy w modelach dbt. Dzięki Warstwa brązowa niezmienna, żadne dane nie są tracone.
- dbt CI/CD zmienia sposób, w jaki pracujesz: przetestuj każdy model na każdym PR zapobiegł około 4 incydentom produkcyjnym w pierwszym miesiącu.
Pełne podsumowanie serii FoodTech
To była długa podróż przez technologie, które zmieniają branżę jedzenie. Oto, co wspólnie stworzyliśmy w tej serii 10 artykułów:
| # | Przedmiot | Nauczono się kluczowych pojęć | Stos technologii |
|---|---|---|---|
| 00 | Rurociąg IoT dla rolnictwa precyzyjnego | Broker MQTT, czujniki terenowe, pozyskiwanie danych z jeziora danych, poziomy QoS | Python, MQTT, Mosquitto, MinIO |
| 01 | ML Edge do wykrywania chorób upraw | Kwantyzacja TensorFlow Lite, wnioskowanie brzegowe, wdrażanie modelu na ARM | TFLite, Raspberry Pi, Python, OpenCV |
| 02 | Satelitarne i meteorologiczne interfejsy API dla AgriTech | Sentinel-2 NDVI, Planet Labs API, inżynieria funkcji pogodowych dla ML | Sentinel Hub, OpenMeteo, rasterio, GeoPandas |
| 03 | System identyfikowalności żywności | Prywatne blockchainy (Hyperledger), RFID EPCIS, standardy GS1 | Tkanina Hyperledger, RFID, EPCIS, Python |
| 04 | Wizja komputerowa w kontroli jakości | Dostrajanie YOLO w zakresie zbiorów danych produktów, wnioskowanie o rurociągach przemysłowych | PyTorch, YOLO, FastAPI, OpenCV |
| 05 | Automatyka FSMA 204 | KDE, CTE wymagały żywności, identyfikowalności partii, automatyzacji wycofywania | Python, FastAPI, PostgreSQL, alarmowanie |
| 06 | Pionowa automatyzacja rolnictwa | Kontrola parametrów środowiskowych, harmonogramowanie robotyczne, REST API dla gospodarstw rolnych | Python, FastAPI, InfluxDB, MQTT |
| 07 | Prognozowanie popytu w celu ograniczenia ilości odpadów | LSTM, Prorok, inżynieria cech sezonowości żywności, weryfikacja historyczna | PyTorch, Prorok, MLflow, pandy |
| 08 | Pulpit nawigacyjny w czasie rzeczywistym dla Farm IoT | Sygnały kątowe, dostarczanie Grafana, zapytania InfluxDB, alarmowanie | Angular 21, Grafana, InfluxDB, WebSocket |
| 09 | Łańcuch dostaw ETL od gospodarstwa do sprzedawcy | Architektura medalionowa, DAG przepływu powietrza, modele dbt, Wielkie oczekiwania, KPI | Przepływ powietrza, dbt, płatek śniegu, GE, SAP RFC, EDI |
Mapa drogowa dla tych, którzy chcą dowiedzieć się więcej
Jeśli przeczytałeś całą serię, masz teraz solidne podstawy do podejmowania prawdziwych projektów FoodTech. Oto uporządkowany plan działania pozwalający głębiej zagłębić się w każdą domenę:
Zalecany kurs pogłębiony
- IoT i przetwarzanie brzegowe: przestudiuj MQTT 5.0 (w porównaniu z wersją 3.1.1 zastosowaną w tym przypadku artykuł), Azure IoT Hub lub AWS IoT Core w przypadku rozwiązań zarządzanych oraz Apache NiFi w przypadku danych pozyskiwanie danych z heterogenicznych źródeł IoT.
- ML w produkcji: seria MLOps na tym blogu nauczy Cię, jak to zrobić wersji, przeprowadzaj testy A/B i monitoruj dryf w produkcji za pomocą MLflow i Seldon.
- Architektura danych: dowiedz się więcej o Data Lakehouse (seria Dane i sztuczna inteligencja Business), aby zrozumieć porównanie Snowflake, Databricks i Apache Iceberg platformy analityczne łańcucha dostaw.
- LLM i generatywna sztuczna inteligencja dla łańcucha dostaw: Modele z dużym językiem mogą wydobywaj ustrukturyzowane informacje z dokumentów zgodności, analizuj umowy z dostawcami i generuj automatyczne raporty. Seria bloga AI Engineering poświęcona jest RAG i dostrajaniu.
- Standardy branżowe do zbadania: Globalne standardy GS1 (GTIN, GLN, SSCC, EPCIS), FSMA 204 (FDA), rozporządzenie UE 178/2002 (identyfikowalność żywności), ISO 22000 (HACCP).
- Odpowiednie certyfikaty: Certyfikowany inżynier danych AWS, certyfikat dbt Programista, certyfikowany programista Apache Airflow przez astronoma, współpracownik inżyniera danych Databricks.
Najlepsze praktyki i anty-wzorce dla żywności ETL
Najlepsze praktyki
- Warstwa brązu niezmienna: nigdy nie modyfikuj surowych danych. Jeśli znajdziesz błąd w modelu Silver dbt napraw model i uruchom ponownie — brąz pozostanie nienaruszony. Ma to fundamentalne znaczenie dla przepisów FSMA i UE. 178/2002 ścieżki audytu.
-
Idempotencja DAG przepływu powietrza: każde zadanie musi być możliwe do ponownego wykonania
bez skutków ubocznych. USA
replace=Truedla przesyłania S3,MERGEzamiastINSERTdla baz danych oraz klucze unikalne w przyrostowych modelach dbt. -
Wyraźne limity czasu dla każdego zadania: starsze systemy, takie jak SAP RFC, mogą
blokować na czas nieokreślony. Zawsze ustawione
execution_timeout=timedelta(hours=2)przy każdym zadaniu Airflow. - Monitorowanie opóźnień w przetwarzaniu: rurociąg rozpoczynający się o godzinie 02:00 ale nie ukończone do godziny 06:00 (przed zmianą operacyjną) i bezużyteczne. Monitoruj ja terminy realizacji i ustaw alerty SLA w Airflow.
- Wydzielenie schematu EDI dla partnerów: utrzymywać dedykowany parser EDI dla każdego partnera handlowego. Zastrzeżone warianty uniemożliwiają parser wyjątkowy uniwersalny.
- Udokumentowana data rodowodu: dbt automatycznie generuje wykres rodowód. Użyj go, aby odpowiedzieć na pytania audytu: „Skąd pochodzi ten KPI OTIF?”.
Anty-wzorce, których należy unikać
- Transformacje w warstwie ekstrakcyjnej (klasyczny ETL): przekształcić dane przed zapisaniem ich w warstwie Brązowej oznaczają utratę możliwości zrobienia tego uzupełnij lub napraw błędy. Zawsze ładuj surowe, przekształcaj później za pomocą dbt.
- Monolityczny DAG do wszystkiego: staje się pojedynczy DAG z ponad 50 zadaniami niemożliwe do debugowania. Podziel według domeny (SAP DAG, IoT DAG, EDI DAG) i użyj Airflow TriggerDagRunOperator dla zależności między DAG.
- Ignorowanie stref czasowych: gospodarstwo rolne na Sycylii, centrum dystrybucyjne w Mediolanie i sprzedawca w Niemczech używają różnych stref czasowych. Zawsze konwertuj na UTC w warstwie Brązowej i wszędzie używa znaczników czasu uwzględniających strefę czasową.
- Zakodowane na stałe dane uwierzytelniające w DAG: NIGDY nie wprowadzaj haseł SAP ani tokenów API w kodzie przepływu powietrza. Użyj połączeń i zmiennych przepływu powietrza lub Menedżera sekretów AWS.
- Pomiń jakość danych na platformie Bronze: zatwierdź tylko warstwę Złota i za dużo późno. Problemy należy zidentyfikować tak szybko, jak to możliwe: nieprawidłowy odczyt temperatury w przypadku brązu staje się to fałszywie pozytywnym przypomnieniem w przypadku złota.
- Harmonogram przepływu powietrza jako wykonawca: zadanie nie może zostać wykonane z samego harmonogramu przepływu powietrza. Zawsze używaj oddzielnego Executora (CeleryExecutor, KubernetesExecutor) w celu odizolowania dużych obciążeń.
Wnioski: koniec ścieżki, początek innej
Dotarliśmy do końca serii FoodTech. W dziesięciu artykułach zbudowaliśmy kawałek kawałek po kawałku wizję łańcucha dostaw żywności całkowicie oparte na danych: czujniki terenowe przesyłające dane w czasie rzeczywistym, modele ML wykrywające choroby przed ludzkim okiem, blockchain dla niezmiennej identyfikowalności, wizja komputerowa dla kontrola jakości, zautomatyzowana zgodność z FSMA, farmy wertykalne kontrolowane poprzez API, prognozowanie popytu, które zmniejsza ilość odpadów, pulpity nawigacyjne działające w czasie rzeczywistym dla kadry zarządzającej oraz, w najnowszy artykuł, potok ETL, który łączy to wszystko w całość.
Sektor rolno-spożywczy jest jednym z najbardziej skomplikowanych pod względem digitalizacji: nieprzewidywalna sezonowość, produkty łatwo psujące się, rygorystyczne przepisy, dotychczasowe systemy skonsolidowane przez dziesięciolecia, łańcuchy dostaw globalny z setkami graczy. Ale właśnie z tego powodu możliwości tworzenia wartości z technologią są ogromne. Zmniejsz w spółdzielni poziom odpadów z 7% do 4%. w przypadku 500 gospodarstw to nie tylko poprawa operacyjna: to pieniądze, to żywność, to wpływ na środowisko.
Łańcuch dostaw żywności 2030 zostanie zdefiniowany przez dane w czasie rzeczywistym (każda partia prześledzona od siewu do widelca), Przewidywalna sztuczna inteligencja (zero nadmiaru zapasów, zero zapasów, zero marnotrawstwa), zautomatyzowana zgodność (Audyt FSMA w 2 godziny, a nie 2 tygodnie) e mierzalna trwałość (ślad węglowy na działkę, zużycie wody na hektar).
Są tam wszystkie technologie. Wzorzec ETL, którego nauczyłeś się w tym artykule — Przepływ powietrza za orkiestrację, dbt za transformacje, Wielkie oczekiwania w zakresie jakości danych, Medalion architektura dla konstrukcji — i można ją natychmiast zastosować w każdym przedsiębiorstwie rolno-spożywczym średniej lub dużej wielkości.
Przeglądaj inne powiązane serie blogów
- Biznes danych i sztucznej inteligencji (seria 14): zagłębia się w hurtownię danych (Snowflake, Databricks, BigQuery), ETL/ELT z dbt i Airbyte, przedsiębiorstwo MLOps i zarządzanie danymi — naturalne uzupełnienie tej serii FoodTech.
- MLOps (seria 5): jak wprowadzić do produkcji modele ML na żądanie prognozowanie i wizja komputerowa, które opracowaliśmy w artykułach 01, 04 i 07.
- Inżynieria AI/RAG (seria 6): jak używać LLM do wysyłania zapytań dokumenty zgodności, analizuj certyfikaty HACCP i generuj automatyczne raporty dla łańcucha dostaw.
- PostgreSQL AI (seria 10): jak używać pgvector do wyszukiwania semantycznego na opisach produktów, przepisach i specyfikacjach technicznych żywności – przydatne dla zarządzanie katalogiem produktów w branży spożywczej.
- EnergyTech (seria): wiele technologii IoT i potoków danych Seria FoodTech jest identyczna z tymi stosowanymi do monitorowania energii przemysłowych zakładów spożywczych.
Dziękujemy za śledzenie całej serii FoodTech. Miłego kodowania i smacznego.







