Gıda Tedarik Zinciri: Çiftlikten Perakendeciye ETL Modeli
Her yıl yaklaşık olarak İnsan tüketimi için üretilen gıdaların üçte biri kayboluyor veya israf ediliyor Gıda tedarik zinciri boyunca: Yalnızca ABD'de 473 milyar doları aşan bir değer. Dünya çapında her gün bir milyar öğün çöpe atılıyor. Paradoksal olarak, önemli bir pay bu israf iklimsel veya biyolojik faktörlere değil, bir veri sorununa bağlıdır: görünürlük Yetersiz, eski sistemler entegre edilmemiş, KPI’lar geç hesaplanıyor ve kararlar bilgiye dayalı olarak alınıyor eksik veya yanlış.
Küresel pazar gıda tedarik zinciri yönetimi 2025'te 182,81 milyar dolar değerinde ve 2034 yılına kadar 359,39 milyara ulaşacak (CAGR %7,8). Ancak gıda tedarikçilerinin %48'i hâlâ kullanıyor günlük işlemler için elektronik tablolar kullanıyor ve %60'ı enerji tüketen tekrarlayan manuel görevleri rapor ediyor değerli zaman. Teknolojinin izin verdiği ile tarım-gıda şirketlerinin yaptıkları arasındaki uçurum aslında uyguluyorlar ve çok büyük.
FoodTech serisinin en son makalesi olan bu makale, sorunu kökünden ele alıyor: Sağlam ve ölçeklenebilir ETL/ELT hattı Heterojen verileri çiftlikten perakendeciye entegre etmek, Apache Airflow ile akışları düzenleyin, dbt ile verileri dönüştürün, Great ile kaliteyi sağlayın Beklentileri ve gıda sektörüne özel tedarik zinciri KPI'ları ile performansı ölçün.
Ne Öğreneceksiniz
- Gıda tedarik zincirinin uçtan uca veri mimarisi (çiftlik → işleme → dağıtım → perakende)
- Heterojen veri kaynakları: ERP (SAP/Oracle), MES, LIMS, WMS, TMS, POS, IoT, EDI
- ETL ve ELT: Gıda verileri için hangi yaklaşımın ne zaman seçileceği
- Hata işleme ve yeniden deneme özellikleriyle gıda işlem hatları için eksiksiz Apache Airflow DAG'leri
- Tedarik zinciri KPI'ları için DBT modelleri: atık oranı, OTIF, doluluk oranı, tedarik günleri
- Soğuk zincirler ve son kullanma tarihleri için Büyük Beklentilere sahip veri kalitesi
- Eski sistem entegrasyonu: SAP RFC/BAPI, EDI EDIFACT, Debezium ile CDC
- Örnek olay: tarım kooperatifi 500 çiftlik, 3 işleme merkezi, 200 satış noktası
- FoodTech serisinin tamamının tam özeti (10 makale)
FoodTech Serisi: Neredeyiz
Bu serinin onuncu ve son bölümüdür Gıda Teknolojisi, ana olanları araştıran Gıda tedarik zincirinin tamamına uygulanabilen dijital teknolojiler: sahada veri toplanmasından perakende. İşte resmin tamamı:
| # | Başlık | Teknolojiler | Seviye |
|---|---|---|---|
| 00 | Python ve MQTT ile Hassas Tarım için IoT Pipeline | IoT, MQTT, Python, Veri Gölü | Gelişmiş |
| 01 | Mahsul Hastalıklarının Tespiti için ML Edge: Raspberry Pi'de TensorFlow Lite | TensorFlow Lite, Edge ML, Raspberry Pi | Gelişmiş |
| 02 | AgriTech için Uydu ve Hava Durumu API'leri: Tahmine Dayalı Veriler | Sentinel, Planet, Hava Durumu API'si, NDVI | Gelişmiş |
| 03 | Gıda İzlenebilirlik Sistemi: Blockchain, RFID ve IoT | Blockchain, RFID, Nesnelerin İnterneti, Uyumluluk | Gelişmiş |
| 04 | PyTorch YOLO ile Gıda Kalite Kontrolü için Bilgisayarlı Görme | YOLO, PyTorch, Bilgisayarla Görme | Gelişmiş |
| 05 | FSMA 204 Otomasyon: Python ile Takip, Uyarı ve Geri Çağırma | FSMA, Uyumluluk, Python, Geri Çağırma | Gelişmiş |
| 06 | Dikey Tarım Otomasyonu: API aracılığıyla Robotik Kontrol | Robotik, API'ler, Otomasyon | Gelişmiş |
| 07 | Atıkların Azaltılması için Talep Tahmini: ML Zaman Serisi | LSTM, Peygamber, Zaman Serisi ML | Gelişmiş |
| 08 | Angular ve Grafana ile Çiftlik IoT için Gerçek Zamanlı Kontrol Paneli | Açısal, Grafana, InfluxDB, Gerçek Zamanlı | Gelişmiş |
| 09 | Gıda Tedarik Zinciri: Çiftlikten Perakendeciye ETL Modeli <-- Buradasın | Hava akışı, dbt, Büyük Beklentiler, ETL | Gelişmiş |
çünkü bu makale seriyi kapatıyor
Önceki makaleler zincirdeki bireysel düğümleri ele alıyordu: Sahada IoT veri toplama, ML uç modelleri, blockchain izlenebilirliği, kalite için bilgisayar görüşü, FSMA uyumluluğu, dikey tarım, talep tahmini ve gerçek zamanlı gösterge tabloları. Bu makale hepsini birbirine bağlıyor: ETL boru hattı ve her veri kaynağını entegre eden omurga, ham bilgiyi dönüştürür eyleme dönüştürülebilir KPI'lara dönüştürür ve çiftlikten süpermarket rafına kadar uçtan uca görünürlük sağlar.
Gıda Tedarik Zincirinin Veri Mimarisi
Gıda tedarik zinciri, her biri kendi bilgi sistemlerine ve formatlarına sahip farklı düğümlerden oluşur. Veriler ve güncelleme sıklıkları. Bir ETL boru hattının bu heterojenliği sağlam bir şekilde ele alması gerekir.
# Architettura logica della supply chain alimentare
# (dati generati ad ogni step)
FARM SYSTEMS
|-- Sensori IoT (MQTT): temp suolo, umidita, pH, EC
|-- Sistema ERP agricolo: colture pianificate, costi input
|-- Registro trattamenti fitosanitari (LIMS)
|-- Geo-dati GPS macchinari agricoli
|-- API meteo (OpenMeteo, Copernicus)
|
v
PROCESSING PLANTS (Centri di Lavorazione)
|-- MES (Manufacturing Execution System): ordini produzione, rese
|-- LIMS: analisi chimiche, microbiologiche, allergeni
|-- ERP SAP/Oracle: distinta base, lotti, tracciabilita
|-- WMS (Warehouse Management): magazzino materie prime/finiti
|-- Sensori cold chain: temperatura/umidita durante lavorazione
|
v
DISTRIBUTION CENTERS
|-- WMS: gestione stock, picking, slotting
|-- TMS (Transport Management): spedizioni, veicoli, routing
|-- Sensori cold chain: logger temperatura nei camion frigoriferi
|-- EDI: ordini da retailer (EDIFACT ORDERS, DESADV)
|-- Tracker GPS: posizione veicoli in tempo reale
|
v
RETAIL POS
|-- POS data: vendite per SKU, store, ora
|-- Gestione shelf life: prodotti prossimi scadenza
|-- EDI: conferme ricezione (RECADV), fatture (INVOIC)
|-- Inventario store: stock disponibile per punto vendita
|
v
CONSUMER (segnali indiretti)
|-- App loyalty: acquisti per cliente, frequenza
|-- Reviews prodotto: qualità percepita
|-- Social sentiment: trend di consumo
Her düğüm farklı formatlarda veri üretir (ilişkisel SQL, CSV/EDI düz dosyası, REST API'den JSON, milisaniye (IoT sensörleri) arasında değişen frekanslara sahip ikili MQTT mesajları, eski SFTP dosyaları) haftalık (ERP raporu). Gıda veri mühendisinin görevi tüm bunları birleştirmek tek bir tutarlı analitik modele dönüştürür.
Heterojen Veri Kaynakları: Referans Tablosu
Herhangi bir boru hattını tasarlamadan önce kaynakları araştırmak önemlidir. İşte başlıcaları orta düzeyde karmaşıklığa sahip bir gıda tedarik zinciri için:
| Sistem | Tip | Biçim | Hacim/gün | Sıklık | Kabul edilebilir gecikme |
|---|---|---|---|---|---|
| SAP ERP (çiftlik/fabrika) | ERP | RFC/BAPI, IDoc, SQL | 50K–500K kayıt | Gece partisi / CDC | T+1 saat |
| Oracle EBS'si | ERP | SQL, REST API'si | 100K–1M rekoru | Toplu / CDC | T+1 saat |
| MES (üretim) | ME'ler | OPC-UA, SQL, REST | 1 milyon ila 10 milyon etkinlik | Neredeyse gerçek zamanlı (1 dk) | 5 dakika |
| LIMS (laboratuvar) | LIMS | HL7, CSV, REST | 1K–10K analizi | Günlük | T+4 saat |
| WMS (depo) | WMS | SQL, EDI, REST | 10K–100K hareketler | Her 15 dakikada bir | 15 dakika |
| TMS (ulaşım) | TMS | REST API, EDI | 5K–50K gönderiler | Her 5 dakikada bir (GPS) | 5 dakika |
| Perakende POS | POS | CSV, REST, SQL | 1 milyon–50 milyon işlem | Saatlik/akşam partisi | T+2 saat |
| IoT soğuk zincir sensörleri | Nesnelerin İnterneti | MQTT, JSON, Protobuf | 100 milyondan fazla okuma | Her 30 saniyede bir – 5 dakikada bir | Gerçek zamanlı (<1 dakika) |
| EDI ortağı (EDIFACT) | EDI | EDIFACT, X12, GS1 XML | 100–10.000 mesaj | Olay odaklı / toplu | T+30 dk |
| Hava Durumu API'sı (OpenMeteo) | Harici API | JSON REST | 1K–10K çağrı | Saatlik | T+1 saat |
| AGEA (CAP sübvansiyonları) | Kamu kurumu | XML, CSV SFTP | 1–100 dosya | Aylık/sezonluk | T+24 saat |
| GPS araç takip cihazı | Nesnelerin İnterneti/Telematik | REST, WebSockets | 10 milyondan fazla GPS noktası | Her 30 saniyede bir | <2 dakika |
Dikkat: Eski ERP ve "Çıkartma Pencereleri"
Birçok tarım ve tarım-gıda ERP'si yerel CDC'yi desteklemez. Genellikle çıkarmanın tek yolu veriler ve tabloları kilitleyen gecelik toplu işler aracılığıyla veya SAP RFC/BAPI arayüzleri aracılığıyla sert bakım pencereleri. Ekstraksiyon pencerelerinizi önceden planlayın ve tahmin edin Gecelik parti arızası durumunda kurtarma mekanizmaları.
Gıda Tedarik Zinciri için ETL ve ELT
ETL ve ELT tartışması özellikle verilerin bir arada bulunduğu gıda sektörüyle ilgilidir geleneksel toplu veriler (SAP ERP) ve düzenleyici gereklilikler ile yüksek frekans (IoT soğuk zincir) sıkıdır (izlenebilirlik, FSMA 204).
| Kriter | Geleneksel ETL | Modern ELT (dbt + Veri Gölü) |
|---|---|---|
| Dönüşüm | Yüklemeden önce (hazırlama sunucusu) | Veri ambarında (yerel SQL) |
| Ölçeklenebilirlik | ETL sunucusuyla sınırlıdır | DWH (Snowflake, BigQuery, DuckDB) ile ölçeklendirme |
| Ham veri saklama | Çoğu zaman hayır (veriler zaten dönüştürülmüştür) | Evet: Bronz katman ham maddeyi korur |
| Denetlenebilirlik (FSMA) | Sert: Bulanık soy | Mükemmel: tam dbt köken grafiği |
| Gecikme | Toplu (T+1h, T+1d) | Mikro toplu veya akış (Flink/Spark) |
| Soğuk zincir hataları | ETL başarısız olursa veri kaybı | Raw her zaman kaydedilir; yalnızca dönüşümü yeniden dene |
| Ekip becerileri gerekli | Java/Informatica/SSIS geliştiricisi | SQL + Python (dbt + Hava Akışı) |
| Altyapı maliyeti | Her zaman açık özel sunucu | Sorgu başına ödeme (Snowflake/BigQuery) |
Modern gıda tedarik zinciri için öneri bir mimaridir Madalyon ELT:
# Architettura Medallion per Food Supply Chain
BRONZE LAYER (Raw, immutabile)
|-- Tutti i dati sorgente caricati as-is
|-- Partitioned by: source_system, ingestion_date
|-- Retention: 7 anni (compliance FSMA/EU)
|-- Formato: Parquet su S3/GCS o Delta Lake
|
v
SILVER LAYER (Cleansed, deduplicato)
|-- Dbt models: normalizzazione schemi eterogenei
|-- Deduplicazione lotti, SKU, location codes
|-- Type casting: date, temperature (C/F), pesi (kg/lb)
|-- NULL handling: sensori offline, ERP null fields
|-- PII masking: dati operatori, conducenti
|
v
GOLD LAYER (KPI pronti per business)
|-- Aggregazioni: fill rate, waste rate, OTIF per SKU
|-- Metriche cold chain: violazioni temperatura per lotto
|-- Dashboard BI: Power BI, Tableau, Grafana
|-- API ML: feature store per demand forecasting
|
v
DATA MART
|-- Retail mart: sell-through, shelf life at receipt
|-- Operations mart: OEE impianti, rendimenti produzione
|-- Finance mart: costo per lotto, margine per SKU
|-- Compliance mart: tracciabilita FSMA, recall readiness
Gıda Boru Hattı Düzenlemesi için Apache Hava Akışı
Apache Airflow, karmaşık ETL iş akışlarının düzenlenmesine yönelik fiili standarttır. Tedarik zincirinde güç sağlamak için tek bir DAG'nin düzinelerce kaynağı gecikmeler, bağımlılıklar ve politikalarla koordine etmesi gerekir farklı yeniden denemeler. Günlük boru hattı için eksiksiz bir DAG görelim.
Ana DAG: Gıda Tedarik Zinciri Günlük ETL
# food_supply_chain_dag.py
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.trigger_rule import TriggerRule
logger = logging.getLogger(__name__)
DEFAULT_ARGS = {
"owner": "data-engineering",
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"max_retry_delay": timedelta(minutes=30),
}
with DAG(
dag_id="food_supply_chain_daily_etl",
description="Pipeline ETL giornaliera: farm -> processing -> distribution -> retail",
default_args=DEFAULT_ARGS,
schedule_interval="0 2 * * *", # ogni notte alle 02:00 UTC
start_date=datetime(2025, 1, 1),
catchup=False,
tags=["foodtech", "supply-chain", "etl", "production"],
max_active_runs=1,
doc_md="""
## Food Supply Chain Daily ETL
Estrae dati da: SAP ERP, MES impianti, WMS, TMS, POS retail.
Carica su Bronze S3, poi lancia dbt per Silver e Gold.
""",
) as dag:
# ----------------------------------------------------------------
# STEP 1: Health checks sulle sorgenti
# ----------------------------------------------------------------
check_sap_available = HttpSensor(
task_id="check_sap_api_available",
http_conn_id="sap_erp_api",
endpoint="/api/health",
timeout=30,
poke_interval=60,
mode="reschedule",
soft_fail=False,
)
check_mes_available = HttpSensor(
task_id="check_mes_api_available",
http_conn_id="mes_plant_api",
endpoint="/health",
timeout=30,
poke_interval=60,
mode="reschedule",
)
# ----------------------------------------------------------------
# STEP 2: Estrazione SAP ERP (lotti, ordini, inventario)
# ----------------------------------------------------------------
@task(task_id="extract_sap_erp", retries=3)
def extract_sap_erp(**context: Any) -> dict:
"""Estrae dati da SAP ERP via RFC/BAPI wrapper REST."""
from src.connectors.sap_connector import SAPConnector
execution_date = context["ds"] # 'YYYY-MM-DD'
sap = SAPConnector(conn_id="sap_erp_api")
# Estrai: lotti produzione del giorno precedente
batches = sap.get_production_batches(
date=execution_date,
plants=["PL001", "PL002", "PL003"],
)
# Estrai: movimenti magazzino
stock_movements = sap.get_stock_movements(
date=execution_date,
movement_types=["101", "261", "501", "601"],
)
# Estrai: ordini di vendita confermati
sales_orders = sap.get_sales_orders(
date=execution_date,
status=["CONF", "DLVR"],
)
# Salva su S3 Bronze layer
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/sap_erp/dt={execution_date}"
s3.load_string(
string_data=batches.to_json(orient="records", lines=True),
key=f"{prefix}/batches.jsonl",
bucket_name="food-data-lake",
replace=True,
)
s3.load_string(
string_data=stock_movements.to_json(orient="records", lines=True),
key=f"{prefix}/stock_movements.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"SAP extraction OK: %d batches, %d movements, %d orders",
len(batches),
len(stock_movements),
len(sales_orders),
)
return {
"batches_count": len(batches),
"movements_count": len(stock_movements),
"orders_count": len(sales_orders),
}
# ----------------------------------------------------------------
# STEP 3: Estrazione dati cold chain (IoT sensori temperatura)
# ----------------------------------------------------------------
@task(task_id="extract_cold_chain_iot")
def extract_cold_chain_iot(**context: Any) -> dict:
"""Estrae letture temperatura da InfluxDB (IoT cold chain)."""
from influxdb_client import InfluxDBClient
execution_date = context["ds"]
client = InfluxDBClient(
url="{{ var.value.influxdb_url }}",
token="{{ var.value.influxdb_token }}",
org="foodtech-org",
)
query_api = client.query_api()
# Query Flux: temperature readings per lotto di trasporto
flux_query = f"""
from(bucket: "cold-chain")
|> range(start: {execution_date}T00:00:00Z,
stop: {execution_date}T23:59:59Z)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "celsius")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time", "lot_id"], columnKey: ["_field"], valueColumn: "_value")
"""
result = query_api.query_data_frame(flux_query)
# Calcola violazioni: temperatura fuori range per categoria prodotto
violations = result[
(result["celsius"] < result["min_temp_required"]) |
(result["celsius"] > result["max_temp_required"])
]
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/cold_chain/dt={execution_date}"
s3.load_string(
string_data=result.to_json(orient="records", lines=True),
key=f"{prefix}/temperature_readings.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info(
"Cold chain extraction OK: %d readings, %d violations",
len(result),
len(violations),
)
if len(violations) > 0:
logger.warning(
"COLD CHAIN VIOLATIONS: %d eventi fuori temperatura!",
len(violations),
)
return {
"readings_count": len(result),
"violations_count": len(violations),
}
# ----------------------------------------------------------------
# STEP 4: Estrazione EDI dai partner retailer
# ----------------------------------------------------------------
@task(task_id="extract_edi_partners")
def extract_edi_partners(**context: Any) -> dict:
"""Processa file EDI EDIFACT da SFTP partners."""
from src.connectors.edi_connector import EDIFACTConnector
execution_date = context["ds"]
edi = EDIFACTConnector(sftp_conn_id="edi_sftp_partners")
# Download e parsing ORDERS (EDIFACT ORDERS D.01B)
orders = edi.process_orders(date=execution_date)
# Download e parsing DESADV (Despatch Advice)
despatch_advices = edi.process_desadv(date=execution_date)
# Download e parsing RECADV (Receiving Advice)
receiving_advices = edi.process_recadv(date=execution_date)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/edi_partners/dt={execution_date}"
s3.load_string(
string_data=orders.to_json(orient="records", lines=True),
key=f"{prefix}/orders.jsonl",
bucket_name="food-data-lake",
replace=True,
)
return {
"orders_count": len(orders),
"desadv_count": len(despatch_advices),
"recadv_count": len(receiving_advices),
}
# ----------------------------------------------------------------
# STEP 5: Estrazione POS retail
# ----------------------------------------------------------------
@task(task_id="extract_pos_data")
def extract_pos_data(**context: Any) -> dict:
"""Estrae dati vendite POS da database retailer partner."""
from src.connectors.retail_connector import RetailPOSConnector
execution_date = context["ds"]
pos = RetailPOSConnector(conn_id="retail_pos_db")
# Vendite per SKU/store con shelf life info
sales = pos.get_daily_sales(
date=execution_date,
include_promotions=True,
include_markdowns=True,
)
# Near-expiry: prodotti a <3 giorni scadenza ancora in shelf
near_expiry = pos.get_near_expiry_stock(
date=execution_date,
days_threshold=3,
)
s3 = S3Hook(aws_conn_id="aws_s3")
prefix = f"bronze/pos_retail/dt={execution_date}"
s3.load_string(
string_data=sales.to_json(orient="records", lines=True),
key=f"{prefix}/sales.jsonl",
bucket_name="food-data-lake",
replace=True,
)
logger.info("POS extraction OK: %d transazioni", len(sales))
return {
"sales_count": len(sales),
"near_expiry_skus": len(near_expiry),
}
# ----------------------------------------------------------------
# STEP 6: dbt run (Silver + Gold transformations)
# ----------------------------------------------------------------
dbt_run_silver = BashOperator(
task_id="dbt_run_silver_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:silver --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
dbt_run_gold = BashOperator(
task_id="dbt_run_gold_layer",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt run --select tag:gold --target prod "
"--vars '{\"execution_date\": \"{{ ds }}\", \"source_env\": \"prod\"}'"
),
retries=2,
)
# ----------------------------------------------------------------
# STEP 7: dbt test (data quality checks)
# ----------------------------------------------------------------
dbt_test = BashOperator(
task_id="dbt_test_all",
bash_command=(
"cd /opt/dbt/food_supply_chain && "
"dbt test --select tag:silver tag:gold --target prod"
),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
# ----------------------------------------------------------------
# STEP 8: Great Expectations validation
# ----------------------------------------------------------------
@task(task_id="run_great_expectations_validation")
def run_ge_validation(**context: Any) -> dict:
"""Lancia Great Expectations per validare data quality cold chain."""
from great_expectations.data_context import get_context
context_ge = get_context(project_root_dir="/opt/great_expectations")
execution_date = context["ds"]
results = {}
suites_to_run = [
("cold_chain_temperature", "bronze_cold_chain_checkpoint"),
("lot_traceability", "silver_lots_checkpoint"),
("kpi_supply_chain", "gold_kpi_checkpoint"),
]
for suite_name, checkpoint_name in suites_to_run:
result = context_ge.run_checkpoint(
checkpoint_name=checkpoint_name,
batch_request={
"runtime_parameters": {"path": f"s3://food-data-lake/bronze/cold_chain/dt={execution_date}"},
"batch_identifiers": {"execution_date": execution_date},
},
)
results[suite_name] = result.success
failed = [name for name, ok in results.items() if not ok]
if failed:
raise ValueError(f"GE validation FAILED per suite: {failed}")
logger.info("Great Expectations: tutte le suite OK")
return results
# ----------------------------------------------------------------
# STEP 9: KPI alert (email se waste rate > soglia)
# ----------------------------------------------------------------
@task(task_id="check_kpi_alerts", trigger_rule=TriggerRule.ALL_SUCCESS)
def check_kpi_alerts(**context: Any) -> None:
"""Controlla KPI critici e invia alert se fuori soglia."""
from src.services.kpi_alert_service import KPIAlertService
execution_date = context["ds"]
alert_service = KPIAlertService(conn_id="snowflake_dw")
kpis = alert_service.get_daily_kpis(date=execution_date)
alerts_sent = []
# Waste rate > 5%: alert critico
if kpis["waste_rate_pct"] > 5.0:
alert_service.send_alert(
severity="HIGH",
message=f"Waste rate {kpis['waste_rate_pct']:.1f}% supera soglia 5%",
kpi="waste_rate",
value=kpis["waste_rate_pct"],
)
alerts_sent.append("waste_rate")
# OTIF < 95%: alert warning
if kpis["otif_pct"] < 95.0:
alert_service.send_alert(
severity="MEDIUM",
message=f"OTIF {kpis['otif_pct']:.1f}% sotto soglia 95%",
kpi="otif",
value=kpis["otif_pct"],
)
alerts_sent.append("otif")
# Cold chain violations > 0: alert immediato
if kpis["cold_chain_violations"] > 0:
alert_service.send_alert(
severity="CRITICAL",
message=f"{kpis['cold_chain_violations']} violazioni temperatura cold chain!",
kpi="cold_chain_violations",
value=kpis["cold_chain_violations"],
)
alerts_sent.append("cold_chain")
logger.info("KPI check completato. Alert inviati: %s", alerts_sent)
# ----------------------------------------------------------------
# DAG wiring: dipendenze tra task
# ----------------------------------------------------------------
extract_sap = extract_sap_erp()
extract_cold = extract_cold_chain_iot()
extract_edi = extract_edi_partners()
extract_pos = extract_pos_data()
ge_validation = run_ge_validation()
kpi_alerts = check_kpi_alerts()
# Health checks prima di tutto
[check_sap_available, check_mes_available] >> extract_sap
# Estrazione parallela di tutte le sorgenti
[extract_sap, extract_cold, extract_edi, extract_pos] >> dbt_run_silver
# Silver prima di Gold
dbt_run_silver >> dbt_run_gold
# Test dopo Gold
dbt_run_gold >> dbt_test
# GE validation e KPI alerts dopo i test
dbt_test >> ge_validation >> kpi_alerts
Dönüşümler için dbt: Gıda KPI'ları için SQL Modelleri
dbt (veri oluşturma aracı), Gümüş ve Altın katmanındaki verileri dönüştürmek için ideal bir araçtır. Entegre test ve dokümantasyon ile gıda tedarik zincirine yönelik ana modelleri görelim.
Schema.yml: Dokümantasyon ve Test dbt
# models/silver/schema.yml
version: 2
models:
- name: silver_lots
description: "Lotti di produzione normalizzati da SAP ERP"
config:
tags: ["silver", "lots", "traceability"]
materialized: incremental
incremental_strategy: merge
unique_key: lot_id
columns:
- name: lot_id
description: "Identificativo univoco lotto (GS1 SGTIN)"
tests:
- not_null
- unique
- name: production_date
tests:
- not_null
- name: expiry_date
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "expiry_date > production_date"
- name: plant_code
tests:
- not_null
- accepted_values:
values: ["PL001", "PL002", "PL003", "PL004"]
- name: temperature_class
tests:
- accepted_values:
values: ["ambient", "chilled", "frozen", "ultra_frozen"]
- name: net_weight_kg
tests:
- dbt_utils.expression_is_true:
expression: "net_weight_kg > 0 AND net_weight_kg < 50000"
- name: silver_shipments
description: "Spedizioni normalizzate da TMS e EDI DESADV"
config:
tags: ["silver", "logistics"]
materialized: incremental
unique_key: shipment_id
columns:
- name: shipment_id
tests: [not_null, unique]
- name: planned_delivery_date
tests: [not_null]
- name: actual_delivery_date
description: "NULL se non ancora consegnato"
- name: otif_flag
description: "True se consegnato On-Time and In-Full"
tests:
- accepted_values:
values: [true, false, null]
quote: false
- name: gold_kpi_supply_chain
description: "KPI giornalieri supply chain aggregati per SKU/plant/customer"
config:
tags: ["gold", "kpi", "dashboard"]
materialized: table
columns:
- name: kpi_date
tests: [not_null]
- name: waste_rate_pct
description: "% prodotto sprecato su prodotto totale"
tests:
- dbt_utils.expression_is_true:
expression: "waste_rate_pct >= 0 AND waste_rate_pct <= 100"
- name: otif_pct
tests:
- dbt_utils.expression_is_true:
expression: "otif_pct >= 0 AND otif_pct <= 100"
Gümüş Model: Üretim Grupları
-- models/silver/silver_lots.sql
-- {{ config(tags=["silver", "lots"], materialized="incremental") }}
WITH source_sap AS (
SELECT
lot_number AS lot_id,
material_number AS sku_code,
plant AS plant_code,
production_date AS production_date,
-- Normalizza data scadenza (SAP usa formato YYYYMMDD)
TO_DATE(CAST(expiry_date_sap AS VARCHAR), 'YYYYMMDD') AS expiry_date,
quantity_kg AS net_weight_kg,
-- Classi temperatura da tabella materiali SAP
CASE
WHEN temp_cond_sap = '0001' THEN 'ambient'
WHEN temp_cond_sap = '0002' THEN 'chilled'
WHEN temp_cond_sap = '0003' THEN 'frozen'
WHEN temp_cond_sap = '0004' THEN 'ultra_frozen'
ELSE 'unknown'
END AS temperature_class,
-- Range temperatura richiesto (gradi Celsius)
CASE
WHEN temp_cond_sap = '0001' THEN STRUCT(15.0 AS min_c, 25.0 AS max_c)
WHEN temp_cond_sap = '0002' THEN STRUCT(2.0 AS min_c, 8.0 AS max_c)
WHEN temp_cond_sap = '0003' THEN STRUCT(-25.0 AS min_c, -15.0 AS max_c)
WHEN temp_cond_sap = '0004' THEN STRUCT(-60.0 AS min_c, -30.0 AS max_c)
END AS temp_range,
batch_status AS lot_status,
_ingestion_timestamp AS ingested_at
FROM {{ source('bronze', 'sap_erp_batches') }}
WHERE lot_number IS NOT NULL
AND quantity_kg > 0
{% if is_incremental() %}
AND _ingestion_timestamp > (
SELECT MAX(ingested_at) FROM {{ this }}
)
{% endif %}
),
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY lot_id
ORDER BY ingested_at DESC
) AS row_num
FROM source_sap
),
final AS (
SELECT
lot_id,
sku_code,
plant_code,
production_date,
expiry_date,
net_weight_kg,
temperature_class,
temp_range,
lot_status,
-- Shelf life totale in giorni
DATEDIFF('day', production_date, expiry_date) AS shelf_life_days,
-- Shelf life residua rispetto a oggi
DATEDIFF('day', CURRENT_DATE, expiry_date) AS shelf_life_remaining_days,
ingested_at
FROM deduplicated
WHERE row_num = 1
AND expiry_date > production_date
)
SELECT * FROM final
Altın Model: KPI Tedarik Zinciri
-- models/gold/gold_kpi_supply_chain.sql
-- {{ config(tags=["gold", "kpi"], materialized="table") }}
WITH lots AS (
SELECT * FROM {{ ref('silver_lots') }}
),
shipments AS (
SELECT * FROM {{ ref('silver_shipments') }}
),
pos_sales AS (
SELECT * FROM {{ ref('silver_pos_sales') }}
),
inventory AS (
SELECT * FROM {{ ref('silver_inventory_snapshot') }}
),
-- KPI 1: Waste Rate (% prodotto sprecato)
waste_kpi AS (
SELECT
production_date AS kpi_date,
plant_code,
-- Prodotto scaduto o smaltito come spreco
SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END) AS wasted_kg,
SUM(net_weight_kg) AS total_produced_kg,
ROUND(
100.0 * SUM(CASE WHEN lot_status IN ('WASTE', 'EXPIRED', 'DOWNGRADE')
THEN net_weight_kg ELSE 0 END)
/ NULLIF(SUM(net_weight_kg), 0),
2
) AS waste_rate_pct
FROM lots
GROUP BY 1, 2
),
-- KPI 2: OTIF (On-Time In-Full)
otif_kpi AS (
SELECT
DATE(planned_delivery_date) AS kpi_date,
customer_code,
COUNT(*) AS total_orders,
SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END) AS otif_orders,
ROUND(
100.0 * SUM(CASE WHEN otif_flag = TRUE THEN 1 ELSE 0 END)
/ NULLIF(COUNT(*), 0),
2
) AS otif_pct
FROM shipments
WHERE actual_delivery_date IS NOT NULL
GROUP BY 1, 2
),
-- KPI 3: Days of Supply (scorta in giorni)
dos_kpi AS (
SELECT
inv.snapshot_date AS kpi_date,
inv.sku_code,
inv.location_code,
inv.stock_qty_units,
COALESCE(avg_daily_sales.avg_daily_units, 0) AS avg_daily_demand,
ROUND(
inv.stock_qty_units
/ NULLIF(avg_daily_sales.avg_daily_units, 0),
1
) AS days_of_supply
FROM inventory inv
LEFT JOIN (
SELECT sku_code, location_code,
AVG(units_sold) AS avg_daily_units
FROM pos_sales
WHERE sale_date >= DATEADD('day', -30, CURRENT_DATE)
GROUP BY 1, 2
) avg_daily_sales
ON inv.sku_code = avg_daily_sales.sku_code
AND inv.location_code = avg_daily_sales.location_code
),
-- KPI 4: Shelf Life at Receipt (qualità al ricevimento)
slr_kpi AS (
SELECT
DATE(s.actual_delivery_date) AS kpi_date,
l.sku_code,
-- % shelf life residua al momento della consegna
ROUND(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0),
1
) AS shelf_life_pct_at_receipt,
-- Media per SKU
AVG(
100.0 * l.shelf_life_remaining_days / NULLIF(l.shelf_life_days, 0)
) OVER (PARTITION BY DATE(s.actual_delivery_date), l.sku_code)
AS avg_slr_pct
FROM shipments s
INNER JOIN lots l ON s.lot_id = l.lot_id
WHERE s.actual_delivery_date IS NOT NULL
)
-- Join finale per dashboard KPI unificato
SELECT
w.kpi_date,
w.plant_code,
w.waste_rate_pct,
w.wasted_kg,
w.total_produced_kg,
o.otif_pct,
o.total_orders,
d.days_of_supply,
d.avg_daily_demand,
s.avg_slr_pct AS shelf_life_at_receipt_pct,
-- Score complessivo supply chain (0-100)
ROUND(
(
(100.0 - COALESCE(w.waste_rate_pct, 0)) * 0.30 +
COALESCE(o.otif_pct, 0) * 0.40 +
LEAST(d.days_of_supply / 14.0 * 100, 100) * 0.15 +
COALESCE(s.avg_slr_pct, 0) * 0.15
),
1
) AS supply_chain_health_score
FROM waste_kpi w
LEFT JOIN otif_kpi o ON w.kpi_date = o.kpi_date
LEFT JOIN dos_kpi d ON w.kpi_date = d.kpi_date
LEFT JOIN slr_kpi s ON w.kpi_date = s.kpi_date
Gıda Verilerine İlişkin Veri Kalitesi: Büyük Beklentiler
Gıda endüstrisinde veri kalitesi yalnızca temizlik meselesi değildir: bu bir gerekliliktir düzenleyici. Sıcaklıklar, son kullanma tarihleri veya parti kodlarıyla ilgili yanlış veriler ortaya çıkabilir FSMA cezaları, çok pahalı geri çağırmalar ve itibar kaybı. Büyük Umutlar şunları yapmanızı sağlar: Bildirime dayalı doğrulama kurallarını tanımlayın ve bunları Airflow hattına entegre edin.
# great_expectations/expectations/cold_chain_temperature_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Crea o aggiorna la suite di aspettative
suite = context.add_or_update_expectation_suite(
expectation_suite_name="cold_chain_temperature_suite"
)
validator = context.get_validator(
batch_request=BatchRequest(
datasource_name="s3_bronze_datasource",
data_connector_name="cold_chain_connector",
data_asset_name="temperature_readings",
),
expectation_suite_name="cold_chain_temperature_suite",
)
# REGOLA 1: Temperatura mai NULL per prodotti refrigerati/surgelati
validator.expect_column_values_to_not_be_null(
column="celsius",
mostly=0.99, # tolleranza 1% per sensori offline temporanei
meta={
"notes": "Sensori IoT cold chain: max 1% di letture mancanti",
"regulatory_reference": "EU Reg 37/2005 cold chain monitoring",
}
)
# REGOLA 2: Range temperatura prodotti refrigerati (2-8°C)
validator.expect_column_values_to_be_between(
column="celsius",
min_value=-30.0,
max_value=35.0,
meta={
"notes": "Range assoluto: include frozen (-25°C) e ambient (25°C)",
}
)
# REGOLA 3: Lot ID sempre presente e formato GS1 SGTIN valido
validator.expect_column_values_to_match_regex(
column="lot_id",
regex=r"^[0-9]{14}$", # GS1 GTIN-14 format
meta={"notes": "GS1 SGTIN-14 standard per tracciabilita alimentare"}
)
# REGOLA 4: Timestamp monotonicamente crescente per sensore
validator.expect_column_values_to_be_increasing(
column="reading_timestamp",
strictly=False, # allow duplicates (es. batch upload)
parse_strings_as_datetimes=True,
meta={"notes": "Verifico che i timestamp non siano retroattivi"}
)
# REGOLA 5: Numero di letture per lotto (almeno 1 ogni 30 min)
validator.expect_table_row_count_to_be_between(
min_value=48, # 24h * 2 letture/ora = 48 minimo
max_value=10000, # max ragionevole per 24h
meta={"notes": "Minimo 1 lettura ogni 30 min per cold chain attiva"}
)
# REGOLA 6: Distribuzione temperatura per classe prodotto
# Prodotti refrigerati: mediana deve essere 2-8°C
validator.expect_column_median_to_be_between(
column="celsius",
min_value=1.0,
max_value=9.0,
meta={
"notes": "Solo per lotti con temperature_class='chilled'",
"applies_to": "filtered_batches_chilled",
}
)
# Salva le aspettative
validator.save_expectation_suite(discard_failed_expectations=False)
# Checkpoint per Airflow
checkpoint = context.add_or_update_checkpoint(
name="bronze_cold_chain_checkpoint",
config_version=1,
template_name=None,
module_name="great_expectations.checkpoint",
class_name="Checkpoint",
run_name_template="%Y%m%d-%H%M%S-cold-chain",
expectation_suite_name="cold_chain_temperature_suite",
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "send_slack_notification_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "{{ var.value.slack_webhook_cold_chain }}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer",
"class_name": "SlackRenderer",
},
},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
],
)
Gıda Tedarik Zinciri KPI'ları ve Metrikleri
Veriye dayalı kararlar almanın temelini ve doğru KPI'ları tanımlayın. İşte ölçümler formüller, sektör kıyaslamaları ve gerçekçi hedeflerle gıda tedarik zinciri için temel önemdedir.
| KPI'lar | Formül | Endüstri Karşılaştırması | Sınıfının En İyisini Hedefleyin | Uyarı Eşiği |
|---|---|---|---|---|
| Mükemmel Sipariş Oranı | Mükemmel siparişler / Toplam siparişler * 100 | %85-92 | >%95 | <%90 |
| OTIF (Zamanında-Tam Zamanında) | (Hızlı ve eksiksiz teslimat) / Toplam * 100 | %88-94 | >%97 | <%92 |
| Atık Oranı % | Kg atık / Kg toplam ürün * 100 | %3-8 | <%2 | >%5 |
| Envanter Devri | Yıllık SMM / Ortalama Envanter | 12-20x (taze) | >25x (taze) | <10x |
| Tedarik Günleri (DOS) | Stok mevcut / Ortalama günlük talep | 3-7 gün (taze) | 2-4 gün | >10 gün |
| Doluluk Oranı | Teslim edilen birimler / Sipariş edilen birimler * 100 | %92-96 | >%98 | <%94 |
| Teslim Alma Anındaki Raf Ömrü (SLR) | Teslimata kalan gün sayısı / Toplam raf ömrü * 100 | %60-75 | >%80 | <%60 |
| Soğuk Zincir Uyumluluğu | Sıcaklık ihlali olmayan partiler / Toplam partiler * 100 | %97-99 | >%99,5 | <%98 |
| Tahmin Doğruluğu | 1 - (MAE / Ortalama talep) * 100 | %75-85 | >%90 | <%75 |
| Nakitten Nakde Döngüsü | DIO + DSO - DPO (gün) | 15-35 gün | <15 gün | >45 gün |
Tedarik Zinciri Sağlık Puanı nasıl hesaplanır?
KPI'ları tek bir bileşik puanda ağırlıklandırmak, yönetimin sağlığı izlemesine yardımcı olur tek bir numarayla genel tedarik zinciri:
- OTIF: ağırlık %40 (müşteri memnuniyetine doğrudan etki)
- Atık Oranı: ağırlık %30 (marjlar ve sürdürülebilirlik üzerindeki etki)
- Alındığında Raf Ömrü: ağırlık %15 (tüketici kalitesi)
- Soğuk Zincir Uyumluluğu: ağırlık %15 (gıda güvenliği ve uyumluluğu)
Puan > 90: mükemmel | 80-90: iyi | 70-80: geliştirilecek | <70: kritik
Gerçek Zamanlı ve Toplu İşlem: Hangi Mimari Ne Zaman Kullanılmalı?
Her şeyin gerçek zamanlı olması gerekmiyor. Bir akış mimarisinin maliyeti önemli ölçüde yüksektir geleneksel toplu mimariden daha üstündür. Gıda tedarik zincirinde seçim doğru, kabul edilebilir gecikme süresine ve operasyonel sonuçlara bağlıdır.
| Kullanım Örneği | Mimarlık | Gecikme | Aletler | Motivasyon |
|---|---|---|---|---|
| Soğuk zincir izleme (sıcaklık) | Gerçek zamanlı akış | <1 dakika | Kafka + Flink | Sıcaklık ihlali = anında parti kaybı |
| Geri çağırma yönetimi | Gerçek zamanlı / olay odaklı | <5 dakika | Kafka + uyarı | FSMA: Grupları 2 saatten kısa sürede takip edin |
| GPS araç takibi | Neredeyse gerçek zamanlı | <2 dakika | MQTT + InfluxDB | Müşteriler için güncelleştirilmiş ETA |
| Günlük KPI'lar (OTIF, atık) | Günlük parti | T+2 saat | Hava akışı + dbt | Sabah operasyonel raporu |
| Talep tahmini | Günlük/haftalık toplu | T+6 saat | Hava akışı + ML akışı | Üretim planı gerçek zamanlı gerektirmez |
| Envanter anlık görüntüsü | Mikro parti (her 15 dakikada bir) | 15 dakika | Hava Akışı + Kar Tanesi | Depo operatörleri: yeterli görünürlük |
| Haftalık atık analizi | Haftalık parti | T+24 saat | dbt + BI aracı | Operasyonel değil stratejik kararlar |
| FSMA uyumluluk denetimi | Talep üzerine toplu | Talep üzerine | dbt + veri kasası | Günlük değil, planlı denetimler |
# Architettura Lambda per Food Supply Chain
# Speed layer (real-time) + Batch layer (accuracy)
# SPEED LAYER: Kafka + Apache Flink
# Processa eventi in milliseconds: cold chain, recall alerts
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("cold-chain-readings") \
.set_group_id("flink-cold-chain-consumer") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
cold_chain_stream = env.from_source(
kafka_source,
watermark_strategy=WatermarkStrategy.for_bounded_out_of_orderness(
Duration.of_seconds(30)
),
source_name="cold-chain-kafka",
)
# Filtra violazioni temperatura in tempo reale
violations_stream = cold_chain_stream \
.map(lambda msg: json.loads(msg)) \
.filter(lambda r: (
r.get("temperature_class") == "chilled" and
(r["celsius"] < 2.0 or r["celsius"] > 8.0)
)) \
.map(lambda r: {
**r,
"violation_severity": "CRITICAL" if r["celsius"] > 12.0 else "WARNING",
"violation_detected_at": datetime.utcnow().isoformat(),
})
# Scrivi violations su Kafka alert topic
violations_stream.sink_to(
KafkaSink.builder()
.set_bootstrap_servers("kafka:9092")
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("cold-chain-violations")
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.build()
)
env.execute("food-cold-chain-monitoring")
Eski Sistemlerle Entegrasyon
Tarım-gıda şirketlerinin çoğu eski sistemlerle çalışıyor: Yılın SAP R/3'ü '90, SFTP üzerinden EDIFACT EDI dosyaları, REST API'siz Oracle veritabanı. Bunların entegrasyonu Sistemler özel yaklaşımlar gerektirir.
SAP Bağlayıcısı: RFC ve BAPI
# src/connectors/sap_connector.py
"""
Connettore SAP ERP via pyrfc (SAP RFC/BAPI wrapper).
Richiede: SAP NetWeaver RFC SDK + pyrfc library.
"""
import pyrfc
import pandas as pd
from dataclasses import dataclass
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class SAPConnectionConfig:
ashost: str
sysnr: str
client: str
user: str
passwd: str
lang: str = "IT"
class SAPConnector:
"""Wrapper per SAP RFC/BAPI calls per dati supply chain."""
def __init__(self, config: SAPConnectionConfig):
self._config = config
self._conn: Optional[pyrfc.Connection] = None
def __enter__(self):
self._conn = pyrfc.Connection(
ashost=self._config.ashost,
sysnr=self._config.sysnr,
client=self._config.client,
user=self._config.user,
passwd=self._config.passwd,
lang=self._config.lang,
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn:
self._conn.close()
def get_production_batches(
self,
date_from: str,
date_to: str,
plants: list[str],
) -> pd.DataFrame:
"""
Estrae lotti di produzione da SAP tramite BAPI_BATCH_GET_DETAIL.
Args:
date_from: Data inizio formato YYYYMMDD
date_to: Data fine formato YYYYMMDD
plants: Lista codici stabilimento SAP
"""
all_batches = []
for plant in plants:
try:
# Chiama BAPI SAP per lista lotti
result = self._conn.call(
"BAPI_BATCH_GET_DETAIL",
PLANT=plant,
DATE_FROM=date_from,
DATE_TO=date_to,
BATCH_STATUS="", # tutti gli stati
)
batches = result.get("BATCH_DETAIL_LIST", [])
logger.info(
"SAP plant %s: %d lotti estratti",
plant,
len(batches),
)
for batch in batches:
all_batches.append({
"lot_id": f"{batch['MATNR']}-{batch['CHARG']}",
"material_number": batch["MATNR"].strip(),
"lot_number": batch["CHARG"].strip(),
"plant": plant,
"production_date": batch.get("HSDAT"),
"expiry_date_sap": batch.get("VFDAT"),
"batch_status": batch.get("ZUSTD"),
"quantity_kg": float(batch.get("CLABS", 0)),
"temp_cond_sap": batch.get("MHDRZ", "0001"),
"_source": "SAP_ERP",
"_extracted_at": pd.Timestamp.utcnow().isoformat(),
})
except pyrfc.ABAPApplicationError as e:
logger.error(
"SAP BAPI error per plant %s: %s",
plant,
str(e),
)
# Non fallisce tutta l'estrazione per un singolo plant
continue
return pd.DataFrame(all_batches)
def get_stock_movements(
self,
date_from: str,
date_to: str,
movement_types: list[str],
) -> pd.DataFrame:
"""
Estrae movimenti di magazzino SAP via MB51 BAPI equivalent.
movement_types: ['101'=GR, '261'=GI to production, '601'=GI to customer]
"""
result = self._conn.call(
"BAPI_GOODSMVT_GETDETAIL",
GOODSMVT_DATE_FROM=date_from,
GOODSMVT_DATE_TO=date_to,
)
movements = []
for item in result.get("GOODSMVT_ITEMS", []):
if item["BWART"] in movement_types:
movements.append({
"movement_id": f"{item['MBLNR']}-{item['ZEILE']}",
"material": item["MATNR"].strip(),
"plant": item["WERKS"],
"movement_type": item["BWART"],
"quantity": float(item.get("MENGE", 0)),
"unit": item.get("MEINS", "KG"),
"lot_number": item.get("CHARG", ""),
"posting_date": item["BUDAT"],
"_source": "SAP_MOVEMENTS",
})
return pd.DataFrame(movements)
Eski Veritabanlarından Akış için Debezium'lu CDC
# debezium-config/oracle-erp-connector.json
# Configurazione Debezium per CDC da Oracle EBS (ERP legacy)
{
"name": "oracle-erp-food-supply-chain",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "oracle-erp.internal",
"database.port": "1521",
"database.user": "debezium_cdc",
"database.password": "${ORACLE_CDC_PASSWORD}",
"database.dbname": "FOOD_ERP",
"database.pdb.name": "FOOD_PDB",
"database.server.name": "oracle-erp",
"table.include.list": "FOOD_ERP.INV_TRANSACTIONS,FOOD_ERP.MTL_LOT_NUMBERS,FOOD_ERP.WSH_DELIVERY_DETAILS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.oracle.food_erp",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.oracle.food_erp",
"transforms": "route,addFields",
"transforms.route.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.addFields.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addFields.static.field": "_cdc_source",
"transforms.addFields.static.value": "oracle-erp",
"include.schema.changes": "true",
"snapshot.mode": "initial_only",
"decimal.handling.mode": "double",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"topic.prefix": "food-cdc"
}
}
Perakende Siparişler için EDIFACT EDI Ayrıştırıcı
# src/connectors/edi_connector.py
"""
Parser EDI EDIFACT per messaggi ORDERS, DESADV, RECADV (GS1 subset).
Usa pydifact library per parsing EDIFACT messages.
"""
import pandas as pd
from pydifact.segmentcollection import SegmentCollection
from pydifact.segments import Segment
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
def parse_edifact_orders(file_path: str | Path) -> pd.DataFrame:
"""
Parsa un file EDI EDIFACT ORDERS D.01B (GS1 EANCOM).
Restituisce DataFrame con righe ordine normalizzate.
Struttura messaggio ORDERS EDIFACT:
UNB (Interchange header)
UNH (Message header)
BGM (Order message identifier)
DTM (Date/time)
NAD+BY (Buyer = retailer)
NAD+SU (Supplier = nostro sistema)
LIN (Line item)
+QTY (Quantity ordered)
+DTM (Requested delivery date)
+PIA (Product code GTIN)
UNT (Message trailer)
"""
file_content = Path(file_path).read_text(encoding="latin-1")
collection = SegmentCollection.from_str(file_content)
orders = []
current_order: dict = {}
current_line: dict = {}
for segment in collection.segments:
tag = segment.tag
if tag == "BGM":
# Inizio nuovo ordine
current_order = {
"order_number": segment.elements[1] if len(segment.elements) > 1 else None,
"document_type": segment.elements[0][0] if segment.elements else None,
"lines": [],
}
elif tag == "DTM" and current_order:
# Data ordine o data consegna richiesta
qualifier = segment.elements[0][0]
date_str = segment.elements[0][1]
date_fmt = segment.elements[0][2]
if qualifier == "137": # Document date
try:
current_order["order_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
logger.warning("DTM parse error: %s", date_str)
elif qualifier == "2": # Requested delivery date
try:
current_order["requested_delivery_date"] = datetime.strptime(
date_str, "%Y%m%d"
).date().isoformat()
except ValueError:
pass
elif tag == "NAD":
qualifier = segment.elements[0]
party_id = segment.elements[1][0] if len(segment.elements) > 1 else None
party_name = segment.elements[3] if len(segment.elements) > 3 else None
if qualifier == "BY": # Buyer (retailer)
current_order["buyer_gln"] = party_id
current_order["buyer_name"] = party_name
elif qualifier == "SU": # Supplier
current_order["supplier_gln"] = party_id
elif tag == "LIN":
# Salva riga precedente se esiste
if current_line and current_order:
current_order["lines"].append(current_line)
current_line = {
"line_number": segment.elements[0],
"gtin": None,
"qty_ordered": None,
"delivery_date": None,
}
elif tag == "PIA" and current_line:
# Product identification: GTIN-13 o GTIN-14
if len(segment.elements) > 1:
qualifier = segment.elements[1][1] if len(segment.elements[1]) > 1 else ""
if qualifier in ["SRV", "GE1"]: # GTIN qualifiers
current_line["gtin"] = segment.elements[1][0]
elif tag == "QTY" and current_line:
qualifier = segment.elements[0][0]
if qualifier == "21": # Ordered quantity
current_line["qty_ordered"] = float(segment.elements[0][1])
current_line["qty_unit"] = segment.elements[0][2] if len(segment.elements[0]) > 2 else "PCE"
# Normalizza output
for order in orders:
for line in order.get("lines", []):
orders.append({
"order_number": order.get("order_number"),
"order_date": order.get("order_date"),
"requested_delivery_date": order.get("requested_delivery_date"),
"buyer_gln": order.get("buyer_gln"),
"buyer_name": order.get("buyer_name"),
"supplier_gln": order.get("supplier_gln"),
"line_number": line.get("line_number"),
"gtin": line.get("gtin"),
"qty_ordered": line.get("qty_ordered"),
"qty_unit": line.get("qty_unit"),
"_source": "EDI_EDIFACT_ORDERS",
"_parsed_at": datetime.utcnow().isoformat(),
})
return pd.DataFrame(orders)
Örnek Olay: 500 Çiftlikli Tarım Kooperatifi
Bu modellerin gerçek bir durumda nasıl uygulandığını görelim: büyük bir tarım kooperatifi 500 ilişkili tarım şirketi, 3 işleme ve işleme merkezi ile Kuzey İtalya, ve büyük ölçekli perakende ticaret ile yerel pazarlar arasında 200 satış noktası.
Kooperatif profili
- 500 ilişkili çiftlik: meyve ve sebzeler, tahıllar, baklagiller - 8.000 sensörden alınan IoT MQTT verileri
- 3 işleme merkezi: SAP R/3 mirası (2005), özel MES, LIMS
- 200 satış noktası: Heterojen POS (3 farklı satıcı), 15 perakendeciyle EDI EDIFACT
- Başlangıç durumu: Excel sayfaları, silolanmış veriler, uçtan uca görünürlük yok
- Hedefler: Atığı %40 oranında azaltın, OTIF'yi %95'e yükseltin, FSMA uyumluluğu
Aşama 1: Değerlendirme ve Kaynak Envanteri (1-4. Haftalar)
# Inventario sorgenti dati: output assessment
SOURCE_INVENTORY = {
"farm_iot": {
"count": 8000,
"protocol": "MQTT v3.1.1",
"broker": "Mosquitto on-premise",
"format": "JSON custom",
"issues": [
"Schema non standardizzato tra vendor IoT diversi",
"15% sensori con timestamp errati (clock drift)",
"3 farm su 500 senza connettivita stabile (4G intermittente)",
],
"recommended_fix": "Normalizzazione schema + edge buffer (Mosquitto local)",
},
"sap_erp": {
"version": "SAP R/3 4.7 (anno 2005)",
"tables_relevant": ["MCHA", "MCH1", "MSEG", "MKPF", "VBAK", "VBAP"],
"issues": [
"No REST API: solo RFC/BAPI",
"Batch extraction window: 01:00-03:00 (finestra ristretta)",
"Codifica caratteri: ISO-8859-1 (non UTF-8)",
"Date in formato SAP (YYYYMMDD come intero)",
],
"recommended_fix": "pyrfc + ETL incremental con delta timestamp",
},
"edi_partners": {
"partners": 15,
"standards": ["EANCOM D.01B", "X12 850/856", "GS1 XML 3.1"],
"transport": "SFTP (12 partner), AS2 (3 partner)",
"issues": [
"3 retailer usano EDIFACT non-standard (varianti proprietarie)",
"Mancanza di RECADV da 5 retailer: no conferma ricezione",
],
},
"pos_retail": {
"vendors": ["Cassa Easy", "NCR Aloha", "Custom PHP legacy"],
"issues": [
"3 formati CSV diversi",
"Nessun campo shelf_life nei dati POS",
"Granularità: ora (non transazione)",
],
},
}
Aşama 2: Teknik Yığın Uygulaması (5-16. Haftalar)
# Stack tecnico implementato per la cooperativa
INFRASTRUCTURE = {
"data_lake": "AWS S3 (Medallion: Bronze/Silver/Gold)",
"compute": "AWS EMR Serverless (Spark per bulk load iniziale)",
"warehouse": "Snowflake (pay-per-query, snowpark per ML features)",
"orchestration": "Apache Airflow 2.9 su AWS MWAA (managed)",
"transformation": "dbt Cloud (Team plan: 8 developers)",
"data_quality": "Great Expectations + Slack alerts",
"streaming": "Apache Kafka su MSK (managed) per cold chain",
"monitoring": "Grafana + Prometheus (Airflow metrics, DWH queries)",
"ci_cd": "GitHub Actions (dbt CI: test su ogni PR)",
"secret_management": "AWS Secrets Manager",
}
TIMELINE = [
{"week": "1-4", "milestone": "Assessment sorgenti + architettura design"},
{"week": "5-7", "milestone": "Bronze layer: connettori SAP + IoT MQTT"},
{"week": "8-10", "milestone": "Bronze layer: EDI + POS + LIMS"},
{"week": "11-12","milestone": "Silver layer: dbt models + data quality"},
{"week": "13-14","milestone": "Gold layer: KPI dashboard + alerting"},
{"week": "15-16","milestone": "UAT + go-live + team training"},
}
TEAM = {
"data_engineers": 3,
"dbt_developers": 2,
"sap_consultant": 1, # part-time per RFC/BAPI
"project_manager": 1,
}
6 Ay Sonra Elde Edilen Sonuçlar
| KPI'lar | Önce (temel) | 6 ay sonra | Gelişim |
|---|---|---|---|
| Atık Oranı | %7,2 | %4,3 | -%40 atık (-2,9 puan) |
| OTIF | %88 | %94 | +6 puan |
| Teslim Alma Anındaki Raf Ömrü | %58 | %74 | +16pp |
| Soğuk Zincir Uyumluluğu | %94 (tahmini) | %99,1 (ölçülen) | Gerçek görünürlük ile +5,1 puan |
| Çözüm süresini geri çağırma | 48-72 saat | 2-4 saat | -93% yanıt süresi |
| Yönetici el kitabı raporu | 3-4 saat/hafta | Otomatik kontrol paneli | -100% manuel çalışma |
| Tahmin doğruluğu sorusu | %71 | %86 | +15pp |
| Tedarik Günleri (taze) | 8-12 gün | 4-6 gün | -%50 fazla stok |
Vaka çalışmasından öğrenilen dersler
- SAP RFC/BAPI düşündüğünüzden daha yavaş: gecelik toplu iş penceresi en kritik darboğazdı. Çözüm, BT ile bir pencere için pazarlık yapmaktı daha geniş tutun ve hacmi en aza indirmek için artımlı ekstraksiyon kullanın.
- EDI hiçbir zaman standart değildir: 15 perakendeci ortağından 11'inde varyasyonlar vardı EANCOM standardının sahipleri. EDIFACT ayrıştırıcısının tamamlanması 2 hafta sürdü her bir ortağın spesifikasyonlarına göre kalibrasyon.
- Veri kalitesi yalnızca teknik bir sorun değil, kültürel bir sorundur: %30'u Operatörlerin SAP'ye hatalı manuel girişlerinden kaynaklanan kalite sorunları. Teknik çözüm (Büyük Beklentiler) sorunu vurguladı ancak çözüm eğitim ve değişim yönetimi gereklidir.
- Bronz katman hepsini kurtarır: altı ayda üç kez gerekliydi dbt modellerindeki hataları düzeltmek için Silver dönüşümlerini yeniden çalıştırın. sayesinde Bronz katman değişmez, hiçbir veri kaybolmaz.
- dbt CI/CD çalışma şeklinizi değiştirir: her modeli her cihazda test edin PR ilk ayda tahmini 4 üretim olayını önledi.
FoodTech Serisinin Tam Özeti
Bu, sektörü dönüştüren teknolojiler arasında uzun bir yolculuk oldu yiyecek. İşte bu 10 makalelik seride birlikte derlediklerimiz:
| # | Öğe | Öğrenilen temel kavramlar | Teknoloji yığını |
|---|---|---|---|
| 00 | Hassas Tarım için IoT Boru Hattı | MQTT komisyoncusu, saha sensörleri, veri gölü alımı, QoS seviyeleri | Python, MQTT, Sivrisinek, MinIO |
| 01 | Mahsul Hastalıklarının Tespiti için ML Edge | TensorFlow Lite niceleme, uç çıkarım, ARM'de model dağıtımı | TFLite, Ahududu Pi, Python, OpenCV |
| 02 | AgriTech için Uydu ve Meteorolojik API'ler | Sentinel-2 NDVI, Planet Labs API, ML için hava durumu özelliği mühendisliği | Sentinel Hub, OpenMeteo, rasterio, GeoPandas |
| 03 | Gıda Takip Sistemi | Özel blockchainler (Hyperledger), RFID EPCIS, GS1 standartları | Hyperledger Yapısı, RFID, EPCIS, Python |
| 04 | Kalite Kontrolü için Bilgisayarlı Görme | Ürün veri kümelerinde YOLO ince ayarı, endüstriyel boru hattı çıkarımı | PyTorch, YOLO, FastAPI, OpenCV |
| 05 | FSMA 204 Otomasyon | KDE, CTE'ler gıdaları, parti izlenebilirliğini, geri çağırma otomasyonunu gerektiriyordu | Python, FastAPI, PostgreSQL, uyarı |
| 06 | Dikey Tarım Otomasyonu | Çiftlikler için çevresel parametre kontrolü, robotik planlama, REST API | Python, FastAPI, InfluxDB, MQTT |
| 07 | Atıkların Azaltılmasına Yönelik Talep Tahmini | LSTM, Prophet, gıda mevsimselliği özelliği mühendisliği, geriye dönük test | PyTorch, Peygamber, MLflow, pandalar |
| 08 | Çiftlik IoT için Gerçek Zamanlı Kontrol Paneli | Açısal sinyaller, Grafana provizyonu, InfluxDB sorguları, uyarı | Açısal 21, Grafana, InfluxDB, WebSocket |
| 09 | Çiftlikten Perakendeciye Tedarik Zinciri ETL'si | Madalyon mimarisi, Hava Akışı DAG'leri, dbt modelleri, Büyük Beklentiler, KPI'lar | Hava akışı, dbt, Kar Tanesi, GE, SAP RFC, EDI |
Daha fazlasını öğrenmek isteyenler için yol haritası
Serinin tamamını okuduysanız artık gerçek FoodTech projeleriyle baş etmek için sağlam bir temele sahipsiniz demektir. Her bir alanı daha derinlemesine incelemek için yapılandırılmış bir yol haritası aşağıda verilmiştir:
Önerilen derinlemesine kurs
- Nesnelerin İnterneti ve Uç Bilgi İşlem: MQTT 5.0'ı inceleyin (bunda kullanılan 3.1.1'e kıyasla) makalesi), yönetilen çözümler için Azure IoT Hub veya AWS IoT Core ve veriler için Apache NiFi heterojen IoT kaynaklarından alım.
- Üretimdeki makine öğrenimi: bu blogun MLOps serisi size nasıl yapılacağını öğretecek sürüm modelleri, MLflow ve Seldon ile A/B testi yapın ve üretimdeki sapmayı izleyin.
- Veri mimarisi: Data Lakehouse (Veri ve Yapay Zeka serisi) hakkında daha fazla bilgi edinin Business ) Snowflake, Databricks ve Apache Iceberg'in nasıl karşılaştırıldığını anlamak için tedarik zinciri analitiği platformları.
- Yüksek Lisans ve Tedarik Zinciri için Üretken Yapay Zeka: Büyük Dil Modelleri şunları yapabilir: Uyumluluk belgelerinden yapılandırılmış bilgileri çıkarın, tedarikçi sözleşmelerini analiz edin ve otomatik raporlar oluşturun. Blogun AI Engineering serisi RAG ve ince ayarları kapsıyor.
- İncelenecek endüstri standartları: GS1 Küresel Standartları (GTIN, GLN, SSCC, EPCIS), FSMA 204 (FDA), AB Reg. 178/2002 (gıda izlenebilirliği), ISO 22000 (HACCP).
- İlgili sertifikalar: AWS Sertifikalı Veri Mühendisi, dbt Sertifikalı Geliştirici, Astronom Sertifikalı Apache Hava Akışı Geliştiricisi, Databricks Veri Mühendisi Yardımcısı.
Gıda ETL'sine Yönelik En İyi Uygulamalar ve Anti-Kalıplar
En İyi Uygulamalar
- Bronz katman değişmez: ham verileri asla değiştirmeyin. Bir hata bulursanız Silver dbt modelinde modeli düzeltin ve yeniden çalıştırın; Bronze bozulmadan kalır. Bu, FSMA ve AB Reg. 178/2002 denetim izleri.
-
Hava Akışı DAG'lerinin Bağımsızlığı: her görev yeniden yürütülebilmelidir
yan etkileri olmadan. Amerika
replace=TrueS3 yüklemeleri için,MERGEyerineINSERTveritabanları için ve artımlı dbt modellerinde benzersiz anahtarlar. -
Her görevde açık zaman aşımları: SAP RFC gibi eski sistemler şunları yapabilir:
süresiz olarak engelle. Her zaman ayarla
execution_timeout=timedelta(hours=2)her Hava Akışı görevinde. - Besleme gecikmesi izleme: 02:00'de başlayan bir boru hattı ancak saat 06:00'ya kadar (operasyonel vardiyadan önce) tamamlanmaz ve kullanılamaz. Monitör i tamamlanma sürelerini belirleyin ve Airflow'ta SLA uyarılarını ayarlayın.
- Ortaklar için EDI şeması ayrımı: özel bir EDI ayrıştırıcıyı koruyun her perakendeci ortağı için. Tescilli değişkenler ayrıştırıcıyı imkansız hale getirir benzersiz evrensel.
- Belgelenen soy tarihi: dbt otomatik olarak bir grafik oluşturur soy. Denetim sorularını yanıtlamak için bunu kullanın: "Bu OTIF KPI nereden geliyor?".
Kaçınılması Gereken Anti-Desenler
- Ekstraksiyon katmanındaki dönüşümler (klasik ETL): dönüştürmek Verileri Bronz katmana kaydetmeden önce yapma olanağını kaybetmek anlamına gelir hataları doldurun veya düzeltin. Her zaman ham yükleyin, daha sonra dbt ile dönüştürün.
- Her şey için yekpare bir DAG: 50'den fazla göreve sahip tek bir DAG, hata ayıklamak imkansızdır. Etki alanına göre bölün (SAP DAG, IoT DAG, EDI DAG) ve kullanın Çapraz DAG bağımlılıkları için Airflow TriggerDagRunOperator.
- Saat dilimlerinin göz ardı edilmesi: Sicilya'da bir çiftlik, bir dağıtım merkezi Milano'daki bir perakendeci ile Almanya'daki bir perakendeci farklı saat dilimlerini kullanıyor. Her zaman UTC'ye dönüştür Bronz katmanda bulunur ve her yerde saat dilimine duyarlı zaman damgaları kullanır.
- DAG'lerde sabit kodlanmış kimlik bilgileri: ASLA SAP şifrelerini veya API belirteçlerini girmeyin Hava akışı kodunda. Airflow Connections and Variables'ı veya AWS Secrets Manager'ı kullanın.
- Bronze'da veri kalitesini atlayın: yalnızca Altın katmanını ve çok fazlasını doğrulayın geç. Sorunlar mümkün olan en kısa sürede tespit edilmelidir: yanlış sıcaklık ölçümü Bronz'da bu, Altın'da yanlış pozitif geri çağırmaya dönüşür.
- Yürütücü olarak Hava Akışı Zamanlayıcısı: görev yürütülmemelidir Hava Akışı planlayıcısının kendisinden. Her zaman ayrı bir Yürütücü kullanın (CeleryExecutor, Ağır iş yüklerini izole etmek için KubernetesExecutor).
Sonuçlar: Bir Yolun Sonu, Diğerinin Başlangıcı
FoodTech serisinin sonuna geldik. On makalede parça inşa ettik Bir gıda tedarik zincirinin parça parça vizyonu tamamen veriye dayalı: Sahada gerçek zamanlı veri aktaran sensörler, hastalıkları tespit eden ML modelleri insan gözünün önünde, değişmez izlenebilirlik için blockchain, kalite kontrol, otomatik FSMA uyumluluğu, API aracılığıyla kontrol edilen dikey çiftlikler, israfı azaltan talep tahmini, yönetim için gerçek zamanlı gösterge tabloları ve bu son makale, hepsini bir arada tutan ETL boru hattı.
Tarım-gıda sektörü dijitalleştirilmesi en karmaşık sektörlerden biridir: öngörülemeyen mevsimsellik, çabuk bozulan ürünler, sıkı düzenlemeler, onlarca yıldır birleştirilen eski sistemler, tedarik zincirleri Yüzlerce oyuncuyla küresel. Ancak tam da bu nedenle değer yaratma fırsatları teknolojiyle birlikte çok büyükler. Bir kooperatifte atık oranını %7'den %4'e düşürmek 500 çiftlikle bu sadece operasyonel bir gelişme değil: bu para, yiyecek ve çevresel bir etki.
2030'un gıda tedarik zinciri şu şekilde tanımlanacak: gerçek zamanlı veriler (her parti ekimden çatala kadar izlenir), Tahmine Dayalı Yapay Zeka (Sıfır fazla stok, sıfır stok yokluğu, sıfır atık), otomatik uyumluluk (FSMA denetimi 2 hafta değil, 2 saat içinde) e ölçülebilir sürdürülebilirlik (Part başına karbon ayak izi, hektar başına su kullanımı).
Teknolojilerin hepsi orada. Bu makalede öğrendiğiniz ETL modeli — Hava Akışı orkestrasyon için, dönüşümler için dbt, veri kalitesi için Büyük Beklentiler, Madalyon yapı mimarisi — ve herhangi bir tarım-gıda şirketinde hemen uygulanabilir orta veya büyük boyutludur.
İlgili diğer blog serilerini keşfedin
- Veri ve Yapay Zeka İşi (Seri 14): veri ambarını derinlemesine inceliyor (Snowflake, Databricks, BigQuery), dbt ve Airbyte ile ETL/ELT, MLOps kurumsal ve veri yönetimi — bu FoodTech serisinin doğal tamamlayıcısı.
- MLOps (seri 5): Talep üzerine makine öğrenimi modellerinin üretime nasıl geçirileceği 01, 04 ve 07. maddelerde oluşturduğumuz tahmin ve bilgisayar görüşü.
- Yapay Zeka Mühendisliği/RAG (Seri 6): sorgulamak için LLM nasıl kullanılır uyumluluk belgeleri, HACCP sertifikalarını analiz edin ve otomatik raporlar oluşturun Tedarik zinciri için.
- PostgreSQL AI (10 Serisi): anlamsal arama için pgvector nasıl kullanılır ürün açıklamaları, tarifler ve gıda teknik özellikleri hakkında - aşağıdakiler için faydalıdır: Gıda sektöründe ürün kataloğunun yönetimi.
- EnergyTech (seri): birçok IoT teknolojisi ve veri hattı FoodTech serisi, enerji izleme için kullanılanlarla aynıdır endüstriyel gıda tesisleri.
FoodTech serisini başından sonuna kadar takip ettiğiniz için teşekkür ederiz. Mutlu kodlamalar ve afiyet olsun.







