Prognóza poptávky po redukci odpadu: ML a časové řady ve FoodTech
Každý rok svět vyprodukuje cca 1,05 miliardy tun potravin vyplýtváno, podle Zpráva UNEP Food Waste Index Report 2024. Z toho 60 % pochází z rodin, 28 % z restaurací a 12 % z distribuce ve velkém měřítku. Přeloženo do ekonomických termínů: dále 1 bilion dolarů hořel každý rok, s dopadem na životní prostředí, který vytváří 10 % celosvětových emisí plynů skleník, téměř pětkrát více než celé světové letectví.
Přesto paradoxně, zatímco miliarda porcí jídla se denně vyplýtvá, 783 miliony lidí trpí hladem. Kořen problému není jen kulturní nebo logistický: a v podstatě problém prognózování poptávky. Velké maloobchodní objednávky příliš mnoho Neriskujte zásoby. Dodavatelé nadprodukují, aby byli v bezpečí. Výsledkem je, že každý odkaz potravního řetězce hromadí nárazníky, které se stávají odpadem.
V Itálii, Gadda Law (č. 166/2016) zavedl daňové pobídky pro dárcovství přebytků potravin a zjednodušené postupy zpětného získávání. Na evropské úrovni strategie Z farmy do Fork si klade za cíl snížit plýtvání potravinami na polovinu do roku 2030 se závaznými cíli pro maloobchodníky a potravinářský průmysl. Regulace vytváří naléhavost, ale strojové učení přináší výsledky konkrétní nástroje k dosažení těchto cílů.
Předpovídání poptávky pomocí ML není teorie: maloobchodníci, kteří implementovali modely LSTM a Temporal Fusion Transformátory hlásí snížení MAPE z 28 % typických pro tradiční metody na 5–15 %, což má za následek 25-40% snížení odpadu a měřitelná návratnost investic do 12 měsíců. V tomto článku budeme stavět kompletní potrubí, od nezpracovaných dat až po produkční model, analyzující každou architektonickou volbu s fungujícím kódem Pythonu.
Co se dozvíte v tomto článku
- Ekonomický a regulační rámec pro plýtvání potravinami v roce 2025
- Klasické statistické modely: ARIMA, SARIMA, Holt-Winters a Prophet s kódem Python
- Hluboké učení pro časové řady: LSTM, GRU, Temporal Fusion Transformer a N-BEATS
- Pokročilé inženýrství funkcí: exogenní proměnné, cyklické kódování, funkce zpoždění
- Kompletní end-to-end potrubí ML s ověřováním vpřed
- Srovnávací benchmark na reálném datovém souboru: MAPE, RMSE, MAE, doba školení
- Integrace prognóz se správou zásob a dynamickým stanovením cen
- Algoritmus optimalizace markdown pro produkty těsně před expirací
- Případová studie italského velkoprodeje: 200+ prodejních míst, 35% snížení odpadu
- Obchodní metriky: Míra snížení odpadu, Přesnost prognózy, Efektivita snížení
FoodTech Series: 10 článků o AI a technologii v potravinářském průmyslu
| # | Položka | Soustředit |
|---|---|---|
| 1 | Úvod do FoodTech | Přehled ekosystémů a klíčové technologie |
| 2 | IoT a senzory v potravinovém řetězci | Datový kanál z pole do cloudu |
| 3 | Počítačové vidění pro kvalitu a kontrolu | Klasifikace vad a automatické třídění |
| 4 | Blockchain a sledovatelnost | Bezpečnost potravin od farmy až po vidličku |
| 5 | Vertikální zemědělství a AgriTech | Kontrolovaná kultivace s ML |
| 6 | Optimalizace dodavatelského řetězce | Logistika, chladící řetězec a transparentnost |
| 7 | Panel řízení farmy | Monitorování zemědělských společností v reálném čase |
| 8 | Jste zde – Prognóza poptávky a snižování odpadu | ML časová řada, LSTM, TFT, dynamické oceňování |
| 9 | Satelitní API a přesné zemědělství | NDVI, dálkový průzkum Země, sledování plodin |
| 10 | ML Edge pro monitorování plodin | Vestavěná inference na polních zařízeních |
Problém plýtvání potravinami: Data a předpisy 2025
Abychom pochopili, proč se prognózování poptávky pomocí ML stalo strategickou prioritou ve FoodTech, musíte začít od skutečných čísel. The Zpráva UNEP o indexu potravinového odpadu 2024 prozrazuje to v roce 2022 (minulý rok s kompletními údaji) bylo vyplýtváno 1,05 miliardy tun potravin, 132 kg na osobu a rok a téměř pětina všech potravin dostupných spotřebitelům.
Globální dopad plýtvání potravinami (UNEP 2024)
| Indikátor | Hodnota | Zdroj |
|---|---|---|
| Každoroční plýtvání potravinami | 1,05 miliardy tun | Index potravinového odpadu UNEP 2024 |
| Ztráta ekonomické hodnoty | ~1 bilion dolarů ročně | Odhad FAO/Světové banky |
| % emisí skleníkových plynů | 8-10 % celosvětově | UNEP 2024 |
| Odpad na hlavu | 132 kg/osoba/rok | UNEP 2024 |
| Maloobchodní podíl ve velkém maloobchodu | 12 % z celkového odpadu | UNEP 2024 |
| Rodinný podíl | 60 % z celkového počtu | UNEP 2024 |
| Kvóta stravovacích služeb | 28 % z celkového počtu | UNEP 2024 |
| Ztráty před maloobchodem (dodavatelský řetězec) | 13 % vyrobených potravin | FAO |
Regulační rámec: od zákona Gadda přes farmu až po vidličku
Itálie byla průkopníkem v Evropě s Zákon 166/2016 (Gaddův zákon), která má zavedl odměňující spíše než represivní přístup k plýtvání potravinami. Klíčové body jsou:
- Daňové pobídky pro ty, kteří darují potravinové přebytky: ekvivalentní ničení pro výpočet IRPEF/IRES
- Byrokratické zjednodušení: odstranění poplatků za dokumenty za dary do výše na 15 000 eur/rok
- Propagace „psí tašky“ v restauracích a prodej produktů v blízkosti termín s transparentními slevami
- Výchova k jídlu ve školách jako strukturální preventivní opatření
Na evropské úrovni, Strategie z farmy na vidličku (součást Zelené dohody) stanoví cíle závazné: 50% snížení odpadu na úrovni maloobchodu a spotřeby do roku 2030 s opatřeními povinný pro maloobchod ve velkém měřítku, který postupně vstoupí v platnost od roku 2025-2026. Pro i prodejce s více než 250 zaměstnanci se stává povinným každoroční podávání zpráv o plýtvání potravinami.
Regulační dopad na podnikové strategie
Kombinace pobídek (Gadda Law), ohlašovacích povinností (Farm to Fork) a tlaku ze strany spotřebitelé vytváří jasný obchodní případ pro investice do prognózování poptávky s ML. Ne ano je to všechno o udržitelnosti: pro maloobchodníka s 200 obchody a maržemi 2–3 % snižte 35 % odpadu odpovídá znovuzískání 50–80 základních bodů provozní marže.
Proč je prognóza poptávky po potravinách obtížná
Předpovídání poptávky v potravinářském průmyslu představuje jedinečné výzvy, které z něj dělají jednu z nich složitější problémy ve strojovém učení aplikované na podnikání. Strukturální charakteristiky je jich pět, které jej odlišují od ostatních maloobchodních sektorů.
1. Pokazitelnost a úzké okno prodeje
Část oblečení, která se tento týden neprodá, může být prodána příští týden. Salát čerstvé č. Čerstvé potravinářské výrobky mají trvanlivost od 1 do 14 dnů, který znamená, že nadměrná chyba predikce se přímo promítá do neobnovitelného fyzického odpadu. Tato asymetrie nákladů na chyby (náklady na nadměrné předpovědi často převyšují náklady na under-forecast) vyžaduje modely, které specificky minimalizují vzestupnou chybu.
2. Víceúrovňová sezónnost
Časová řada potravin představuje současnou sezónnost v několika frekvencích:
- Denní sezónnost: Páteční večerní prodej odlišný od úterního dopoledne
- Týdenní sezónnost: různé nákupní vzorce pro každý den v týdnu
- Měsíční sezónnost: Účinky začátku/konce měsíce související s platovými cykly
- Roční sezónnost: léto vs zima pro sezónní produkty
- Sváteční období: Vánoce, Velikonoce, polovina srpna s náhlými výkyvy
Tradiční modely ARIMA zvládají pouze jeden sezónní komponent. Moderní modely jako Prophet a TFT nativně zvládají více sezón.
3. Nestacionární exogenní proměnné
Poptávka po potravinách je silně ovlivněna vnějšími faktory, které se neřídí pravidelnými vzory: povětrnostní podmínky (týden deště zvyšuje prodej polévky o 40 %), propagační kampaně (propagační leták může dočasně ztrojnásobit poptávku), místní akce (fotbalové zápasy, veletrhy), konkurenční ceny a trendy na sociálních sítích. Vhodně začlenit tyto exogenní proměnné efektivní a rozdíl mezi průměrným modelem a vynikajícím modelem.
4. Dlouhý chvost SKU se špatnými daty
Typický velký maloobchod spravuje 15 000–30 000 aktivních SKU. 20 % SKU generuje 80 % tržeb, ale zbývajících 80 % má řídké, přerušované časové řady s mnoha nulami. Pro tyto standardní modely produktů selhávají a je zapotřebí specializovaných přístupů, jako je Crostonova metoda, modely s nulovým nafouknutím nebo přenos učení z podobných SKU.
5. Typické chyby bez ML
Il Typická MAPE (střední absolutní procentní chyba). v tradičních prognostických systémech na základě klouzavých průměrů, manuálních pravidel nebo ERP systémů bez ML osciluje mezi 20 % a 40 % pro čerstvé produkty. Zavedení modelů ML tuto chybu snižuje na 5–15 %, s LSTM demonstrujícím MAPE 16,43 % oproti 28,76 % tradičních metod v nedávných benchmarkech, snížení o 43 %.
Klasické statistické modely: ARIMA, SARIMA, Holt-Winters a Prophet
Než přejdeme k hlubokému učení, je nezbytné porozumět klasickým statistickým modelům. Ne protože jsou zastaralé, ale protože často představují základní linii k překonání, jsou interpretovatelné, výpočetně lehké a v některých kontextech (málo dat, jednoduché produkty) konkurenceschopné složitější přístupy.
ARIMA a SARIMA: Základy prognózování
ARIMA (AutoRegressive Integrated Moving Average) je nejpoužívanější statistický model podle řady jednorozměrné časové osy. Model ARIMA(p,d,q) kombinuje tři komponenty: AutoRegrese (p lags del minulá hodnota), integrace (rozdíly, aby se řada stala stacionární) a klouzavý průměr (q zpoždění zbytkových chyb). SARIMA přidává sezónní složku (P,D,Q).
# SARIMA per forecasting vendite settimanali - prodotto fresco
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_percentage_error
# Caricamento dati di esempio
# Formato: date index, colonna 'sales' con unita vendute
df = pd.read_csv('vendite_insalata.csv', index_col='date', parse_dates=True)
df = df.asfreq('D') # Frequenza giornaliera
df['sales'] = df['sales'].fillna(0)
# Test di stazionarieta (ADF Test)
result = adfuller(df['sales'].dropna())
print(f'ADF Statistic: {result[0]:.4f}')
print(f'p-value: {result[1]:.4f}')
print(f'La serie e {"stazionaria" if result[1] < 0.05 else "non stazionaria"}')
# Split train/test (80/20 con holdout degli ultimi 30 giorni)
train_size = len(df) - 30
train = df.iloc[:train_size]
test = df.iloc[train_size:]
# Modello SARIMA(1,1,1)(1,1,1)7 - stagionalita settimanale
# p=1, d=1, q=1: componenti ARIMA
# P=1, D=1, Q=1, s=7: componenti stagionali settimanali
model = SARIMAX(
train['sales'],
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 7), # s=7 per stagionalita settimanale
enforce_stationarity=False,
enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())
# Previsione sullo stesso periodo del test set
forecast = results.forecast(steps=len(test))
forecast = np.maximum(forecast, 0) # Nessun valore negativo
# Metriche
mape = mean_absolute_percentage_error(test['sales'], forecast) * 100
rmse = np.sqrt(np.mean((test['sales'].values - forecast.values) ** 2))
mae = np.mean(np.abs(test['sales'].values - forecast.values))
print(f'\nMetriche SARIMA:')
print(f' MAPE: {mape:.2f}%')
print(f' RMSE: {rmse:.2f} unita')
print(f' MAE: {mae:.2f} unita')
# Visualizzazione
plt.figure(figsize=(12, 5))
plt.plot(train['sales'][-60:], label='Train (ultimi 60gg)')
plt.plot(test['sales'], label='Reale', color='green')
plt.plot(test.index, forecast, label='Previsione SARIMA', color='red', linestyle='--')
plt.title('SARIMA - Previsione Vendite Prodotto Fresco')
plt.legend()
plt.tight_layout()
plt.savefig('sarima_forecast.png', dpi=150)
plt.show()
Prophet: Interpretovatelné předpovídání s exogenními proměnnými
Prorok (Meta/Facebook, 2017) a výborná volba pro předpověď jídla protože nativně zvládá: více ročních období (denní, týdenní, roční), prázdniny efekty s přizpůsobitelným kalendářem, nelineární trendy s automatickými body změny a proměnné exogenní (další regresoři). Jeho interpretovatelnost (automatický rozklad na trend + sezónnost + dovolená) ocení i netechnickí uživatelé.
# Prophet per forecasting con variabili esogene (meteo, promozioni)
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
# Prophet richiede colonne 'ds' (datetime) e 'y' (target)
df_prophet = df.reset_index().rename(columns={'date': 'ds', 'sales': 'y'})
# Aggiunta variabili esogene (regressori)
# Esempio: temperatura media giornaliera e flag promozionale
df_prophet['temperature'] = meteo_df['temp_media'] # gradi Celsius
df_prophet['is_promotion'] = promo_df['active_promo'].astype(int)
# Calendario festivo italiano (personalizzato)
holidays_it = pd.DataFrame({
'holiday': [
'Natale', 'Capodanno', 'Pasqua', 'Ferragosto',
'Festa Repubblica', 'Ognissanti'
],
'ds': pd.to_datetime([
'2024-12-25', '2025-01-01', '2025-04-20', '2025-08-15',
'2025-06-02', '2025-11-01'
]),
'lower_window': [-3, -1, -3, -5, -1, -1], # Giorni prima
'upper_window': [3, 2, 2, 2, 1, 1] # Giorni dopo
})
# Configurazione modello
model = Prophet(
holidays=holidays_it,
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
seasonality_mode='multiplicative', # Meglio per dati con forte trend
changepoint_prior_scale=0.05, # Flessibilità del trend
seasonality_prior_scale=10.0 # Forza delle componenti stagionali
)
# Aggiunta regressori
model.add_regressor('temperature', standardize=True)
model.add_regressor('is_promotion', standardize=False)
# Training
train_prophet = df_prophet.iloc[:train_size]
model.fit(train_prophet)
# Previsione (con valori futuri dei regressori)
future = model.make_future_dataframe(periods=30)
future['temperature'] = future_meteo_df['temp_media']
future['is_promotion'] = future_promo_df['active_promo'].astype(int)
forecast_prophet = model.predict(future)
# Cross-validation interna (walk-forward)
df_cv = cross_validation(
model,
initial='365 days', # Periodo di training iniziale
period='30 days', # Frequenza di re-fitting
horizon='14 days' # Orizzonte di previsione
)
metrics_cv = performance_metrics(df_cv)
print(f'Prophet MAPE (CV): {metrics_cv["mape"].mean()*100:.2f}%')
# Visualizzazione decomposizione
fig = model.plot_components(forecast_prophet)
fig.savefig('prophet_components.png', dpi=150)
print('Componenti Prophet salvate: trend + stagionalita + holiday effects')
Kdy použít klasické statistické modely
| Model | Silné stránky | Omezení | Ideální případ použití |
|---|---|---|---|
| ARIMA/SARIMA | Interpretovatelné, rychlé, stacionární série | Pouze jedna sezónnost, žádné exogenní proměnné | Stabilní produkty, málo dat |
| Holt-Winters | Zvládá trendy + sezónnost, jednoduchost | Nezvládá odlehlé hodnoty, pevnou sezónnost | Série s lineárním trendem a stabilní sezónností |
| Prorok | Vícesezónnost, dovolená, regresoři | Není ideální pro velmi nepravidelné série | Produkty se silným slavnostním/propagačním účinkem |
| LSTM/GRU | Komplexní, vícerozměrné vzory s dlouhým dosahem | Potřebné množství dat, černá skříňka | Velké množství SKU, mnoho exogenních proměnných |
| TFT | Interpretovatelné + DL, multi-horizont, pozornost | Výpočetně náročné, vyžaduje GPU | Centralizované předpovídání napříč mnoha SKU |
Hluboké učení pro časové řady: LSTM, GRU, TFT a N-BEATS
Modely hlubokého učení způsobily revoluci v prognózování poptávky počínaje lety 2018–2020. Jejich převaha se projevuje především v přítomnosti: mnoha korelovaných exogenních proměnných, vzorců složité nelineární, dlouhodobé časové závislosti a velké objemy dat s více jednotkami SKU které vám umožňují využívat přenos učení mezi podobnými produkty.
LSTM a GRU: Selektivní paměť v sekvencích
Le Dlouhodobá krátkodobá paměť (LSTM) e le Gated Recurrent Units (GRU) jsou to opakující se sítě navržené k zachycení závislostí na dlouhé vzdálenosti v časových sekvencích. LSTM používá tři brány (vstup, zapomenutí, výstup) k rozhodnutí, které informace zachovat nebo zahodit v buněčné paměti. GRU zjednodušuje pomocí dvou bran (reset, aktualizace), které dosahují podobného výkonu s menšími parametry.
# LSTM multi-variate per demand forecasting alimentare
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_percentage_error
# ---- Preparazione dati ----
class FoodDemandDataset(torch.utils.data.Dataset):
def __init__(self, data, seq_len=14, pred_len=7):
self.seq_len = seq_len
self.pred_len = pred_len
self.data = torch.FloatTensor(data)
def __len__(self):
return len(self.data) - self.seq_len - self.pred_len + 1
def __getitem__(self, idx):
x = self.data[idx: idx + self.seq_len]
y = self.data[idx + self.seq_len: idx + self.seq_len + self.pred_len, 0]
return x, y # x: (seq_len, features), y: (pred_len,)
# ---- Architettura LSTM ----
class LSTMForecaster(nn.Module):
def __init__(self, input_size, hidden_size=128, num_layers=2,
pred_len=7, dropout=0.2):
super(LSTMForecaster, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.pred_len = pred_len
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, pred_len)
def forward(self, x):
# x shape: (batch, seq_len, input_size)
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
out, _ = self.lstm(x, (h0, c0))
out = self.dropout(out[:, -1, :]) # Ultimo time step
out = self.fc(out) # (batch, pred_len)
return out
# ---- Feature preparation ----
def prepare_features(df):
"""Crea feature matrix multi-variate per LSTM"""
features = pd.DataFrame()
features['sales'] = df['sales']
# Lag features: vendite passate come input esplicito
for lag in [1, 2, 3, 7, 14]:
features[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# Rolling statistics
features['sales_rolling_mean_7d'] = df['sales'].rolling(7).mean()
features['sales_rolling_std_7d'] = df['sales'].rolling(7).std()
features['sales_rolling_mean_14d'] = df['sales'].rolling(14).mean()
# Feature temporali cicliche (encoding sinusoidale)
features['day_sin'] = np.sin(2 * np.pi * df.index.dayofweek / 7)
features['day_cos'] = np.cos(2 * np.pi * df.index.dayofweek / 7)
features['month_sin'] = np.sin(2 * np.pi * df.index.month / 12)
features['month_cos'] = np.cos(2 * np.pi * df.index.month / 12)
# Variabili esogene (meteo, promozioni)
features['temperature'] = df['temperature']
features['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
features['is_holiday'] = df['is_holiday'].astype(int)
features['is_promotion'] = df['is_promotion'].astype(int)
features = features.dropna()
return features
# ---- Training loop ----
def train_lstm_model(train_data, val_data, config):
model = LSTMForecaster(
input_size=config['input_size'],
hidden_size=config['hidden_size'],
num_layers=config['num_layers'],
pred_len=config['pred_len']
)
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
criterion = nn.HuberLoss(delta=1.0) # Robusto agli outlier
train_loader = torch.utils.data.DataLoader(
FoodDemandDataset(train_data, config['seq_len'], config['pred_len']),
batch_size=config['batch_size'],
shuffle=True
)
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(config['epochs']):
model.train()
train_loss = 0
for x_batch, y_batch in train_loader:
optimizer.zero_grad()
y_pred = model(x_batch)
loss = criterion(y_pred, y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
train_loss += loss.item()
# Validazione
model.eval()
with torch.no_grad():
val_dataset = FoodDemandDataset(val_data, config['seq_len'], config['pred_len'])
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32)
val_loss = sum(criterion(model(x), y).item() for x, y in val_loader)
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_lstm_model.pt')
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= config['early_stopping_patience']:
print(f'Early stopping all\'epoca {epoch}')
break
if epoch % 10 == 0:
print(f'Epoch {epoch}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}')
# Carica modello migliore
model.load_state_dict(torch.load('best_lstm_model.pt'))
return model
# Configurazione
config = {
'input_size': 15, # Numero di feature (sales + lag + temporal + exog)
'hidden_size': 128,
'num_layers': 2,
'pred_len': 7, # Previsione a 7 giorni
'seq_len': 14, # Lookback di 14 giorni
'batch_size': 32,
'lr': 0.001,
'epochs': 100,
'early_stopping_patience': 15
}
Temporal Fusion Transformer: Interpretovatelnost + výkon
Il Temporal Fusion Transformer (TFT) od Google DeepMind (2021) a aktuálně považováno za nejmodernější pro předpovídání podnikové poptávky. Jeho architektura se snoubí vícehlavé mechanismy pozornosti se zbytkovými hradly (GRN), které zpracovávají obě statické proměnné (typ produktu, kategorie produktu) a předem známé časové proměnné (kalendář, plánované akce) a pozorované (proběhlé prodeje, počasí).
V roce 2024 srovnáme dataset M5 (30 490 časových řad z 10 obchodů Walmart), TFT a modely Transformátorové MASE prokázaly zlepšení 26–29 % a snížení z WQL 34 % ve srovnání se sezónní naivní metodou. Piknik, prodejce potravin Dutch online, přijal TFT jako svůj hlavní model pro prognózování poptávky, publikování podrobné výsledky ve výrobě.
# Temporal Fusion Transformer con PyTorch Forecasting
# pip install pytorch-forecasting pytorch-lightning
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import QuantileLoss
import pytorch_lightning as pl
import pandas as pd
import torch
# ---- Preparazione dati nel formato TFT ----
def prepare_tft_dataset(df):
"""
df deve avere: date, store_id, product_id, sales,
temperature, is_holiday, is_promotion, price, category
"""
df = df.copy()
df['time_idx'] = (df['date'] - df['date'].min()).dt.days
df['month'] = df['date'].dt.month.astype(str)
df['day_of_week'] = df['date'].dt.dayofweek.astype(str)
max_prediction_length = 7 # Previsione 7 giorni
max_encoder_length = 28 # Lookback 28 giorni
training_cutoff = df['time_idx'].max() - max_prediction_length
training_dataset = TimeSeriesDataSet(
df[df.time_idx <= training_cutoff],
time_idx='time_idx',
target='sales',
group_ids=['store_id', 'product_id'],
# Variabili statiche (non cambiano nel tempo per ogni gruppo)
static_categoricals=['store_id', 'product_id', 'category'],
# Variabili temporali note in anticipo
time_varying_known_categoricals=['month', 'day_of_week', 'is_holiday'],
time_varying_known_reals=['time_idx', 'is_promotion', 'price'],
# Variabili osservate (disponibili solo per il passato)
time_varying_unknown_reals=['sales', 'temperature'],
min_encoder_length=max_encoder_length // 2,
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
target_normalizer=GroupNormalizer(
groups=['store_id', 'product_id'],
transformation='softplus' # Mantiene valori positivi
),
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
)
return training_dataset, training_cutoff
# ---- Modello TFT ----
def build_tft_model(training_dataset):
tft = TemporalFusionTransformer.from_dataset(
training_dataset,
learning_rate=0.03,
hidden_size=64, # Dimensione hidden layer
attention_head_size=4, # Numero attention heads
dropout=0.1,
hidden_continuous_size=16, # Feature continue
loss=QuantileLoss(), # Previsione probabilistica
log_interval=10,
reduce_on_plateau_patience=4,
)
print(f'Numero parametri: {tft.size()/1e3:.1f}k')
return tft
# ---- Training con PyTorch Lightning ----
def train_tft(training_dataset, validation_dataset):
train_loader = training_dataset.to_dataloader(
train=True, batch_size=128, num_workers=4
)
val_loader = validation_dataset.to_dataloader(
train=False, batch_size=128, num_workers=4
)
trainer = pl.Trainer(
max_epochs=30,
accelerator='gpu' if torch.cuda.is_available() else 'cpu',
gradient_clip_val=0.1,
callbacks=[
pl.callbacks.EarlyStopping(
monitor='val_loss', patience=5, mode='min'
),
pl.callbacks.ModelCheckpoint(
monitor='val_loss', save_top_k=1
)
]
)
tft_model = build_tft_model(training_dataset)
trainer.fit(tft_model, train_loader, val_loader)
# Interpretabilita: variable importance
best_model = TemporalFusionTransformer.load_from_checkpoint(
trainer.checkpoint_callback.best_model_path
)
raw_predictions, x = best_model.predict(
val_loader, mode='raw', return_x=True
)
# Feature importance - chi contribuisce di più alle previsioni
interpretation = best_model.interpret_output(
raw_predictions, reduction='sum'
)
print('Feature importance:')
for key, vals in interpretation.items():
print(f' {key}: {vals}')
return best_model
N-BEATS: Rozšíření neurální báze bez rekurentních architektur
N-BEATS (Element AI, 2020) a radikálně odlišný přístup: používejte pouze vrstvy plně propojené organizované v hromadách a blocích, bez svinutí nebo mechanismů pozornosti. Každý blok rozkládá signál na expanzi báze (trend, sezónnost) a rezidua. Na M4 Kompetitivní datový soubor překonal všechny statistické metody o 11 % a také hybridní neurálně-statistické vítěz o 3 %. Pro potravinářské produkty se silnou sezónností, interpretovatelná verze N-BEATS vytváří trendy/sezónní rozklady, které obchodní analytici oceňují.
Feature Engineering pro prognózování poptávky po potravinách
Kvalita konstrukce prvků je často nejdůležitějším faktorem výkonu modelu. LSTM s vynikajícím inženýrstvím funkcí překonává TFT s průměrným inženýrstvím funkcí. Podívejme se hlavní kategorie prvků, které mají být vytvořeny pro potravinový kontext.
Cyklické kódování pro časové proměnné
Častou chybou je považování dne v týdnu za celočíselnou hodnotu (0-6). Problém je v tom model nechápe, že den 6 (neděle) je „blízko“ dne 0 (pondělí). Kódování cyklický se sinem a kosinusem tento problém řeší.
# Feature engineering completo per food demand forecasting
import pandas as pd
import numpy as np
from typing import List, Dict
def create_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
"""Crea feature temporali cicliche e categoriche"""
df = df.copy()
idx = df.index
# Encoding ciclico - preserva la ciclicita (es. lunedi vicino a domenica)
df['hour_sin'] = np.sin(2 * np.pi * idx.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * idx.hour / 24)
df['dow_sin'] = np.sin(2 * np.pi * idx.dayofweek / 7)
df['dow_cos'] = np.cos(2 * np.pi * idx.dayofweek / 7)
df['month_sin'] = np.sin(2 * np.pi * idx.month / 12)
df['month_cos'] = np.cos(2 * np.pi * idx.month / 12)
df['doy_sin'] = np.sin(2 * np.pi * idx.dayofyear / 365.25)
df['doy_cos'] = np.cos(2 * np.pi * idx.dayofyear / 365.25)
# Feature binarie utili per GDO italiana
df['is_weekend'] = (idx.dayofweek >= 5).astype(int)
df['is_monday'] = (idx.dayofweek == 0).astype(int)
df['is_friday'] = (idx.dayofweek == 4).astype(int)
# Posizione nel mese (utile per effetti stipendio)
df['day_of_month'] = idx.day
df['is_month_start'] = (idx.day <= 5).astype(int)
df['is_month_end'] = (idx.day >= 25).astype(int)
# Settimana dell'anno
df['week_of_year'] = idx.isocalendar().week.astype(int)
df['quarter'] = idx.quarter
return df
def create_lag_features(df: pd.DataFrame, target_col: str,
lags: List[int] = None) -> pd.DataFrame:
"""Crea lag features del target"""
if lags is None:
lags = [1, 2, 3, 7, 14, 21, 28]
df = df.copy()
for lag in lags:
df[f'{target_col}_lag_{lag}d'] = df[target_col].shift(lag)
return df
def create_rolling_features(df: pd.DataFrame, target_col: str,
windows: List[int] = None) -> pd.DataFrame:
"""Crea rolling statistics"""
if windows is None:
windows = [3, 7, 14, 28]
df = df.copy()
for window in windows:
df[f'{target_col}_roll_mean_{window}d'] = (
df[target_col].shift(1).rolling(window).mean()
)
df[f'{target_col}_roll_std_{window}d'] = (
df[target_col].shift(1).rolling(window).std()
)
df[f'{target_col}_roll_min_{window}d'] = (
df[target_col].shift(1).rolling(window).min()
)
df[f'{target_col}_roll_max_{window}d'] = (
df[target_col].shift(1).rolling(window).max()
)
# Exponential moving average
df[f'{target_col}_ema_{window}d'] = (
df[target_col].shift(1).ewm(span=window).mean()
)
return df
def create_weather_features(df: pd.DataFrame,
meteo_df: pd.DataFrame) -> pd.DataFrame:
"""
Integra variabili meteorologiche
meteo_df: DataFrame con colonne [date, temp_max, temp_min, precipitazione_mm,
umidita, radiazione_solare, vento_kmh]
"""
df = df.merge(meteo_df, left_index=True, right_on='date', how='left')
# Feature derivate dal meteo
df['temp_delta'] = df['temp_max'] - df['temp_min']
df['is_hot'] = (df['temp_max'] > 28).astype(int) # Ondata di caldo
df['is_cold'] = (df['temp_min'] < 5).astype(int) # Freddo invernale
df['is_rain'] = (df['precipitazione_mm'] > 1).astype(int)
# Interazioni meteo x prodotto (es. gelati vanno bene con caldo)
# Necessità di parametrizzazione per categoria prodotto
df['heat_wave_consecutive'] = (
df['is_hot'].rolling(3).sum() == 3
).astype(int)
return df
def create_promotion_features(df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""
Crea feature da calendario promozionale
promo_df: colonne [date, sku_id, discount_pct, is_volantino, is_digital_promo]
"""
df = df.merge(promo_df, left_index=True, right_on='date', how='left')
df['discount_pct'] = df['discount_pct'].fillna(0)
df['is_promotion'] = (df['discount_pct'] > 0).astype(int)
df['is_deep_discount'] = (df['discount_pct'] > 0.3).astype(int)
# Effetto anticipazione promozione (pre-promo dip)
df['promo_lag_1'] = df['is_promotion'].shift(1).fillna(0)
df['promo_lead_1'] = df['is_promotion'].shift(-1).fillna(0) # Solo in training
# Effetto post-promozione (inventory loading)
df['days_since_last_promo'] = (
df['is_promotion']
.pipe(lambda s: s.where(s.eq(1)).ffill().index - s.index)
.dt.days
.clip(upper=30)
)
return df
def create_price_features(df: pd.DataFrame) -> pd.DataFrame:
"""Feature relative al prezzo e alla sua variazione"""
df = df.copy()
df['price_lag_1'] = df['price'].shift(1)
df['price_change_pct'] = df['price'].pct_change()
df['price_vs_avg_30d'] = df['price'] / df['price'].rolling(30).mean() - 1
return df
def full_feature_engineering_pipeline(df: pd.DataFrame,
meteo_df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""Pipeline completa di feature engineering"""
df = create_temporal_features(df)
df = create_lag_features(df, 'sales')
df = create_rolling_features(df, 'sales')
df = create_weather_features(df, meteo_df)
df = create_promotion_features(df, promo_df)
df = create_price_features(df)
df = df.dropna()
print(f'Feature totali create: {df.shape[1]}')
print(f'Sample size dopo feature engineering: {df.shape[0]}')
return df
Kompletní ML potrubí: od nezpracovaných dat po model ve výrobě
Profesionální kanál prognózování poptávky se neomezuje pouze na modelové školení. Vyžaduje end-to-end architektura, která spravuje sběr dat, předběžné zpracování, validaci, výběr modelu, nasazení a průběžné sledování. Níže je uvedena struktura potrubí připraveného k výrobě.
# Pipeline ML completa per food demand forecasting
import mlflow
import mlflow.pytorch
from dataclasses import dataclass, field
from typing import Optional, Tuple
import pandas as pd
import numpy as np
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Configurazione Pipeline ----
@dataclass
class ForecastConfig:
# Data
store_ids: list = field(default_factory=list)
product_categories: list = field(default_factory=list)
training_lookback_days: int = 365
forecast_horizon_days: int = 7
# Feature engineering
lag_days: list = field(default_factory=lambda: [1, 2, 3, 7, 14, 28])
rolling_windows: list = field(default_factory=lambda: [7, 14, 28])
# Model selection
model_type: str = 'tft' # 'sarima', 'prophet', 'lstm', 'tft', 'xgboost'
use_ensemble: bool = False
# Validation
n_cv_splits: int = 4
val_horizon_days: int = 14
# MLflow
experiment_name: str = 'food_demand_forecasting'
run_name: Optional[str] = None
# Deployment
min_mape_threshold: float = 0.20 # Non deployare se MAPE > 20%
model_registry_name: str = 'food_demand_model'
# ---- Step 1: Data Collection e Validation ----
class DataCollector:
def __init__(self, config: ForecastConfig):
self.config = config
def collect(self, start_date: str, end_date: str) -> pd.DataFrame:
"""Raccoglie dati da data warehouse (es. Snowflake, BigQuery)"""
query = f"""
SELECT
date,
store_id,
product_id,
category,
sales_units,
price,
stock_level,
is_holiday,
is_promotion,
discount_pct
FROM gold.sales_daily
WHERE date BETWEEN '{start_date}' AND '{end_date}'
AND store_id IN ({','.join(map(str, self.config.store_ids))})
ORDER BY date, store_id, product_id
"""
# Qui si userebbe il connector del DWH (es. snowflake-connector-python)
# df = snowflake_connector.execute(query)
logger.info(f'Dati raccolti: {start_date} - {end_date}')
return df # Placeholder
def validate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
"""Validazione qualità dati"""
issues = {}
# Check valori negativi
neg_sales = (df['sales_units'] < 0).sum()
if neg_sales > 0:
issues['negative_sales'] = neg_sales
df.loc[df['sales_units'] < 0, 'sales_units'] = 0
# Check missing dates per gruppo
for group_id, group_df in df.groupby(['store_id', 'product_id']):
date_range = pd.date_range(group_df['date'].min(), group_df['date'].max())
missing = len(date_range) - len(group_df)
if missing > 0:
issues[f'missing_dates_{group_id}'] = missing
# Check outliers (IQR method)
Q1 = df['sales_units'].quantile(0.25)
Q3 = df['sales_units'].quantile(0.75)
IQR = Q3 - Q1
outliers = ((df['sales_units'] < Q1 - 3*IQR) | (df['sales_units'] > Q3 + 3*IQR)).sum()
issues['outliers'] = int(outliers)
logger.info(f'Validazione dati: {issues}')
return df, issues
# ---- Step 2: Walk-Forward Validation ----
class WalkForwardValidator:
"""
Validazione temporale corretta per time series.
NON usare cross-validation classico che crea data leakage.
"""
def __init__(self, config: ForecastConfig):
self.config = config
def validate(self, df: pd.DataFrame, model_class) -> dict:
results = []
total_days = len(df['date'].unique())
split_size = total_days // (self.config.n_cv_splits + 1)
for fold in range(self.config.n_cv_splits):
train_end_idx = split_size * (fold + 1)
val_start_idx = train_end_idx
val_end_idx = val_start_idx + self.config.val_horizon_days
train_dates = sorted(df['date'].unique())[:train_end_idx]
val_dates = sorted(df['date'].unique())[val_start_idx:val_end_idx]
train_df = df[df['date'].isin(train_dates)]
val_df = df[df['date'].isin(val_dates)]
if len(val_df) == 0:
continue
# Training sul fold
model = model_class(self.config)
model.fit(train_df)
predictions = model.predict(val_df)
# Metriche
actuals = val_df['sales_units'].values
preds = predictions['forecast'].values
mape = np.mean(np.abs((actuals - preds) / (actuals + 1e-8))) * 100
rmse = np.sqrt(np.mean((actuals - preds) ** 2))
mae = np.mean(np.abs(actuals - preds))
results.append({
'fold': fold,
'mape': mape,
'rmse': rmse,
'mae': mae,
'train_size': len(train_df),
'val_size': len(val_df)
})
logger.info(f'Fold {fold}: MAPE={mape:.2f}%, RMSE={rmse:.2f}')
# Aggrega risultati
results_df = pd.DataFrame(results)
return {
'mean_mape': results_df['mape'].mean(),
'std_mape': results_df['mape'].std(),
'mean_rmse': results_df['rmse'].mean(),
'mean_mae': results_df['mae'].mean(),
'folds': results
}
# ---- Step 3: MLflow Tracking e Model Registry ----
class ExperimentTracker:
def __init__(self, config: ForecastConfig):
self.config = config
mlflow.set_experiment(config.experiment_name)
def run_experiment(self, df: pd.DataFrame, model_class) -> str:
run_name = self.config.run_name or f'{self.config.model_type}_{datetime.now():%Y%m%d_%H%M}'
with mlflow.start_run(run_name=run_name) as run:
# Log parametri
mlflow.log_params({
'model_type': self.config.model_type,
'forecast_horizon': self.config.forecast_horizon_days,
'training_lookback': self.config.training_lookback_days,
'n_cv_splits': self.config.n_cv_splits
})
# Walk-forward validation
validator = WalkForwardValidator(self.config)
metrics = validator.validate(df, model_class)
# Log metriche
mlflow.log_metrics({
'cv_mean_mape': metrics['mean_mape'],
'cv_std_mape': metrics['std_mape'],
'cv_mean_rmse': metrics['mean_rmse'],
'cv_mean_mae': metrics['mean_mae']
})
# Training finale su tutti i dati
final_model = model_class(self.config)
final_model.fit(df)
# Log artefatti
mlflow.log_dict(metrics, 'validation_metrics.json')
# Registra modello se MAPE accettabile
if metrics['mean_mape'] < self.config.min_mape_threshold * 100:
mlflow.pytorch.log_model(
final_model,
artifact_path='model',
registered_model_name=self.config.model_registry_name
)
logger.info(f'Modello registrato: MAPE={metrics["mean_mape"]:.2f}%')
else:
logger.warning(
f'Modello NON registrato: MAPE {metrics["mean_mape"]:.2f}% '
f'> soglia {self.config.min_mape_threshold * 100}%'
)
return run.info.run_id
Benchmark: Srovnání mezi modely na reálném souboru dat
Níže je systematické srovnání výkonu různých přístupů na typickém souboru dat GDO: 150 SKU čerstvých produktů (mléko, ovoce, zelenina, maso), 2 roky denních dat z 5 prodejních míst, s exogenními proměnnými (počasí, akce, svátky).
Srovnávací srovnávací modely – italský datový soubor GDO (150 SKU, 2 roky)
| Model | MAPE (%) | RMSE (jednotka) | MAE (spojené) | Doba školení | Interpretovatelnost | Poznámky |
|---|---|---|---|---|---|---|
| Klouzavý průměr (základní hodnota) | 31.4 | 18.7 | 12.3 | < 1 min | Vysoký | Žádná sezónnost, žádní regresoři |
| SARIMA | 22.8 | 14.2 | 9.8 | ~15 min | Vysoký | Pouze jedno sezónní zpoždění |
| Holt-Winters | 20.1 | 13.1 | 8.9 | ~5 min | Vysoký | Dobré pro běžné seriály |
| Prorok | 14.6 | 10.4 | 7.2 | ~30 min | Vysoký | Vynikající s dovolenou/regresory |
| XGBoost + funkce Eng. | 12.9 | 9.8 | 6.7 | ~10 min | Průměrný | Inženýrství kritických funkcí |
| LSTM (jednorozměrné) | 16.4 | 11.2 | 7.8 | ~45 minut GPU | Nízký | Horší než Prophet bez exog |
| LSTM (více proměnných) | 10.8 | 8.4 | 5.9 | ~2h GPU | Nízký | Silné zlepšení s exogem |
| N-BEATS | 11.2 | 8.7 | 6.1 | ~1h GPU | Průměrný | Interpretovatelný rozklad |
| TFT (Temporal Fusion Transformer) | 8.3 | 6.9 | 4.8 | ~4h GPU | Vysoký | Celkově nejlepší, interpretovatelné |
| Ensemble TFT + XGBoost | 7.6 | 6.4 | 4.4 | ~5h GPU | Průměrný | +8 % na jediném TFT |
Dataset: 150 SKU čerstvých produktů, 5 obchodů, 2 roky denní data. Dopředná validace na 4 násobcích (14denní horizont). GPU: NVIDIA A100.
Úvahy o volbě modelu
TFT je nejvýkonnější možností, ale vyžaduje GPU a dostatek dat pro každou SKU. Pro maloobchodníky s tisíce SKU, ale nízké objemy na jeden produkt, hybridní přístup funguje nejlépe: XGBoost nebo Prophet pro long tail SKU (nízký objem, špatná data), TFT pro SKU velkoobjemová odvětví, která generují nejvíce příjmů a odpadu.
Snížení plýtvání: integrace prognóz se skladovými zásobami a dynamickým oceňováním
Přesný model předpovědi sám o sobě nesnižuje plýtvání: je nezbytný jednat na předpovědích prostřednictvím řízení zásob a dynamických cenových systémů. Smyčka rozhodování a: predikce budoucí poptávky → optimální objednávka dodavateli → sledování skladové zásoby → dynamické snížení cen u produktů, které se blíží expiraci → automatické darování za neprodané přebytky.
Optimalizace zásob pomocí Dynamic Safety Stock
# Inventory optimization basato su forecast probabilistico
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class InventoryParams:
product_id: str
shelf_life_days: int # Vita utile del prodotto
lead_time_days: int # Giorni da ordine a consegna
service_level: float = 0.95 # Fill rate target (95%)
holding_cost_per_unit: float = 0.05 # Costo mantenimento/giorno
shortage_cost_per_unit: float = 2.50 # Costo stockout per unita
waste_cost_per_unit: float = 1.20 # Costo di buttare una unita
def calculate_optimal_order(
demand_forecast: np.ndarray, # Previsione media per i prossimi N giorni
demand_std: np.ndarray, # Deviazione standard previsione
current_stock: int,
params: InventoryParams
) -> dict:
"""
Calcola la quantità ottimale da ordinare considerando:
- Shelf life del prodotto (prodotti freschi deperibili)
- Service level target
- Trade-off costo stockout vs costo spreco
"""
from scipy import stats
# Z-score per il service level target
z_score = stats.norm.ppf(params.service_level)
# Domanda attesa nel lead time
lt_demand = demand_forecast[:params.lead_time_days].sum()
lt_std = np.sqrt((demand_std[:params.lead_time_days] ** 2).sum())
# Safety stock = Z * sigma * sqrt(lead_time)
safety_stock = z_score * lt_std
# Reorder point
reorder_point = lt_demand + safety_stock
# Domanda fino a shelf life (non ordinare più di quanto venderemo)
max_sellable_demand = demand_forecast[:params.shelf_life_days].sum()
max_sellable_std = np.sqrt((demand_std[:params.shelf_life_days] ** 2).sum())
# Order quantity
unconstrained_order = max(0, reorder_point - current_stock)
# Vincolo shelf life: non ordinare più di quanto venderemo nella vita utile
# Aggiusta per il rischio di spreco
waste_probability = 1 - stats.norm.cdf(
current_stock + unconstrained_order,
loc=max_sellable_demand,
scale=max_sellable_std
)
# Costo atteso
expected_waste_cost = waste_probability * unconstrained_order * params.waste_cost_per_unit
expected_shortage_cost = (1 - params.service_level) * lt_demand * params.shortage_cost_per_unit
# Ottimizzazione: riduce ordine se costo spreco > beneficio margine
if expected_waste_cost > expected_shortage_cost * 0.5:
adjustment_factor = 1 - (expected_waste_cost / (expected_waste_cost + expected_shortage_cost))
optimal_order = int(unconstrained_order * adjustment_factor)
else:
optimal_order = int(unconstrained_order)
return {
'optimal_order_qty': max(0, optimal_order),
'reorder_point': reorder_point,
'safety_stock': safety_stock,
'expected_demand_7d': demand_forecast[:7].sum(),
'waste_risk_pct': waste_probability * 100,
'shortage_risk_pct': (1 - params.service_level) * 100,
'decision_reason': 'waste_adjusted' if expected_waste_cost > expected_shortage_cost * 0.5
else 'standard_reorder'
}
# Esempio utilizzo
params = InventoryParams(
product_id='insalata_vaschetta_150g',
shelf_life_days=5,
lead_time_days=1,
service_level=0.95,
waste_cost_per_unit=0.85,
shortage_cost_per_unit=3.20
)
# Simulazione con forecast TFT (media e std da quantile forecast)
forecast_mean = np.array([45, 52, 68, 71, 89, 95, 48]) # Lunedi-Domenica
forecast_std = np.array([8, 9, 12, 13, 15, 16, 9]) # Deviazione standard
order = calculate_optimal_order(
demand_forecast=forecast_mean,
demand_std=forecast_std,
current_stock=120,
params=params
)
print('Raccomandazione ordine:')
for key, val in order.items():
print(f' {key}: {val}')
Dynamické stanovení cen pro snížení odpadu: Optimalizace markdown
Dynamické stanovování cen na základě data expirace je jedním z nejúčinnějších nástrojů ke snížení odpad v čerstvých produktech. Cílem je najít optimální diskontní cena což maximalizuje výnosy získané z produktů, kterým hrozí expirace, a urychluje jejich prodej bez kanibalizovat prodej na produkty za plnou cenu.
Bez odpadu, izraelský startup, vyvinul systém založený na elektronických regálech štítky (ESL), které automaticky aktualizují ceny na základě data expirace, úrovně zásoby a chování zákazníků. Maloobchodníci, kteří jej přijali, snížili množství odpadu 40 %a zároveň zvýšit výnosy z těchto produktů vzhledem k situaci předchozí (neprodaný produkt vyhoďte). Také Too Good To Go spustil v 2024 platforma AI, která zpracovává data na úrovni SKU za účelem optimalizace úrovní slev, snížení manuálních kontrol termínů o 93–99 %.
# Algoritmo markdown optimization per prodotti prossimi a scadenza
import numpy as np
from scipy.optimize import minimize_scalar
from dataclasses import dataclass
from typing import Tuple
@dataclass
class MarkdownConfig:
min_margin_pct: float = 0.10 # Margine minimo (non scendere sotto il 10%)
max_discount_pct: float = 0.70 # Sconto massimo (70%)
base_price: float = 1.99 # Prezzo normale
cost_per_unit: float = 0.95 # Costo di acquisto
waste_value: float = 0.00 # Valore residuo se buttato (es. recupero)
def price_elasticity_model(price: float, base_price: float,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Modello di elasticita del prezzo per prodotti freschi GDO.
Elasticita tipica prodotti freschi: -1.5 a -2.5
(sconto del 10% aumenta la domanda del 15-25%)
"""
price_ratio = price / base_price
return base_demand * (price_ratio ** elasticity)
def markdown_revenue(discount_pct: float,
config: MarkdownConfig,
current_stock: int,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Calcola il ricavo atteso da un determinato livello di sconto.
Massimizzare questa funzione = trovare lo sconto ottimale.
"""
discounted_price = config.base_price * (1 - discount_pct)
# Vincolo margine minimo
min_price = config.cost_per_unit * (1 + config.min_margin_pct)
if discounted_price < min_price:
return -1e10 # Penalita per prezzi sotto il costo
# Domanda attesa con il nuovo prezzo
expected_demand = price_elasticity_model(
discounted_price, config.base_price, base_demand, elasticity
)
# Unita vendute = min(domanda attesa, stock disponibile)
units_sold = min(expected_demand, current_stock)
units_wasted = max(0, current_stock - units_sold)
# Revenue = vendite + (eventuali recupero valore sprecato)
revenue = (units_sold * discounted_price) + (units_wasted * config.waste_value)
# Costo opportunità: confronto con vendita a prezzo pieno (se avessimo aspettato)
# ma qui massimizziamo il recupero dato che il prodotto scadera
return revenue
def find_optimal_markdown(config: MarkdownConfig,
current_stock: int,
days_to_expiry: int,
base_daily_demand: float,
demand_elasticity: float = -1.8) -> dict:
"""
Trova lo sconto ottimale considerando i giorni rimanenti a scadenza.
Logica: più vicini alla scadenza, maggiore lo sconto necessario.
"""
# Urgenza basata sui giorni a scadenza
if days_to_expiry >= 5:
# Nessun markdown necessario
return {'discount_pct': 0.0, 'recommended_action': 'no_action',
'expected_revenue': config.base_price * min(base_daily_demand * days_to_expiry, current_stock)}
# Moltiplicatore urgenza: più vicini alla scadenza, maggiore il markup di urgenza
urgency_factor = 1.0 + (5 - days_to_expiry) * 0.15 # +15% urgenza per giorno
# Domanda effettiva considerando urgenza (es. promozioni di fine giornata)
effective_demand = base_daily_demand * days_to_expiry * urgency_factor
# Ottimizzazione: trova lo sconto che massimizza il ricavo totale
result = minimize_scalar(
lambda d: -markdown_revenue(d, config, current_stock,
effective_demand, demand_elasticity),
bounds=(0.0, config.max_discount_pct),
method='bounded'
)
optimal_discount = result.x
optimal_revenue = -result.fun
waste_at_no_discount = max(0, current_stock - effective_demand)
waste_with_markdown = max(0, current_stock - price_elasticity_model(
config.base_price * (1 - optimal_discount),
config.base_price, effective_demand, demand_elasticity
))
waste_reduction_units = waste_at_no_discount - waste_with_markdown
revenue_recovered = optimal_revenue - (config.waste_value * current_stock)
action = (
'urgent_markdown' if days_to_expiry <= 1
else 'markdown' if optimal_discount > 0.15
else 'slight_discount'
)
return {
'discount_pct': round(optimal_discount * 100, 1),
'discounted_price': round(config.base_price * (1 - optimal_discount), 2),
'expected_revenue': round(optimal_revenue, 2),
'waste_reduction_units': round(waste_reduction_units, 0),
'revenue_recovered_vs_waste': round(revenue_recovered, 2),
'days_to_expiry': days_to_expiry,
'recommended_action': action
}
# Esempio: insalata con 2 giorni alla scadenza
config = MarkdownConfig(
base_price=1.99,
cost_per_unit=0.80,
min_margin_pct=0.05, # Riduco margine minimo vicino a scadenza
max_discount_pct=0.60
)
recommendation = find_optimal_markdown(
config=config,
current_stock=45,
days_to_expiry=2,
base_daily_demand=12,
demand_elasticity=-2.0
)
print('Raccomandazione markdown:')
for k, v in recommendation.items():
print(f' {k}: {v}')
# Output atteso:
# discount_pct: 35.0
# discounted_price: 1.29
# expected_revenue: 38.70
# waste_reduction_units: 28
# recommended_action: markdown
Případová studie: Italský velký maloobchod s více než 200 prodejními místy
Pojďme analyzovat skutečný (anonymizovaný) případ implementace prognózy poptávky ML v jednom Italský velký maloobchodní řetězec s 220 prodejními místy, z toho 18 000 aktivních SKU 3 200 čerstvých produktů a roční obrat přibližně 2,8 miliardy eur.
Výchozí situace
Před implementací ML obchodník používal klasický ERP systém s prognózováním na základě vážených klouzavých průměrů za poslední 4 měsíce. Průměrný MAPE na čerstvých produktech byl 28 %, s maximem 45 % u sezónních produktů. Míra plýtvání čerstvými produkty byla 12 % z nakoupené hodnoty s náklady na likvidaci, ztrátou hodnoty a dopadem na pověst významný. Kupující prováděli týdenní ruční kontroly objednávek, obsazení přibližně 3 FTE na plný úvazek.
Architektura řešení
Technologie Stack implementován
| Vrstvy | Volba technologie | Funkce |
|---|---|---|
| Datový sklad | Sněhové vločky | Centralizované úložiště, POS data ze všech 220 obchodů |
| ETL/ELT | dbt + Airbyte | Integrace POS, počasí (OpenWeather API), propagace (CRM systém) |
| Orchestr | Apache Airflow | Denní rekvalifikace 3:00-5:00, upozornění na anomálie |
| Modelky | TFT (top 500 SKU) + XGBoost (long tail) | Předpověď na 7 a 14 dní pro každý obchod SKU x |
| MLOps | MLflow na Databricks | Sledovací experimenty, registr modelů, A/B testování |
| Porce | FastAPI + mezipaměť Redis | Prognóza API s latencí < 100 ms, 24h mezipaměť |
| Motor Markdown | Mikroslužby Pythonu | Optimální výpočet slev každé 4 hodiny u rizikových produktů |
| Integrace ESL | Hanshow / Pricer API | Automatická aktualizace cen na elektronických regálových štítcích |
| Dar | API Food Bank | Automatické upozornění na neprodané produkty s expirací < 24h |
Časová osa a fáze implementace
- 1.–2. měsíc: Datový audit, čištění 3letých historických dat, integrace exogenních zdrojů (počasí, akce). Identifikace 50 velkoobjemových pilotních SKU podléhajících rychlé zkáze.
- 3.–4. měsíc: Školení pilotního modelu (XGBoost + Prophet), ověření vpřed, výchozí MAPE 27,8 %. První integrace s objednávkovým systémem ERP na 3 testovacích prodejnách.
- 5.–6. měsíc: TFT školení o top 500 nových SKU. A/B testování: 30 obchodů s ML vs 30 obchodů s tradičním systémem. Výsledek: 28% snížení odpadu v prodejnách ML.
- 7.–9. měsíc: Zavedení do všech 220 obchodů. Implementace motoru Markdown s integrací ESL na 80 vybavených prodejnách. Automatická aktivace daru prostřednictvím Food Bank.
- Měsíc 10–12: Optimalizace šablony, přidaná funkce sociálních médií (zmínky produkty na Instagramu/TikTok pro nové trendy). Implementace souboru TFT+XGBoost.
Výsledky měřeny za 12 měsíců
Obchodní KPI – výsledky po implementaci (12 měsíců)
| Metrický | Před ML | Po 12 měsících ML | Variace |
|---|---|---|---|
| Čerstvé produkty MAPE | 28,2 % | 9,1 % | -67,7 % |
| Míra odpadu z nakoupené hodnoty | 12,0 % | 7,8 % | -35,0 % |
| Míra vyprodání zásob (OOS) | 4,8 % | 2,9 % | -39,6 % |
| Darované produkty (tuny/rok) | 45 tun | 112 tun | +149 % (zákon o výhodách Gadda) |
| Efektivita markdown (% získaných výnosů) | 32 % | 71 % | +39 str |
| FTE věnované ruční kontrole objednávek | 3,0 FTE | 0,8 FTE | -73,3 % |
| Úspora nákladů na odpad + likvidace | - | 4,2 milionu EUR/rok | Nové ukládání |
| Daňové zvýhodnění darů (Gaddův zákon) | 85 tisíc €/rok | 210 tisíc €/rok | +147 % |
| ROI projektu (investice: 1,8 milionu EUR) | - | 234 % za 12 měsíců | Návratnost 5,2 měsíce |
Poučení z případové studie
- Kvalita historických dat je skutečným úzkým hrdlem: 40 % času Projekt byl věnován čištění a integraci dat. Historická POS data měla výrazné mezery v období dovolených a renovace prodejen.
- Ne všechny SKU si zaslouží stejný model: TFT pro 500 nejlepších SKU, Prophet pro středně pokročilé SKU (500-3000), klouzavý průměr pro long tail (>3000 SKU). Třístupňový přístup s optimálními poměry cena/přínos.
- Řízení změn je zásadní: kupující s 20+ letou zkušeností odolávali AI. Řešení: ukažte rozklad Proroka (trend + sezónnost + dovolená), což kupujícím umožňuje pochopit a ověřit logiku modelu.
- Markdown engine vyžaduje integraci ESL: bez elektronických regálových štítků, ruční aktualizace cen zrušila výhodu rychlosti reakce automatický systém.
- Nezbytné je neustálé sledování: modely časem degradují (posun konceptu) kvůli změnám v nákupním chování po COVID. Rekvalifikace automatické týdenní udržování stabilních výkonů.
Obchodní metriky pro prognózování poptávky ve FoodTech
Úspěch systému prognózování poptávky není měřen pouze MAPE. Je to nutné definovat rámec metrik, který pokrývá jak technickou kvalitu modelu, tak dopad skutečný byznys.
Metrický rámec: technický + obchodní
| Kategorie | Metrický | Vzorec / Definice | Typický cíl distribuce ve velkém měřítku |
|---|---|---|---|
| Model přesnosti | MAPE | Střední absolutní procentní chyba | < 15 % čerstvých produktů |
| RMSE | Root Mean Square Error (v jednotkách) | Záleží na objemu SKU | |
| Zaujatost | (Prognóza - Skutečná) / Skutečná, průměrná | |Zaujatost| < 5 % (žádné systematické nad/pod) | |
| WMAPE | MAPE vážený objemem prodeje | < 10 % (lepší než MAPE pro dlouhý ocas) | |
| Odpad | Míra snížení odpadu (WRR) | (Waste before - Waste after) / Waste before | > 30 % do 12 měsíců |
| Procento odpadu na nákupech | Promarněná hodnota / koupená hodnota | < 8 % (sektorové srovnání) | |
| Přesměrované tuny (dary) | Darované tuny / Celkový potenciální odpad | > 50 % přebytku | |
| Dostupnost | Míra vyprodání zásob (OOS) | % SKU se skladem / celkové SKU | < 3 % |
| Míra plnění | Dodané jednotky / objednané jednotky | > 97 % | |
| Days of Cover (DOC) | Aktuální zásoby / Průměrná denní poptávka | Optimální pro kategorii produktu | |
| Dynamické stanovování cen | Markdown Efficiency (ME) | Markdown výnosy / náklady Potenciální plýtvání | > 65 % |
| Produkty prodávané přes Markdown | % produktů s téměř exspirací prodaných se slevou | > 80 % ohrožených produktů | |
| Uplatněna průměrná sleva | Průměrná sleva na produkty se slevou | 25-45% (optimální pro elasticitu) |
Monitorovací řídicí panel: Detekce posunu konceptu
Modely prognózování potravin jsou obzvláště náchylné koncept drift: nákupní vzorce se mění podle sezóny v důsledku výživových trendů (např. nárůst veganských produktů), kvůli cenové krizi (inflace v letech 2022–2024 výrazně změnila nákupní chování v čerstvé produkty), pro mimořádné události (pandemie). Je nutné průběžně sledovat snížení výkonu.
# Monitoring sistema di rilevamento concept drift per food forecasting
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftAlert:
severity: str # 'WARNING' | 'CRITICAL'
metric: str
current_value: float
baseline_value: float
change_pct: float
affected_skus: List[str]
recommendation: str
class ForecastMonitor:
def __init__(self, baseline_window_days: int = 30,
monitoring_window_days: int = 7):
self.baseline_window = baseline_window_days
self.monitoring_window = monitoring_window_days
self.thresholds = {
'mape_warning': 0.20, # MAPE > 20% = warning
'mape_critical': 0.35, # MAPE > 35% = critical, richiede retraining
'bias_warning': 0.10, # Bias sistematico > 10%
'bias_critical': 0.20, # Bias sistematico > 20%
'waste_rate_warning': 0.10, # Tasso spreco > 10%
}
def compute_current_metrics(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame) -> dict:
"""Calcola metriche sull'ultima finestra di monitoring"""
merged = predictions_df.merge(actuals_df, on=['date', 'sku_id', 'store_id'])
metrics_by_sku = merged.groupby('sku_id').apply(
lambda g: pd.Series({
'mape': np.mean(np.abs((g['actual'] - g['forecast']) / (g['actual'] + 1e-8))),
'bias': np.mean((g['forecast'] - g['actual']) / (g['actual'] + 1e-8)),
'rmse': np.sqrt(np.mean((g['actual'] - g['forecast']) ** 2)),
'n_observations': len(g)
})
)
return {
'mean_mape': metrics_by_sku['mape'].mean(),
'p90_mape': metrics_by_sku['mape'].quantile(0.9),
'mean_bias': metrics_by_sku['bias'].mean(),
'degraded_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_warning']
].index.tolist(),
'critical_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_critical']
].index.tolist(),
'per_sku_metrics': metrics_by_sku
}
def detect_drift(self, baseline_metrics: dict,
current_metrics: dict) -> List[DriftAlert]:
"""Confronta metriche correnti vs baseline e genera alert"""
alerts = []
# Alert MAPE
mape_change = ((current_metrics['mean_mape'] - baseline_metrics['mean_mape'])
/ baseline_metrics['mean_mape'])
if current_metrics['mean_mape'] > self.thresholds['mape_critical']:
alerts.append(DriftAlert(
severity='CRITICAL',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['critical_skus'],
recommendation='RETRAINING IMMEDIATO richiesto. Analizza concept drift.'
))
elif current_metrics['mean_mape'] > self.thresholds['mape_warning']:
alerts.append(DriftAlert(
severity='WARNING',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['degraded_skus'],
recommendation='Monitorare trend. Pianificare retraining prossima settimana.'
))
# Alert Bias sistematico
if abs(current_metrics['mean_bias']) > self.thresholds['bias_critical']:
direction = 'sovrastima' if current_metrics['mean_bias'] > 0 else 'sottostima'
alerts.append(DriftAlert(
severity='CRITICAL',
metric='Bias',
current_value=current_metrics['mean_bias'] * 100,
baseline_value=baseline_metrics.get('mean_bias', 0) * 100,
change_pct=0,
affected_skus=[],
recommendation=f'Bias sistematico di {direction}: aggiorna calibrazione modello.'
))
return alerts
def check_and_alert(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame,
baseline_metrics: dict) -> None:
"""Entry point principale del monitor"""
current = self.compute_current_metrics(predictions_df, actuals_df)
alerts = self.detect_drift(baseline_metrics, current)
for alert in alerts:
if alert.severity == 'CRITICAL':
logger.critical(
f'[DRIFT CRITICAL] {alert.metric}: {alert.current_value:.1f}% '
f'(baseline: {alert.baseline_value:.1f}%) - {alert.recommendation}'
)
# Qui si trigghera notifica Slack/PagerDuty
self._send_alert(alert)
else:
logger.warning(
f'[DRIFT WARNING] {alert.metric}: {alert.current_value:.1f}% - '
f'{len(alert.affected_skus)} SKU degradati'
)
def _send_alert(self, alert: DriftAlert) -> None:
"""Invia notifica al team ML Ops (implementazione dipende dal sistema)"""
# Integrazione con: Slack, PagerDuty, email, Prometheus Alertmanager
pass
Osvědčené postupy a anti-vzory v prognózování poptávky po potravinách
Anti-vzory, kterým je třeba se vyhnout
- Datum dočasného úniku: místo toho použijte standardní křížovou validaci dopředná validace zavádí budoucí data do tréninku. A nejčastější metodická chyba a vede k optimistickým MAPE o 30-50 %, které se při výrobě ukážou jako nepravdivé.
- Jediná šablona pro všechny SKU: produkty s přerušovanou poptávkou (prodej 2-3krát týdně) vyžadují různé modely z velkoobjemových produktů. Použít LSTM na produkt s 200 prodeji/rok vede k přemontování a MAPE o 60 %+.
- Ignorujte obchodní zaujatost: maloobchodníci historicky inklinovali k nadměrnému objednávání vyhnout se zásobám (nejhorší z vnímaných „dvou zel“). Pokud je model trénovaný na historických datech objednávek (nikoli skutečné poptávce), zdědí toto zkreslení. Použijte údaje o prodeji efektivní, nikoli příkazů.
- Nemodelujte budoucí propagační akce: prognózování, které nezná propagační akce plánovaná na následující týden poskytuje systematicky nesprávné předpovědi v obdobích propagační. Vždy integrujte propagační kalendář jako vstup.
- Optimalizujte pouze MAPE bez ohledu na směr chyby: v kontextu jídlo, chyba nadměrného odhadu 50 jednotek (50 jednotek plýtvání) je mnohem dražší chyba pod prognózou 50 jednotek (50 ztracených prodejů). Použijte asymetrické metriky (Weighted MAPE) nebo funkce asymetrické ztráty v tréninku.
Konsolidované osvědčené postupy
- Stratifikujte SKU ošetřením: klasifikuje každou SKU podle objemu, variability a trvanlivost. Aplikujte různé modely na různé clustery. Hierarchický přístup (globální + místní) často překonává oba čisté modely.
- Vždy obsahuje odhad nejistoty: přesná předpověď (jedno číslo) a méně užitečné než pravděpodobnostní předpověď (rozložení nebo intervaly). TFT a Prophet poskytují nativně kvantily, které jsou zásadní pro výpočet optimální bezpečnostní zásoby.
- Automatizujte rekvalifikaci, ale sledujte anomálie: denní rekvalifikace a užitečné, ale může zesílit chyby, pokud byla včerejší data anomální (např. selhání POS). Před uvedením nového modelu do výroby vždy proveďte kontrolu zdravého rozumu.
- Budujte důvěru u kupujících: Systémy ML často selhávají kvůli odporu organizační, nikoli pro technickou kvalitu. Ukažte rozklad modelu, vysvětlete předpovědi v obchodním jazyce, ponechává kupujícím možnost přepsat a zpětnovazební smyčka, která zlepšuje model.
- Průběžné měření ROI: vypočítat úspory odpadu na týdenní bázi vyhnout se, zotavení ze snížení, daňové výhody Gadda Law. Zviditelnit hodnotu vytvořené a zásadní pro udržení organizačního závazku v průběhu času.
Závěry: Prognózování jako nástroj udržitelnosti
Prognóza poptávky se strojovým učením v potravinářském sektoru je jedním z případů použití AI nejlepší poměr dopad/složitost, jaký je dnes k dispozici. Problém je dobře definován, data existují (každý prodejce má letitou historii POS), metriky úspěchu jsou jasné (MAPE, míra odpadu, návratnost investic) a dopad přesahuje rámec podnikání: snížit plýtvání potravinami a ekonomický, regulační a environmentální imperativ.
Navrhovaná cesta pro italský rozsáhlý maloobchod, který chce začít v roce 2025, je pragmatická: začít s Prophet nebo XGBoost na podmnožině 50 velkoobjemových, rychle se kazících SKU, změřte delta MAPE a míra snížení odpadu po 30 dnech, sestavení interního obchodního případu, a poté postupně škálovat směrem k TFT pro špičkové SKU a směrem k integraci markdown ESL motory a systémy.
Konečným cílem není mít co nejpřesnější model, ale rozhodovací ekosystém která mění předpovědi v činy: optimální objednávky, adaptivní ceny, dary automatické. V tomto ekosystému je strojové učení a infrastruktura umožňující, ale snížení odpadu a konkrétní obchodní výsledek, který ospravedlní každou investici.
Pokračujte v sérii FoodTech
V dalších článcích série zkoumáme doplňkové technologie pro předpovídání poptávky:
- Článek 9 – Satelitní API a přesné zemědělství: jako satelitní data NDVI a Sentinel-2 krmí modely předpovědi zemědělských výnosů a uzavírají smyčku od výroby až po distribuci.
- Článek 10 – ML Edge pro monitorování plodin: vložený závěr na IoT zařízení na poli pro sledování stavu plodin v reálném čase, s integrace do potrubí dodavatelského řetězce.
Zjistěte více o technologiích MLOps, které umožňují produkční nasazení prognostických modelů, nahlédněte také do řady MLOps for Business: Modely umělé inteligence ve výrobě s MLflow.







