Prognoza cererii pentru reducerea deșeurilor: ML și serii cronologice în FoodTech
În fiecare an, lumea produce cca 1,05 miliarde de tone de alimente risipite, conform Raportul UNEP privind indicele deșeurilor alimentare 2024. Dintre aceștia, 60% provin din familii, 28% din restaurante și 12% din distribuția pe scară largă. Tradus în termeni economici: dincolo 1 trilion de dolari ars în fiecare an, cu un impact asupra mediului care generează 10% din emisiile globale de gaze sera, de aproape cinci ori mai mult decât întreaga aviație mondială.
Cu toate acestea, în mod paradoxal, în timp ce un miliard de porții de alimente sunt irosite în fiecare zi, 783 milioane de oameni suferă de foame. Rădăcina problemei nu este doar culturală sau logistică: and basically a problem of prognoza cererii. Comenzi comerciale cu amănuntul la scară largă prea multe Nu riscați epuizarea stocurilor. Furnizorii produc în exces pentru a fi în siguranță. Rezultatul este că fiecare link al lanțului trofic acumulează tampoane care devin deșeuri.
În Italia, Legea Gadda (n. 166/2016) a introdus stimulente fiscale pentru donație a surplusurilor alimentare și a procedurilor simplificate de recuperare. La nivel european, strategia De la fermă la furculiță își propune să reducă la jumătate risipa alimentară până în 2030, cu obiective obligatorii pentru comercianții cu amănuntul și industria alimentară. Reglementarea creează urgență, dar Machine Learning dă rezultate instrumentele concrete pentru atingerea acestor obiective.
Prognoza cererii cu ML nu este o teorie: comercianții cu amănuntul care au implementat modelele LSTM și Temporal Fusion Transformatoarele raportează reduceri ale MAPE de la 28% tipice metodelor tradiționale la 5-15%, rezultând în Reducerea deșeurilor cu 25-40% și rentabilitatea investiției măsurabilă în 12 luni. În acest articol vom construi o conductă completă, de la date brute până la modelul de producție, analizând fiecare alegere arhitecturală cu cod Python funcțional.
Ce veți învăța în acest articol
- Cadrul economic și de reglementare pentru risipa alimentară în 2025
- Modele statistice clasice: ARIMA, SARIMA, Holt-Winters și Prophet cu cod Python
- Deep Learning pentru serii temporale: LSTM, GRU, Transformator de fuziune temporală și N-BEATS
- Inginerie avansată a caracteristicilor: variabile exogene, codificare ciclică, caracteristici de lag
- Conductă ML completă de la capăt la capăt cu validare directă
- Benchmark comparativ pe setul de date reale: MAPE, RMSE, MAE, timpul de antrenament
- Integrarea prognozelor cu gestionarea stocurilor și stabilirea prețurilor dinamice
- Algoritm de optimizare Markdown pentru produse aproape de expirare
- Studiu de caz în comerțul cu amănuntul italian: peste 200 de puncte de vânzare, reducerea deșeurilor cu 35%
- Măsuri de afaceri: Rata de reducere a deșeurilor, Acuratețea prognozei, Eficiența reducerii
Seria FoodTech: 10 articole despre IA și tehnologie în industria alimentară
| # | Articol | Concentrează-te |
|---|---|---|
| 1 | Introducere în FoodTech | Privire de ansamblu asupra ecosistemului și tehnologii cheie |
| 2 | IoT și senzori în lanțul alimentar | Conductă de date de la câmp la nor |
| 3 | Viziune computerizată pentru calitate și inspecție | Clasificarea defectelor și gradarea automată |
| 4 | Blockchain și trasabilitate | Siguranța alimentară de la fermă la furculiță |
| 5 | Agricultura verticală și AgriTech | Cultivare controlată cu ML |
| 6 | Optimizarea lanțului de aprovizionare | Logistica, lanțul de frig și transparență |
| 7 | Tabloul de bord pentru managementul fermei | Monitorizarea în timp real a companiilor agricole |
| 8 | Sunteți aici - Prognoza cererii și reducerea deșeurilor | Serii cronologice ML, LSTM, TFT, prețuri dinamice |
| 9 | Satelit API și Precision Agriculture | NDVI, teledetecție, monitorizarea culturilor |
| 10 | ML Edge pentru monitorizarea culturilor | Inferență încorporată pe dispozitive de teren |
Problema risipei alimentare: date și reglementări 2025
Pentru a înțelege de ce prognoza cererii cu ML a devenit o prioritate strategică în FoodTech, trebuie să pleci de la numerele reale. The Raportul UNEP privind indicele deșeurilor alimentare 2024 dezvăluie că în 2022 (anul trecut cu date complete) au fost irosite 1,05 miliarde de tone de alimente, egal cu 132 kg de persoană pe an și aproape o cincime din totalul alimentelor disponibile consumatorilor.
Impactul global al risipei alimentare (UNEP 2024)
| Indicator | Valoare | Sursă |
|---|---|---|
| Risip alimentar anual | 1,05 miliarde de tone | Indicele deșeurilor alimentare UNEP 2024 |
| Valoarea economică pierdută | ~1 trilion USD/an | Estimare FAO/Banca Mondială |
| % emisii de GES | 8-10% global | UNEP 2024 |
| Deșeuri pe cap de locuitor | 132 kg/persoana/an | UNEP 2024 |
| Ponderea comerțului cu amănuntul în comerțul cu amănuntul la scară largă | 12% din totalul risipit | UNEP 2024 |
| Cota familie | 60% din total | UNEP 2024 |
| Cota de servicii alimentare | 28% din total | UNEP 2024 |
| Pierderi înainte de vânzarea cu amănuntul (lanțul de aprovizionare) | 13% din alimentele produse | FAO |
Cadrul de reglementare: de la Legea Gadda la Farm to Fork
Italia a fost un pionier în Europa cu Legea 166/2016 (Legea Gadda), care are a introdus o abordare plină de satisfacții, mai degrabă decât punitivă, a risipei alimentare. Punctele cheie sunt:
- Stimulente fiscale pentru cei care donează surplusuri alimentare: echivalent cu distrugere pentru calculul IRPEF/IRES
- Simplificarea birocratică: eliminarea taxelor documentare pentru donații până la la 15.000 euro/an
- Promovarea „doggy bag” in restaurante si vanzarea produselor in apropiere termen limită cu reduceri transparente
- Educația alimentară în şcoli ca măsură structurală de prevenire
La nivel european, Strategia de la fermă la furculiță (parte din Green Deal) stabilește ținte obligatoriu: reducerea cu 50% a deșeurilor la nivel de vânzare cu amănuntul și de consum până în 2030, cu măsuri obligatoriu pentru comerțul cu amănuntul la scară largă care va intra treptat în vigoare începând cu 2025-2026. Pentru i retailer cu peste 250 de angajați, raportarea anuală privind risipa alimentară devine obligatorie.
Impactul reglementărilor asupra strategiilor corporative
Combinația de stimulente (Legea Gadda), obligații de raportare (Fermă la furculiță) și presiunea din partea consumatorii creează un caz de afaceri clar pentru a investi în prognoza cererii cu ML. Nu da totul ține de sustenabilitate: pentru un retailer cu 200 de magazine și marje de 2-3%, reduceți 35% deșeuri echivalează cu recuperarea a 50-80 de puncte de bază din marja operațională.
De ce este dificilă prognoza cererii de alimente
Prognoza cererii în industria alimentară prezintă provocări unice care o fac una dintre cele probleme mai complexe în învățarea automată aplicată afacerilor. Caracteristicile structurale sunt cinci care îl diferențiază de alte sectoare de retail.
1. Perisabilitate și fereastră de vânzare îngustă
Un articol de îmbrăcăminte nevândut săptămâna aceasta poate fi vândut săptămâna viitoare. O salată proaspat nr. Produsele alimentare proaspete au termen de valabilitate de la 1 la 14 zile, care înseamnă că excesul de eroare de predicție se traduce direct în deșeuri fizice irecuperabile. Această asimetrie a costurilor de eroare (costul supraprognozării depășește adesea costul sub-prognoză) necesită modele care minimizează în mod specific eroarea ascendentă.
2. Sezonalitate pe mai multe niveluri
Serii de timp alimentare prezintă sezonalitate simultană la mai multe frecvențe:
- Sezonalitate zilnică: vânzări de vineri seara diferite de marți dimineața
- Sezonalitate săptămânală: modele de cumpărare diferite pentru fiecare zi a săptămânii
- Sezonalitate lunară: Efecte de început/sfârșit de lună legate de ciclurile salariale
- Sezonalitate anuală: vara vs iarna pentru produse de sezon
- Sezonul sărbătorilor: Crăciun, Paște, mijlocul lunii august cu vârfuri bruște
Modelele tradiționale ARIMA gestionează o singură componentă sezonieră. Modele moderne ca Prophet și TFT gestionează mai multe sezoane nativ.
3. Variabile exogene non-staționare
Cererea de alimente este puternic influențată de factori externi care nu urmează modele regulate: conditiile meteo (o saptamana de ploaie creste vanzarile de supa cu 40%), campanii promotionale (un fluturaș promoțional poate tripla temporar cererea), evenimente locale (meciuri de fotbal, târguri), prețurile concurenților și tendințele rețelelor sociale. Încorporați aceste variabile exogene în mod corespunzător eficient și diferența dintre un model mediocru și un model excelent.
4. Coada lungă a SKU-urilor cu date slabe
Un comerț cu amănuntul obișnuit la scară largă gestionează 15.000-30.000 de SKU-uri active. 20% din SKU generează 80% din vânzări, dar restul de 80% au serii temporale rare, intermitente, cu multe zerouri. Pentru acestea modelele standard ale produselor eșuează și sunt necesare abordări specializate, cum ar fi metoda Croston, modele zero sau transferați învățarea de la SKU-uri similare.
5. Erori tipice fără ML
Il MAPE tipic (Eroare procentuală medie absolută). în sistemele tradiţionale de prognoză bazate pe medii mobile, reguli manuale sau sisteme ERP fără ML oscilează între 20% și 40% pentru produse proaspete. Introducerea modelelor ML reduce această eroare la 5-15%, cu LSTM care demonstrează MAPE de 16,43% față de 28,76% din metodele tradiționale în benchmark-uri recente, o reducere de 43%.
Modele statistice clasice: ARIMA, SARIMA, Holt-Winters și Prophet
Înainte de a trece la deep learning, este esențial să înțelegem modelele statistice clasice. Nu pentru că sunt învechite, dar pentru că reprezintă adesea linia de bază de batut, sunt interpretabile, ușoare din punct de vedere computațional și în unele contexte (date puține, produse simple) competitive cu abordări mai complexe.
ARIMA și SARIMA: Fundamentele previziunii
ARIMA (AutoRegressive Integrated Moving Average) este cel mai utilizat model statistic pe serie scale de timp univariate. Modelul ARIMA(p,d,q) combină trei componente: AutoRegresie (p lags del valoarea trecută), integrare (diferențe pentru a face seria staționară) și medie mobilă (q decalaje ale erorilor reziduale). SARIMA adaugă o componentă sezonieră (P,D,Q).
# SARIMA per forecasting vendite settimanali - prodotto fresco
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_percentage_error
# Caricamento dati di esempio
# Formato: date index, colonna 'sales' con unita vendute
df = pd.read_csv('vendite_insalata.csv', index_col='date', parse_dates=True)
df = df.asfreq('D') # Frequenza giornaliera
df['sales'] = df['sales'].fillna(0)
# Test di stazionarieta (ADF Test)
result = adfuller(df['sales'].dropna())
print(f'ADF Statistic: {result[0]:.4f}')
print(f'p-value: {result[1]:.4f}')
print(f'La serie e {"stazionaria" if result[1] < 0.05 else "non stazionaria"}')
# Split train/test (80/20 con holdout degli ultimi 30 giorni)
train_size = len(df) - 30
train = df.iloc[:train_size]
test = df.iloc[train_size:]
# Modello SARIMA(1,1,1)(1,1,1)7 - stagionalita settimanale
# p=1, d=1, q=1: componenti ARIMA
# P=1, D=1, Q=1, s=7: componenti stagionali settimanali
model = SARIMAX(
train['sales'],
order=(1, 1, 1),
seasonal_order=(1, 1, 1, 7), # s=7 per stagionalita settimanale
enforce_stationarity=False,
enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())
# Previsione sullo stesso periodo del test set
forecast = results.forecast(steps=len(test))
forecast = np.maximum(forecast, 0) # Nessun valore negativo
# Metriche
mape = mean_absolute_percentage_error(test['sales'], forecast) * 100
rmse = np.sqrt(np.mean((test['sales'].values - forecast.values) ** 2))
mae = np.mean(np.abs(test['sales'].values - forecast.values))
print(f'\nMetriche SARIMA:')
print(f' MAPE: {mape:.2f}%')
print(f' RMSE: {rmse:.2f} unita')
print(f' MAE: {mae:.2f} unita')
# Visualizzazione
plt.figure(figsize=(12, 5))
plt.plot(train['sales'][-60:], label='Train (ultimi 60gg)')
plt.plot(test['sales'], label='Reale', color='green')
plt.plot(test.index, forecast, label='Previsione SARIMA', color='red', linestyle='--')
plt.title('SARIMA - Previsione Vendite Prodotto Fresco')
plt.legend()
plt.tight_layout()
plt.savefig('sarima_forecast.png', dpi=150)
plt.show()
Profet: Prognoza interpretabilă cu variabile exogene
Profet (Meta/Facebook, 2017) și o alegere excelentă pentru prognoza alimentară deoarece gestionează nativ: sezoane multiple (zilnic, săptămânal, anual), vacanță efecte cu calendar personalizabil, tendințe neliniare cu puncte de schimbare automate și variabile exogene (regresori suplimentari). Interpretabilitatea sa (descompunere automată în tendință + sezonalitate + vacanță) îl face apreciat chiar și de utilizatorii netehnici.
# Prophet per forecasting con variabili esogene (meteo, promozioni)
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
import pandas as pd
import numpy as np
# Prophet richiede colonne 'ds' (datetime) e 'y' (target)
df_prophet = df.reset_index().rename(columns={'date': 'ds', 'sales': 'y'})
# Aggiunta variabili esogene (regressori)
# Esempio: temperatura media giornaliera e flag promozionale
df_prophet['temperature'] = meteo_df['temp_media'] # gradi Celsius
df_prophet['is_promotion'] = promo_df['active_promo'].astype(int)
# Calendario festivo italiano (personalizzato)
holidays_it = pd.DataFrame({
'holiday': [
'Natale', 'Capodanno', 'Pasqua', 'Ferragosto',
'Festa Repubblica', 'Ognissanti'
],
'ds': pd.to_datetime([
'2024-12-25', '2025-01-01', '2025-04-20', '2025-08-15',
'2025-06-02', '2025-11-01'
]),
'lower_window': [-3, -1, -3, -5, -1, -1], # Giorni prima
'upper_window': [3, 2, 2, 2, 1, 1] # Giorni dopo
})
# Configurazione modello
model = Prophet(
holidays=holidays_it,
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=False,
seasonality_mode='multiplicative', # Meglio per dati con forte trend
changepoint_prior_scale=0.05, # Flessibilità del trend
seasonality_prior_scale=10.0 # Forza delle componenti stagionali
)
# Aggiunta regressori
model.add_regressor('temperature', standardize=True)
model.add_regressor('is_promotion', standardize=False)
# Training
train_prophet = df_prophet.iloc[:train_size]
model.fit(train_prophet)
# Previsione (con valori futuri dei regressori)
future = model.make_future_dataframe(periods=30)
future['temperature'] = future_meteo_df['temp_media']
future['is_promotion'] = future_promo_df['active_promo'].astype(int)
forecast_prophet = model.predict(future)
# Cross-validation interna (walk-forward)
df_cv = cross_validation(
model,
initial='365 days', # Periodo di training iniziale
period='30 days', # Frequenza di re-fitting
horizon='14 days' # Orizzonte di previsione
)
metrics_cv = performance_metrics(df_cv)
print(f'Prophet MAPE (CV): {metrics_cv["mape"].mean()*100:.2f}%')
# Visualizzazione decomposizione
fig = model.plot_components(forecast_prophet)
fig.savefig('prophet_components.png', dpi=150)
print('Componenti Prophet salvate: trend + stagionalita + holiday effects')
Când să folosiți modelele statistice clasice
| Model | Puncte forte | Limitări | Caz de utilizare ideal |
|---|---|---|---|
| ARIMA/SARIMA | Serii interpretabile, rapide, staţionare | O singură sezonalitate, fără variabile exogene | Produse stabile, date puține |
| Holt-Winters | Gestionează tendințele + sezonalitate, simplitate | Nu se ocupă de valori aberante, de sezonalitate fixă | Serii cu tendință liniară și sezonalitate stabilă |
| Profet | Multisezonalitate, vacanță, regresori | Nu este ideal pentru seriale foarte neregulate | Produse cu un puternic efect festiv/promoțional |
| LSTM/GRU | Modele complexe, multivariate, de lungă durată | Sunt necesare date abundente, cutie neagră | SKU-uri de volum mare, multe variabile exogene |
| TFT | Interpretabil + DL, multi-orizont, atenție | Greu din punct de vedere computațional, este necesar un GPU | Prognoza centralizată pentru multe SKU-uri |
Deep Learning pentru Time-Series: LSTM, GRU, TFT și N-BEATS
Modelele de învățare profundă au revoluționat prognoza cererii începând din 2018-2020. Superioritatea lor reiese mai ales în prezența: multor variabile exogene corelate, modele dependențe complexe neliniare, pe termen lung și volume mari de date multi-SKU care vă permit să exploatați transferul de învățare între produse similare.
LSTM și GRU: Memorie selectivă în secvențe
Le Memoria pe termen lung pe termen scurt (LSTM) iar cel Unități recurente închise (GRU) sunt rețele recurente concepute pentru a captura dependențe pe distanță lungă în secvențe temporale. LSTM folosește trei porți (intrare, uitare, ieșire) pentru a decide ce informații să păstreze sau să renunțe în memoria celulară. GRU simplifică cu două porți (resetare, actualizare) obținând performanțe similare cu mai puțini parametri.
# LSTM multi-variate per demand forecasting alimentare
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_percentage_error
# ---- Preparazione dati ----
class FoodDemandDataset(torch.utils.data.Dataset):
def __init__(self, data, seq_len=14, pred_len=7):
self.seq_len = seq_len
self.pred_len = pred_len
self.data = torch.FloatTensor(data)
def __len__(self):
return len(self.data) - self.seq_len - self.pred_len + 1
def __getitem__(self, idx):
x = self.data[idx: idx + self.seq_len]
y = self.data[idx + self.seq_len: idx + self.seq_len + self.pred_len, 0]
return x, y # x: (seq_len, features), y: (pred_len,)
# ---- Architettura LSTM ----
class LSTMForecaster(nn.Module):
def __init__(self, input_size, hidden_size=128, num_layers=2,
pred_len=7, dropout=0.2):
super(LSTMForecaster, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.pred_len = pred_len
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout
)
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, pred_len)
def forward(self, x):
# x shape: (batch, seq_len, input_size)
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
out, _ = self.lstm(x, (h0, c0))
out = self.dropout(out[:, -1, :]) # Ultimo time step
out = self.fc(out) # (batch, pred_len)
return out
# ---- Feature preparation ----
def prepare_features(df):
"""Crea feature matrix multi-variate per LSTM"""
features = pd.DataFrame()
features['sales'] = df['sales']
# Lag features: vendite passate come input esplicito
for lag in [1, 2, 3, 7, 14]:
features[f'sales_lag_{lag}'] = df['sales'].shift(lag)
# Rolling statistics
features['sales_rolling_mean_7d'] = df['sales'].rolling(7).mean()
features['sales_rolling_std_7d'] = df['sales'].rolling(7).std()
features['sales_rolling_mean_14d'] = df['sales'].rolling(14).mean()
# Feature temporali cicliche (encoding sinusoidale)
features['day_sin'] = np.sin(2 * np.pi * df.index.dayofweek / 7)
features['day_cos'] = np.cos(2 * np.pi * df.index.dayofweek / 7)
features['month_sin'] = np.sin(2 * np.pi * df.index.month / 12)
features['month_cos'] = np.cos(2 * np.pi * df.index.month / 12)
# Variabili esogene (meteo, promozioni)
features['temperature'] = df['temperature']
features['is_weekend'] = (df.index.dayofweek >= 5).astype(int)
features['is_holiday'] = df['is_holiday'].astype(int)
features['is_promotion'] = df['is_promotion'].astype(int)
features = features.dropna()
return features
# ---- Training loop ----
def train_lstm_model(train_data, val_data, config):
model = LSTMForecaster(
input_size=config['input_size'],
hidden_size=config['hidden_size'],
num_layers=config['num_layers'],
pred_len=config['pred_len']
)
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=5, factor=0.5
)
criterion = nn.HuberLoss(delta=1.0) # Robusto agli outlier
train_loader = torch.utils.data.DataLoader(
FoodDemandDataset(train_data, config['seq_len'], config['pred_len']),
batch_size=config['batch_size'],
shuffle=True
)
best_val_loss = float('inf')
patience_counter = 0
for epoch in range(config['epochs']):
model.train()
train_loss = 0
for x_batch, y_batch in train_loader:
optimizer.zero_grad()
y_pred = model(x_batch)
loss = criterion(y_pred, y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
train_loss += loss.item()
# Validazione
model.eval()
with torch.no_grad():
val_dataset = FoodDemandDataset(val_data, config['seq_len'], config['pred_len'])
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32)
val_loss = sum(criterion(model(x), y).item() for x, y in val_loader)
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_lstm_model.pt')
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= config['early_stopping_patience']:
print(f'Early stopping all\'epoca {epoch}')
break
if epoch % 10 == 0:
print(f'Epoch {epoch}: Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}')
# Carica modello migliore
model.load_state_dict(torch.load('best_lstm_model.pt'))
return model
# Configurazione
config = {
'input_size': 15, # Numero di feature (sales + lag + temporal + exog)
'hidden_size': 128,
'num_layers': 2,
'pred_len': 7, # Previsione a 7 giorni
'seq_len': 14, # Lookback di 14 giorni
'batch_size': 32,
'lr': 0.001,
'epochs': 100,
'early_stopping_patience': 15
}
Transformator de fuziune temporală: interpretabilitate + putere
Il Transformator de fuziune temporală (TFT) de Google DeepMind (2021) și în prezent considerat stadiul tehnicii pentru prognoza cererii întreprinderilor. Arhitectura sa se combină mecanisme de atenție cu mai multe capete cu porți reziduale (GRN) care gestionează ambele variabile statice (tip de produs, categorie de produs) și variabile temporale cunoscute dinainte (calendar, promoții planificate) și observate (vânzări anterioare, vreme).
În 2024, evidențierea setului de date M5 (30.490 de serii temporale din 10 magazine Walmart), TFT și modelele MASE-urile bazate pe transformatoare au demonstrat îmbunătățiri 26-29% si reducerea al WQL al 34% comparativ cu metoda naiva sezonieră. Picnic, comerciantul de produse alimentare Olandeza online, a adoptat TFT ca model principal pentru prognoza cererii, publicare rezultate detaliate în producție.
# Temporal Fusion Transformer con PyTorch Forecasting
# pip install pytorch-forecasting pytorch-lightning
from pytorch_forecasting import TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import QuantileLoss
import pytorch_lightning as pl
import pandas as pd
import torch
# ---- Preparazione dati nel formato TFT ----
def prepare_tft_dataset(df):
"""
df deve avere: date, store_id, product_id, sales,
temperature, is_holiday, is_promotion, price, category
"""
df = df.copy()
df['time_idx'] = (df['date'] - df['date'].min()).dt.days
df['month'] = df['date'].dt.month.astype(str)
df['day_of_week'] = df['date'].dt.dayofweek.astype(str)
max_prediction_length = 7 # Previsione 7 giorni
max_encoder_length = 28 # Lookback 28 giorni
training_cutoff = df['time_idx'].max() - max_prediction_length
training_dataset = TimeSeriesDataSet(
df[df.time_idx <= training_cutoff],
time_idx='time_idx',
target='sales',
group_ids=['store_id', 'product_id'],
# Variabili statiche (non cambiano nel tempo per ogni gruppo)
static_categoricals=['store_id', 'product_id', 'category'],
# Variabili temporali note in anticipo
time_varying_known_categoricals=['month', 'day_of_week', 'is_holiday'],
time_varying_known_reals=['time_idx', 'is_promotion', 'price'],
# Variabili osservate (disponibili solo per il passato)
time_varying_unknown_reals=['sales', 'temperature'],
min_encoder_length=max_encoder_length // 2,
max_encoder_length=max_encoder_length,
min_prediction_length=1,
max_prediction_length=max_prediction_length,
target_normalizer=GroupNormalizer(
groups=['store_id', 'product_id'],
transformation='softplus' # Mantiene valori positivi
),
add_relative_time_idx=True,
add_target_scales=True,
add_encoder_length=True,
)
return training_dataset, training_cutoff
# ---- Modello TFT ----
def build_tft_model(training_dataset):
tft = TemporalFusionTransformer.from_dataset(
training_dataset,
learning_rate=0.03,
hidden_size=64, # Dimensione hidden layer
attention_head_size=4, # Numero attention heads
dropout=0.1,
hidden_continuous_size=16, # Feature continue
loss=QuantileLoss(), # Previsione probabilistica
log_interval=10,
reduce_on_plateau_patience=4,
)
print(f'Numero parametri: {tft.size()/1e3:.1f}k')
return tft
# ---- Training con PyTorch Lightning ----
def train_tft(training_dataset, validation_dataset):
train_loader = training_dataset.to_dataloader(
train=True, batch_size=128, num_workers=4
)
val_loader = validation_dataset.to_dataloader(
train=False, batch_size=128, num_workers=4
)
trainer = pl.Trainer(
max_epochs=30,
accelerator='gpu' if torch.cuda.is_available() else 'cpu',
gradient_clip_val=0.1,
callbacks=[
pl.callbacks.EarlyStopping(
monitor='val_loss', patience=5, mode='min'
),
pl.callbacks.ModelCheckpoint(
monitor='val_loss', save_top_k=1
)
]
)
tft_model = build_tft_model(training_dataset)
trainer.fit(tft_model, train_loader, val_loader)
# Interpretabilita: variable importance
best_model = TemporalFusionTransformer.load_from_checkpoint(
trainer.checkpoint_callback.best_model_path
)
raw_predictions, x = best_model.predict(
val_loader, mode='raw', return_x=True
)
# Feature importance - chi contribuisce di più alle previsioni
interpretation = best_model.interpret_output(
raw_predictions, reduction='sum'
)
print('Feature importance:')
for key, vals in interpretation.items():
print(f' {key}: {vals}')
return best_model
N-BEATS: Extinderea bazei neuronale fără arhitecturi recurente
N-BEATS (Element AI, 2020) și o abordare radical diferită: utilizați numai straturi complet conectat organizat în stive și blocuri, fără circumvoluții sau mecanisme de atenție. Fiecare bloc descompune semnalul în expansiune de bază (tendință, sezonalitate) și reziduuri. Pe M4 Setul de date de concurență a depășit toate metodele statistice cu 11% și, de asemenea, hibridul neural-statistic câștigător cu 3%. Pentru produsele alimentare cu o puternică sezonalitate, versiunea interpretabilă a N-BEATS produce descompuneri de tendințe/sezoniere pe care analiștii de afaceri le apreciază.
Inginerie caracteristică pentru prognoza cererii de alimente
Calitatea ingineriei caracteristicilor este adesea cel mai critic factor în performanța modelului. Un LSTM cu o inginerie excelentă a caracteristicilor depășește un TFT cu o inginerie mediocră a caracteristicilor. Să vedem principalele categorii de caracteristici care trebuie construite pentru contextul alimentar.
Codificare ciclică pentru variabile de timp
O greșeală comună este tratarea zilei săptămânii ca o valoare întreagă (0-6). Problema este că modelul nu înțelege că ziua 6 (duminică) este „aproape” de ziua 0 (luni). Codificare ciclic cu sinus și cosinus rezolvă această problemă.
# Feature engineering completo per food demand forecasting
import pandas as pd
import numpy as np
from typing import List, Dict
def create_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
"""Crea feature temporali cicliche e categoriche"""
df = df.copy()
idx = df.index
# Encoding ciclico - preserva la ciclicita (es. lunedi vicino a domenica)
df['hour_sin'] = np.sin(2 * np.pi * idx.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * idx.hour / 24)
df['dow_sin'] = np.sin(2 * np.pi * idx.dayofweek / 7)
df['dow_cos'] = np.cos(2 * np.pi * idx.dayofweek / 7)
df['month_sin'] = np.sin(2 * np.pi * idx.month / 12)
df['month_cos'] = np.cos(2 * np.pi * idx.month / 12)
df['doy_sin'] = np.sin(2 * np.pi * idx.dayofyear / 365.25)
df['doy_cos'] = np.cos(2 * np.pi * idx.dayofyear / 365.25)
# Feature binarie utili per GDO italiana
df['is_weekend'] = (idx.dayofweek >= 5).astype(int)
df['is_monday'] = (idx.dayofweek == 0).astype(int)
df['is_friday'] = (idx.dayofweek == 4).astype(int)
# Posizione nel mese (utile per effetti stipendio)
df['day_of_month'] = idx.day
df['is_month_start'] = (idx.day <= 5).astype(int)
df['is_month_end'] = (idx.day >= 25).astype(int)
# Settimana dell'anno
df['week_of_year'] = idx.isocalendar().week.astype(int)
df['quarter'] = idx.quarter
return df
def create_lag_features(df: pd.DataFrame, target_col: str,
lags: List[int] = None) -> pd.DataFrame:
"""Crea lag features del target"""
if lags is None:
lags = [1, 2, 3, 7, 14, 21, 28]
df = df.copy()
for lag in lags:
df[f'{target_col}_lag_{lag}d'] = df[target_col].shift(lag)
return df
def create_rolling_features(df: pd.DataFrame, target_col: str,
windows: List[int] = None) -> pd.DataFrame:
"""Crea rolling statistics"""
if windows is None:
windows = [3, 7, 14, 28]
df = df.copy()
for window in windows:
df[f'{target_col}_roll_mean_{window}d'] = (
df[target_col].shift(1).rolling(window).mean()
)
df[f'{target_col}_roll_std_{window}d'] = (
df[target_col].shift(1).rolling(window).std()
)
df[f'{target_col}_roll_min_{window}d'] = (
df[target_col].shift(1).rolling(window).min()
)
df[f'{target_col}_roll_max_{window}d'] = (
df[target_col].shift(1).rolling(window).max()
)
# Exponential moving average
df[f'{target_col}_ema_{window}d'] = (
df[target_col].shift(1).ewm(span=window).mean()
)
return df
def create_weather_features(df: pd.DataFrame,
meteo_df: pd.DataFrame) -> pd.DataFrame:
"""
Integra variabili meteorologiche
meteo_df: DataFrame con colonne [date, temp_max, temp_min, precipitazione_mm,
umidita, radiazione_solare, vento_kmh]
"""
df = df.merge(meteo_df, left_index=True, right_on='date', how='left')
# Feature derivate dal meteo
df['temp_delta'] = df['temp_max'] - df['temp_min']
df['is_hot'] = (df['temp_max'] > 28).astype(int) # Ondata di caldo
df['is_cold'] = (df['temp_min'] < 5).astype(int) # Freddo invernale
df['is_rain'] = (df['precipitazione_mm'] > 1).astype(int)
# Interazioni meteo x prodotto (es. gelati vanno bene con caldo)
# Necessità di parametrizzazione per categoria prodotto
df['heat_wave_consecutive'] = (
df['is_hot'].rolling(3).sum() == 3
).astype(int)
return df
def create_promotion_features(df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""
Crea feature da calendario promozionale
promo_df: colonne [date, sku_id, discount_pct, is_volantino, is_digital_promo]
"""
df = df.merge(promo_df, left_index=True, right_on='date', how='left')
df['discount_pct'] = df['discount_pct'].fillna(0)
df['is_promotion'] = (df['discount_pct'] > 0).astype(int)
df['is_deep_discount'] = (df['discount_pct'] > 0.3).astype(int)
# Effetto anticipazione promozione (pre-promo dip)
df['promo_lag_1'] = df['is_promotion'].shift(1).fillna(0)
df['promo_lead_1'] = df['is_promotion'].shift(-1).fillna(0) # Solo in training
# Effetto post-promozione (inventory loading)
df['days_since_last_promo'] = (
df['is_promotion']
.pipe(lambda s: s.where(s.eq(1)).ffill().index - s.index)
.dt.days
.clip(upper=30)
)
return df
def create_price_features(df: pd.DataFrame) -> pd.DataFrame:
"""Feature relative al prezzo e alla sua variazione"""
df = df.copy()
df['price_lag_1'] = df['price'].shift(1)
df['price_change_pct'] = df['price'].pct_change()
df['price_vs_avg_30d'] = df['price'] / df['price'].rolling(30).mean() - 1
return df
def full_feature_engineering_pipeline(df: pd.DataFrame,
meteo_df: pd.DataFrame,
promo_df: pd.DataFrame) -> pd.DataFrame:
"""Pipeline completa di feature engineering"""
df = create_temporal_features(df)
df = create_lag_features(df, 'sales')
df = create_rolling_features(df, 'sales')
df = create_weather_features(df, meteo_df)
df = create_promotion_features(df, promo_df)
df = create_price_features(df)
df = df.dropna()
print(f'Feature totali create: {df.shape[1]}')
print(f'Sample size dopo feature engineering: {df.shape[0]}')
return df
Conductă ML completă: de la datele brute la modelul în producție
O conductă profesională de prognoză a cererii nu se limitează la formarea modelelor. Necesită o arhitectură end-to-end care gestionează colectarea datelor, preprocesarea, validarea directă, selectarea modelului, implementarea și monitorizarea continuă. Mai jos este structura unei conducte pregătite pentru producție.
# Pipeline ML completa per food demand forecasting
import mlflow
import mlflow.pytorch
from dataclasses import dataclass, field
from typing import Optional, Tuple
import pandas as pd
import numpy as np
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ---- Configurazione Pipeline ----
@dataclass
class ForecastConfig:
# Data
store_ids: list = field(default_factory=list)
product_categories: list = field(default_factory=list)
training_lookback_days: int = 365
forecast_horizon_days: int = 7
# Feature engineering
lag_days: list = field(default_factory=lambda: [1, 2, 3, 7, 14, 28])
rolling_windows: list = field(default_factory=lambda: [7, 14, 28])
# Model selection
model_type: str = 'tft' # 'sarima', 'prophet', 'lstm', 'tft', 'xgboost'
use_ensemble: bool = False
# Validation
n_cv_splits: int = 4
val_horizon_days: int = 14
# MLflow
experiment_name: str = 'food_demand_forecasting'
run_name: Optional[str] = None
# Deployment
min_mape_threshold: float = 0.20 # Non deployare se MAPE > 20%
model_registry_name: str = 'food_demand_model'
# ---- Step 1: Data Collection e Validation ----
class DataCollector:
def __init__(self, config: ForecastConfig):
self.config = config
def collect(self, start_date: str, end_date: str) -> pd.DataFrame:
"""Raccoglie dati da data warehouse (es. Snowflake, BigQuery)"""
query = f"""
SELECT
date,
store_id,
product_id,
category,
sales_units,
price,
stock_level,
is_holiday,
is_promotion,
discount_pct
FROM gold.sales_daily
WHERE date BETWEEN '{start_date}' AND '{end_date}'
AND store_id IN ({','.join(map(str, self.config.store_ids))})
ORDER BY date, store_id, product_id
"""
# Qui si userebbe il connector del DWH (es. snowflake-connector-python)
# df = snowflake_connector.execute(query)
logger.info(f'Dati raccolti: {start_date} - {end_date}')
return df # Placeholder
def validate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, dict]:
"""Validazione qualità dati"""
issues = {}
# Check valori negativi
neg_sales = (df['sales_units'] < 0).sum()
if neg_sales > 0:
issues['negative_sales'] = neg_sales
df.loc[df['sales_units'] < 0, 'sales_units'] = 0
# Check missing dates per gruppo
for group_id, group_df in df.groupby(['store_id', 'product_id']):
date_range = pd.date_range(group_df['date'].min(), group_df['date'].max())
missing = len(date_range) - len(group_df)
if missing > 0:
issues[f'missing_dates_{group_id}'] = missing
# Check outliers (IQR method)
Q1 = df['sales_units'].quantile(0.25)
Q3 = df['sales_units'].quantile(0.75)
IQR = Q3 - Q1
outliers = ((df['sales_units'] < Q1 - 3*IQR) | (df['sales_units'] > Q3 + 3*IQR)).sum()
issues['outliers'] = int(outliers)
logger.info(f'Validazione dati: {issues}')
return df, issues
# ---- Step 2: Walk-Forward Validation ----
class WalkForwardValidator:
"""
Validazione temporale corretta per time series.
NON usare cross-validation classico che crea data leakage.
"""
def __init__(self, config: ForecastConfig):
self.config = config
def validate(self, df: pd.DataFrame, model_class) -> dict:
results = []
total_days = len(df['date'].unique())
split_size = total_days // (self.config.n_cv_splits + 1)
for fold in range(self.config.n_cv_splits):
train_end_idx = split_size * (fold + 1)
val_start_idx = train_end_idx
val_end_idx = val_start_idx + self.config.val_horizon_days
train_dates = sorted(df['date'].unique())[:train_end_idx]
val_dates = sorted(df['date'].unique())[val_start_idx:val_end_idx]
train_df = df[df['date'].isin(train_dates)]
val_df = df[df['date'].isin(val_dates)]
if len(val_df) == 0:
continue
# Training sul fold
model = model_class(self.config)
model.fit(train_df)
predictions = model.predict(val_df)
# Metriche
actuals = val_df['sales_units'].values
preds = predictions['forecast'].values
mape = np.mean(np.abs((actuals - preds) / (actuals + 1e-8))) * 100
rmse = np.sqrt(np.mean((actuals - preds) ** 2))
mae = np.mean(np.abs(actuals - preds))
results.append({
'fold': fold,
'mape': mape,
'rmse': rmse,
'mae': mae,
'train_size': len(train_df),
'val_size': len(val_df)
})
logger.info(f'Fold {fold}: MAPE={mape:.2f}%, RMSE={rmse:.2f}')
# Aggrega risultati
results_df = pd.DataFrame(results)
return {
'mean_mape': results_df['mape'].mean(),
'std_mape': results_df['mape'].std(),
'mean_rmse': results_df['rmse'].mean(),
'mean_mae': results_df['mae'].mean(),
'folds': results
}
# ---- Step 3: MLflow Tracking e Model Registry ----
class ExperimentTracker:
def __init__(self, config: ForecastConfig):
self.config = config
mlflow.set_experiment(config.experiment_name)
def run_experiment(self, df: pd.DataFrame, model_class) -> str:
run_name = self.config.run_name or f'{self.config.model_type}_{datetime.now():%Y%m%d_%H%M}'
with mlflow.start_run(run_name=run_name) as run:
# Log parametri
mlflow.log_params({
'model_type': self.config.model_type,
'forecast_horizon': self.config.forecast_horizon_days,
'training_lookback': self.config.training_lookback_days,
'n_cv_splits': self.config.n_cv_splits
})
# Walk-forward validation
validator = WalkForwardValidator(self.config)
metrics = validator.validate(df, model_class)
# Log metriche
mlflow.log_metrics({
'cv_mean_mape': metrics['mean_mape'],
'cv_std_mape': metrics['std_mape'],
'cv_mean_rmse': metrics['mean_rmse'],
'cv_mean_mae': metrics['mean_mae']
})
# Training finale su tutti i dati
final_model = model_class(self.config)
final_model.fit(df)
# Log artefatti
mlflow.log_dict(metrics, 'validation_metrics.json')
# Registra modello se MAPE accettabile
if metrics['mean_mape'] < self.config.min_mape_threshold * 100:
mlflow.pytorch.log_model(
final_model,
artifact_path='model',
registered_model_name=self.config.model_registry_name
)
logger.info(f'Modello registrato: MAPE={metrics["mean_mape"]:.2f}%')
else:
logger.warning(
f'Modello NON registrato: MAPE {metrics["mean_mape"]:.2f}% '
f'> soglia {self.config.min_mape_threshold * 100}%'
)
return run.info.run_id
Benchmark: comparație între modele pe setul de date real
Mai jos este o comparație sistematică a performanței diferitelor abordări pe un set de date tipic GDO: 150 SKU de produse proaspete (lactate, fructe, legume, carne), 2 ani de date zilnice din 5 puncte de vanzare, cu variabile exogene (meteo, promotii, sarbatori).
Modele comparative de referință - Setul de date GDO italian (150 SKU, 2 ani)
| Model | MAPE (%) | RMSE (unitate) | MAE (unită) | Timp de antrenament | Interpretabilitate | Note |
|---|---|---|---|---|---|---|
| Media mobilă (linie de bază) | 31.4 | 18.7 | 12.3 | < 1 min | Ridicat | Fără sezonalitate, fără regresori |
| SARIMA | 22.8 | 14.2 | 9.8 | ~15 min | Ridicat | Doar un singur decalaj sezonier |
| Holt-Winters | 20.1 | 13.1 | 8.9 | ~5 min | Ridicat | Bun pentru seriale obișnuite |
| Profet | 14.6 | 10.4 | 7.2 | ~30 min | Ridicat | Excelent cu vacanțe/regressori |
| XGBoost + Feature Eng. | 12.9 | 9.8 | 6.7 | ~10 min | Medie | Inginerie de caracteristici critice |
| LSTM (univariat) | 16.4 | 11.2 | 7.8 | ~45 min GPU | Scăzut | Mai rău decât Profetul fără exog |
| LSTM (cu mai multe variante) | 10.8 | 8.4 | 5.9 | ~2 ore GPU | Scăzut | Îmbunătățire puternică cu exog |
| N-BEATS | 11.2 | 8.7 | 6.1 | ~1 oră GPU | Medie | Descompunere interpretabilă |
| TFT (transformator de fuziune temporală) | 8.3 | 6.9 | 4.8 | ~4 ore GPU | Ridicat | Cel mai bun per ansamblu, interpretabil |
| Ansamblu TFT + XGBoost | 7.6 | 6.4 | 4.4 | ~5 ore GPU | Medie | +8% la un singur TFT |
Set de date: 150 SKU-uri de produse proaspete, 5 magazine, date zilnice de 2 ani. Validare de mers înainte pe 4 ori (orizont de 14 zile). GPU: NVIDIA A100.
Considerații privind alegerea modelului
TFT este cea mai puternică opțiune, dar necesită GPU și date abundente pentru fiecare SKU. Pentru comercianții cu amănuntul cu mii de SKU, dar volume reduse per singur produs, o abordare hibridă funcționează cel mai bine: XGBoost sau Prophet pentru SKU-uri cu coadă lungă (volum scăzut, date slabe), TFT pentru SKU-uri industriile cu volum mare care generează cele mai multe venituri și deșeuri.
Reducerea deșeurilor: integrarea prognozelor cu inventar și prețuri dinamice
Numai un model de prognoză precis nu reduce risipa: este necesar act pe prognoze prin managementul stocurilor și sisteme dinamice de stabilire a prețurilor. Bucla luarea deciziilor si: previziunea cererii viitoare → comanda optima catre furnizor → monitorizare niveluri de stoc → reducere dinamică pentru produse aproape de expirare → donare automată pentru surplusurile nevândute.
Optimizarea stocurilor cu stoc de siguranță dinamic
# Inventory optimization basato su forecast probabilistico
import numpy as np
from dataclasses import dataclass
from typing import Optional
@dataclass
class InventoryParams:
product_id: str
shelf_life_days: int # Vita utile del prodotto
lead_time_days: int # Giorni da ordine a consegna
service_level: float = 0.95 # Fill rate target (95%)
holding_cost_per_unit: float = 0.05 # Costo mantenimento/giorno
shortage_cost_per_unit: float = 2.50 # Costo stockout per unita
waste_cost_per_unit: float = 1.20 # Costo di buttare una unita
def calculate_optimal_order(
demand_forecast: np.ndarray, # Previsione media per i prossimi N giorni
demand_std: np.ndarray, # Deviazione standard previsione
current_stock: int,
params: InventoryParams
) -> dict:
"""
Calcola la quantità ottimale da ordinare considerando:
- Shelf life del prodotto (prodotti freschi deperibili)
- Service level target
- Trade-off costo stockout vs costo spreco
"""
from scipy import stats
# Z-score per il service level target
z_score = stats.norm.ppf(params.service_level)
# Domanda attesa nel lead time
lt_demand = demand_forecast[:params.lead_time_days].sum()
lt_std = np.sqrt((demand_std[:params.lead_time_days] ** 2).sum())
# Safety stock = Z * sigma * sqrt(lead_time)
safety_stock = z_score * lt_std
# Reorder point
reorder_point = lt_demand + safety_stock
# Domanda fino a shelf life (non ordinare più di quanto venderemo)
max_sellable_demand = demand_forecast[:params.shelf_life_days].sum()
max_sellable_std = np.sqrt((demand_std[:params.shelf_life_days] ** 2).sum())
# Order quantity
unconstrained_order = max(0, reorder_point - current_stock)
# Vincolo shelf life: non ordinare più di quanto venderemo nella vita utile
# Aggiusta per il rischio di spreco
waste_probability = 1 - stats.norm.cdf(
current_stock + unconstrained_order,
loc=max_sellable_demand,
scale=max_sellable_std
)
# Costo atteso
expected_waste_cost = waste_probability * unconstrained_order * params.waste_cost_per_unit
expected_shortage_cost = (1 - params.service_level) * lt_demand * params.shortage_cost_per_unit
# Ottimizzazione: riduce ordine se costo spreco > beneficio margine
if expected_waste_cost > expected_shortage_cost * 0.5:
adjustment_factor = 1 - (expected_waste_cost / (expected_waste_cost + expected_shortage_cost))
optimal_order = int(unconstrained_order * adjustment_factor)
else:
optimal_order = int(unconstrained_order)
return {
'optimal_order_qty': max(0, optimal_order),
'reorder_point': reorder_point,
'safety_stock': safety_stock,
'expected_demand_7d': demand_forecast[:7].sum(),
'waste_risk_pct': waste_probability * 100,
'shortage_risk_pct': (1 - params.service_level) * 100,
'decision_reason': 'waste_adjusted' if expected_waste_cost > expected_shortage_cost * 0.5
else 'standard_reorder'
}
# Esempio utilizzo
params = InventoryParams(
product_id='insalata_vaschetta_150g',
shelf_life_days=5,
lead_time_days=1,
service_level=0.95,
waste_cost_per_unit=0.85,
shortage_cost_per_unit=3.20
)
# Simulazione con forecast TFT (media e std da quantile forecast)
forecast_mean = np.array([45, 52, 68, 71, 89, 95, 48]) # Lunedi-Domenica
forecast_std = np.array([8, 9, 12, 13, 15, 16, 9]) # Deviazione standard
order = calculate_optimal_order(
demand_forecast=forecast_mean,
demand_std=forecast_std,
current_stock=120,
params=params
)
print('Raccomandazione ordine:')
for key, val in order.items():
print(f' {key}: {val}')
Prețuri dinamice pentru reducerea deșeurilor: optimizarea reducerii
Tarifarea dinamică bazată pe data de expirare este unul dintre cele mai eficiente instrumente de reducere deșeuri în produse proaspete. Scopul este de a găsi pret optim de reducere care maximizează veniturile recuperate din produsele cu risc de expirare, accelerând vânzarea acestora fără canibalizarea vânzărilor la produse la preț întreg.
Fără risipă, un startup israelian, a dezvoltat un sistem bazat pe rafturi electronice etichete (ESL) care actualizează automat prețurile în funcție de data de expirare, nivelurile de stocul și comportamentul clienților. Comercianții cu amănuntul care l-au adoptat au redus deșeurile 40%, crescând în același timp veniturile pe aceste produse în raport cu situația anterior (aruncați produsul nevândut). Asemenea Prea Bun pentru a merge a lansat în 2024 o platformă AI care procesează date la nivel de SKU pentru a optimiza nivelurile de reduceri, reducerea verificărilor manuale a termenelor limită cu 93-99%.
# Algoritmo markdown optimization per prodotti prossimi a scadenza
import numpy as np
from scipy.optimize import minimize_scalar
from dataclasses import dataclass
from typing import Tuple
@dataclass
class MarkdownConfig:
min_margin_pct: float = 0.10 # Margine minimo (non scendere sotto il 10%)
max_discount_pct: float = 0.70 # Sconto massimo (70%)
base_price: float = 1.99 # Prezzo normale
cost_per_unit: float = 0.95 # Costo di acquisto
waste_value: float = 0.00 # Valore residuo se buttato (es. recupero)
def price_elasticity_model(price: float, base_price: float,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Modello di elasticita del prezzo per prodotti freschi GDO.
Elasticita tipica prodotti freschi: -1.5 a -2.5
(sconto del 10% aumenta la domanda del 15-25%)
"""
price_ratio = price / base_price
return base_demand * (price_ratio ** elasticity)
def markdown_revenue(discount_pct: float,
config: MarkdownConfig,
current_stock: int,
base_demand: float,
elasticity: float = -1.8) -> float:
"""
Calcola il ricavo atteso da un determinato livello di sconto.
Massimizzare questa funzione = trovare lo sconto ottimale.
"""
discounted_price = config.base_price * (1 - discount_pct)
# Vincolo margine minimo
min_price = config.cost_per_unit * (1 + config.min_margin_pct)
if discounted_price < min_price:
return -1e10 # Penalita per prezzi sotto il costo
# Domanda attesa con il nuovo prezzo
expected_demand = price_elasticity_model(
discounted_price, config.base_price, base_demand, elasticity
)
# Unita vendute = min(domanda attesa, stock disponibile)
units_sold = min(expected_demand, current_stock)
units_wasted = max(0, current_stock - units_sold)
# Revenue = vendite + (eventuali recupero valore sprecato)
revenue = (units_sold * discounted_price) + (units_wasted * config.waste_value)
# Costo opportunità: confronto con vendita a prezzo pieno (se avessimo aspettato)
# ma qui massimizziamo il recupero dato che il prodotto scadera
return revenue
def find_optimal_markdown(config: MarkdownConfig,
current_stock: int,
days_to_expiry: int,
base_daily_demand: float,
demand_elasticity: float = -1.8) -> dict:
"""
Trova lo sconto ottimale considerando i giorni rimanenti a scadenza.
Logica: più vicini alla scadenza, maggiore lo sconto necessario.
"""
# Urgenza basata sui giorni a scadenza
if days_to_expiry >= 5:
# Nessun markdown necessario
return {'discount_pct': 0.0, 'recommended_action': 'no_action',
'expected_revenue': config.base_price * min(base_daily_demand * days_to_expiry, current_stock)}
# Moltiplicatore urgenza: più vicini alla scadenza, maggiore il markup di urgenza
urgency_factor = 1.0 + (5 - days_to_expiry) * 0.15 # +15% urgenza per giorno
# Domanda effettiva considerando urgenza (es. promozioni di fine giornata)
effective_demand = base_daily_demand * days_to_expiry * urgency_factor
# Ottimizzazione: trova lo sconto che massimizza il ricavo totale
result = minimize_scalar(
lambda d: -markdown_revenue(d, config, current_stock,
effective_demand, demand_elasticity),
bounds=(0.0, config.max_discount_pct),
method='bounded'
)
optimal_discount = result.x
optimal_revenue = -result.fun
waste_at_no_discount = max(0, current_stock - effective_demand)
waste_with_markdown = max(0, current_stock - price_elasticity_model(
config.base_price * (1 - optimal_discount),
config.base_price, effective_demand, demand_elasticity
))
waste_reduction_units = waste_at_no_discount - waste_with_markdown
revenue_recovered = optimal_revenue - (config.waste_value * current_stock)
action = (
'urgent_markdown' if days_to_expiry <= 1
else 'markdown' if optimal_discount > 0.15
else 'slight_discount'
)
return {
'discount_pct': round(optimal_discount * 100, 1),
'discounted_price': round(config.base_price * (1 - optimal_discount), 2),
'expected_revenue': round(optimal_revenue, 2),
'waste_reduction_units': round(waste_reduction_units, 0),
'revenue_recovered_vs_waste': round(revenue_recovered, 2),
'days_to_expiry': days_to_expiry,
'recommended_action': action
}
# Esempio: insalata con 2 giorni alla scadenza
config = MarkdownConfig(
base_price=1.99,
cost_per_unit=0.80,
min_margin_pct=0.05, # Riduco margine minimo vicino a scadenza
max_discount_pct=0.60
)
recommendation = find_optimal_markdown(
config=config,
current_stock=45,
days_to_expiry=2,
base_daily_demand=12,
demand_elasticity=-2.0
)
print('Raccomandazione markdown:')
for k, v in recommendation.items():
print(f' {k}: {v}')
# Output atteso:
# discount_pct: 35.0
# discounted_price: 1.29
# expected_revenue: 38.70
# waste_reduction_units: 28
# recommended_action: markdown
Studiu de caz: comerț cu amănuntul pe scară largă italiană cu peste 200 de puncte de vânzare
Să analizăm un caz real (anonimizat) de implementare a previziunii cererii ML într-unul singur Lanț italian de vânzare cu amănuntul la scară largă, cu 220 de puncte de vânzare, dintre care 18.000 SKU-uri active 3.200 de produse proaspete și o cifră de afaceri anuală de aproximativ 2,8 miliarde de euro.
Situația inițială
Înainte de implementarea ML, retailerul folosea un sistem ERP clasic cu prognoză pe baza mediilor mobile ponderate din ultimele 4 luni. MAPE mediu pe produse proaspete a fost de 28%, cu vârfuri de 45% la produsele de sezon. Rata risipei la produsele proaspete a fost 12% din valoarea achiziționată, cu costuri de eliminare, pierdere de valoare și impact reputațional semnificative. Cumpărătorii au efectuat revizuiri manuale săptămânale ale comenzilor, ocupând aproximativ 3 FTE cu normă întreagă.
Arhitectura soluției
Stiva tehnologică implementată
| Straturi | Tehnologia de alegere | Funcţie |
|---|---|---|
| Depozitul de date | Fulgi de nea | Stocare centralizată, date POS din toate cele 220 de magazine |
| ETL/ELT | dbt + Airbyte | Integrare POS, vreme (OpenWeather API), promoții (sistem CRM) |
| Orchestrație | Apache Airflow | Recalificare zilnică 3:00-5:00 AM, alertă de anomalie |
| Modele | TFT (top 500 SKU) + XGBoost (coadă lungă) | Prognoza de 7 și 14 zile pentru fiecare SKU x magazin |
| MLOps | MLflow pe Databricks | Experimente de urmărire, registru de modele, testare A/B |
| Servire | Cache FastAPI + Redis | Prognoza API cu latență < 100 ms, cache 24 de ore |
| Motorul Markdown | Microservicii Python | Calcul optim de reducere la fiecare 4 ore pentru produsele cu risc |
| Integrare ESL | API Hanshow / Pricer | Actualizarea automată a prețurilor pe etichetele electronice ale raftului |
| Donare | API Food Bank | Notificare automata pentru produsele cu expirare < 24h nevandute |
Cronologie și faze de implementare
- Luna 1-2: Auditul datelor, curățarea datelor istorice de 3 ani, integrarea surselor exogene (vreme, promoții). Identificarea a 50 de SKU-uri pilot de mare volum și perisabilitate ridicată.
- Luna 3-4: Antrenament model pilot (XGBoost + Prophet), validare înainte, MAPE inițial 27,8%. Prima integrare cu sistemul de comenzi ERP pe 3 magazine de testare.
- Luna 5-6: Training TFT pe primele 500 de SKU-uri noi. Testare A/B: 30 de magazine cu ML vs 30 de magazine cu sistem traditional. Rezultat: reducerea deșeurilor cu 28% în magazinele ML.
- Luna 7-9: Lansare în toate cele 220 de magazine. Implementarea motorului Markdown cu integrare ESL pe 80 de magazine echipate. Activare automată a donației prin Food Bank.
- Luna 10-12: Optimizare șabloane, funcție adăugată pentru rețelele sociale (mențiuni produse pe Instagram/TikTok pentru tendințele emergente). Implementarea ansamblului TFT+XGBoost.
Rezultate măsurate la 12 luni
KPI-uri de afaceri - Rezultate post-implementare (12 luni)
| Metric | Înainte de ML | După 12 luni ML | Variaţie |
|---|---|---|---|
| Produse proaspete MAPE | 28,2% | 9,1% | -67,7% |
| Rata deșeurilor pe valoarea achiziționată | 12,0% | 7,8% | -35,0% |
| Rata de epuizare a stocului (OOS) | 4,8% | 2,9% | -39,6% |
| Produse donate (tone/an) | 45 de tone | 112 tone | +149% (legea beneficiilor Gadda) |
| Eficiența reducerii (% venituri recuperate) | 32% | 71% | +39 pp |
| FTE dedicate revizuirii manuale a comenzilor | 3,0 FTE | 0,8 FTE | -73,3% |
| Economisirea costurilor deșeurilor + eliminare | - | 4,2 milioane de euro/an | Economie nouă |
| Beneficiu fiscal pentru donații (Legea Gadda) | 85.000 euro/an | 210.000 euro/an | +147% |
| Rentabilitatea investiției proiectului (investiție: 1,8 milioane EUR) | - | 234% la 12 luni | Rambursare 5,2 luni |
Lecții învățate din studiul de caz
- Calitatea datelor istorice este adevăratul blocaj: 40% din timp a proiectului a fost dedicat curățării și integrării datelor. Datele istorice POS au avut lacune semnificative în perioadele de vacanță și renovarea magazinelor.
- Nu toate SKU-urile merită același model: TFT pentru primele 500 de SKU-uri, Profet pentru SKU intermediare (500-3000), medie mobilă pentru coada lungă (>3000 SKU). Abordare pe trei niveluri cu cost/beneficiu optim.
- Managementul schimbării este esențial: cumpărători cu peste 20 de ani de experiență au rezistat AI. Soluția: arată descompunerea Profetului (tendință + sezonalitate + vacanță), permițând cumpărătorilor să înțeleagă și să valideze logica modelului.
- Motorul de reducere necesită integrarea ESL: fără etichete electronice pentru rafturi, actualizarea manuală a prețurilor a anulat beneficiul vitezei de reacție a sistem automat.
- Monitorizarea continuă este esențială: modelele se degradează în timp (derive de concept) din cauza schimbărilor în comportamentele de cumpărare post-COVID. Recalificare automate saptamanale mentinute performante stabile.
Valori de afaceri pentru prognoza cererii în FoodTech
Succesul unui sistem de prognoză a cererii nu este măsurat doar de MAPE. Este necesar definiți un cadru de metrică care acoperă atât calitatea tehnică a modelului, cât și impactul afaceri adevărate.
Cadrul de metrici: Tehnic + Business
| Categorie | Metric | Formula / Definiție | Țintă tipică de distribuție pe scară largă |
|---|---|---|---|
| Model de precizie | MAPE | Eroare procentuală absolută medie | < 15% produse proaspete |
| RMSE | Eroare pătratică medie (în unități) | Depinde de volumul SKU | |
| Părtinire | (Prognoză - Actual) / Actual, medie | |Bias| < 5% (fără peste/sub sistematic) | |
| WMAPE | MAPE ponderat în funcție de volumul vânzărilor | < 10% (mai bine decât MAPE pentru coada lungă) | |
| Deşeuri | Rata de reducere a deșeurilor (WRR) | (Waste before - Waste after) / Waste before | > 30% în 12 luni |
| % deșeuri la achiziții | Valoare irosită/Valoare cumpărată | < 8% (sector de referință) | |
| Tone deturnate (donații) | Tone donate / Total deșeuri potențiale | > 50% din exces | |
| Disponibilitate | Rata de epuizare a stocului (OOS) | % SKU cu stoc epuizat / SKU total | < 3% |
| Rata de umplere | Unități livrate / Unități comandate | > 97% | |
| Zile de acoperire (DOC) | Stoc curent/Cerere medie zilnică | Optim pentru categoria de produse | |
| Prețuri dinamice | Eficiență Markdown (ME) | Venituri de reducere/Risipuri potențiale de costuri | > 65% |
| Produse vândute prin Markdown | % produse aproape de expirare vândute cu reducere | > 80% dintre produse sunt expuse riscului | |
| Reducere medie aplicată | Reducere medie la produsele reduse | 25-45% (optim pentru elasticitate) |
Tabloul de bord de monitorizare: Detectarea derive a conceptului
Modelele de prognoză alimentară sunt deosebit de susceptibile la deriva conceptului: modelele de cumpărare se modifică în funcție de sezon, datorită tendințelor nutriționale (de exemplu, creșterea produselor vegane), din cauza crizei prețurilor (inflația 2022-2024 a modificat puternic comportamentul de cumpărare în produse proaspete), pentru evenimente extraordinare (pandemie). Este necesar să se monitorizeze continuu degradarea performanței.
# Monitoring sistema di rilevamento concept drift per food forecasting
import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftAlert:
severity: str # 'WARNING' | 'CRITICAL'
metric: str
current_value: float
baseline_value: float
change_pct: float
affected_skus: List[str]
recommendation: str
class ForecastMonitor:
def __init__(self, baseline_window_days: int = 30,
monitoring_window_days: int = 7):
self.baseline_window = baseline_window_days
self.monitoring_window = monitoring_window_days
self.thresholds = {
'mape_warning': 0.20, # MAPE > 20% = warning
'mape_critical': 0.35, # MAPE > 35% = critical, richiede retraining
'bias_warning': 0.10, # Bias sistematico > 10%
'bias_critical': 0.20, # Bias sistematico > 20%
'waste_rate_warning': 0.10, # Tasso spreco > 10%
}
def compute_current_metrics(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame) -> dict:
"""Calcola metriche sull'ultima finestra di monitoring"""
merged = predictions_df.merge(actuals_df, on=['date', 'sku_id', 'store_id'])
metrics_by_sku = merged.groupby('sku_id').apply(
lambda g: pd.Series({
'mape': np.mean(np.abs((g['actual'] - g['forecast']) / (g['actual'] + 1e-8))),
'bias': np.mean((g['forecast'] - g['actual']) / (g['actual'] + 1e-8)),
'rmse': np.sqrt(np.mean((g['actual'] - g['forecast']) ** 2)),
'n_observations': len(g)
})
)
return {
'mean_mape': metrics_by_sku['mape'].mean(),
'p90_mape': metrics_by_sku['mape'].quantile(0.9),
'mean_bias': metrics_by_sku['bias'].mean(),
'degraded_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_warning']
].index.tolist(),
'critical_skus': metrics_by_sku[
metrics_by_sku['mape'] > self.thresholds['mape_critical']
].index.tolist(),
'per_sku_metrics': metrics_by_sku
}
def detect_drift(self, baseline_metrics: dict,
current_metrics: dict) -> List[DriftAlert]:
"""Confronta metriche correnti vs baseline e genera alert"""
alerts = []
# Alert MAPE
mape_change = ((current_metrics['mean_mape'] - baseline_metrics['mean_mape'])
/ baseline_metrics['mean_mape'])
if current_metrics['mean_mape'] > self.thresholds['mape_critical']:
alerts.append(DriftAlert(
severity='CRITICAL',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['critical_skus'],
recommendation='RETRAINING IMMEDIATO richiesto. Analizza concept drift.'
))
elif current_metrics['mean_mape'] > self.thresholds['mape_warning']:
alerts.append(DriftAlert(
severity='WARNING',
metric='MAPE',
current_value=current_metrics['mean_mape'] * 100,
baseline_value=baseline_metrics['mean_mape'] * 100,
change_pct=mape_change * 100,
affected_skus=current_metrics['degraded_skus'],
recommendation='Monitorare trend. Pianificare retraining prossima settimana.'
))
# Alert Bias sistematico
if abs(current_metrics['mean_bias']) > self.thresholds['bias_critical']:
direction = 'sovrastima' if current_metrics['mean_bias'] > 0 else 'sottostima'
alerts.append(DriftAlert(
severity='CRITICAL',
metric='Bias',
current_value=current_metrics['mean_bias'] * 100,
baseline_value=baseline_metrics.get('mean_bias', 0) * 100,
change_pct=0,
affected_skus=[],
recommendation=f'Bias sistematico di {direction}: aggiorna calibrazione modello.'
))
return alerts
def check_and_alert(self, predictions_df: pd.DataFrame,
actuals_df: pd.DataFrame,
baseline_metrics: dict) -> None:
"""Entry point principale del monitor"""
current = self.compute_current_metrics(predictions_df, actuals_df)
alerts = self.detect_drift(baseline_metrics, current)
for alert in alerts:
if alert.severity == 'CRITICAL':
logger.critical(
f'[DRIFT CRITICAL] {alert.metric}: {alert.current_value:.1f}% '
f'(baseline: {alert.baseline_value:.1f}%) - {alert.recommendation}'
)
# Qui si trigghera notifica Slack/PagerDuty
self._send_alert(alert)
else:
logger.warning(
f'[DRIFT WARNING] {alert.metric}: {alert.current_value:.1f}% - '
f'{len(alert.affected_skus)} SKU degradati'
)
def _send_alert(self, alert: DriftAlert) -> None:
"""Invia notifica al team ML Ops (implementazione dipende dal sistema)"""
# Integrazione con: Slack, PagerDuty, email, Prometheus Alertmanager
pass
Cele mai bune practici și anti-modele în prognoza cererii alimentare
Anti-modele de evitat
- Data scurgerii temporare: utilizați validarea încrucișată standard în loc de validarea walk-forward introduce date viitoare în formare. Și cea mai frecventă eroare metodologică și duce la MAPE-uri optimiste de 30-50% care se dovedesc false în producție.
- Șablon unic pentru toate SKU-urile: produse cu cerere intermitentă (vânzări de 2-3 ori pe săptămână) necesită modele diferite de produse cu volum mare. Aplicați LSTM la un produs cu 200 de vânzări/an duce la supraadaptare și MAPE de 60%+.
- Ignorați părtinirea comercială: comercianții cu amănuntul au avut tendința istorică să comandă în exces pentru a evita epuizarea stocurilor (cel mai rău dintre cele „două rele”) percepute. Dacă modelul este antrenat pe datele istorice ale comenzii (nu cererea reală), moștenește această părtinire. Utilizați datele de vânzări eficace, nu a comenzilor.
- Nu modelați promoțiile viitoare: o prognoză care nu cunoaşte promoţii planificat pentru săptămâna următoare oferă previziuni sistematic incorecte în perioade promoțională. Integrați întotdeauna calendarul promoțional ca intrare.
- Optimizați numai MAPE ignorând direcția erorii: în context mâncare, o eroare de supraprognoză de 50 de unități (50 de unități irosite) este mult mai scumpă a unei erori de subprognoză de 50 de unități (50 de vânzări pierdute). Utilizați valori asimetrice (MAPE ponderat) sau funcții de pierdere asimetrică în antrenament.
Cele mai bune practici consolidate
- Stratificați SKU-urile după tratament: clasifică fiecare SKU după volum, variabilitate și termenul de valabilitate. Aplicați modele diferite la diferite clustere. O abordare ierarhică (global + local) depășește adesea ambele modele pure.
- Include întotdeauna o estimare a incertitudinii: o prognoză precisă (un singur număr) şi mai puţin utilă decât o prognoză probabilistică (distribuţie sau intervale). TFT și Profet ele furnizează cuantile în mod nativ, fundamentale pentru calcularea stocului optim de siguranță.
- Automatizați recalificarea, dar monitorizați anomaliile: recalificare zilnică și util, dar poate amplifica erorile dacă datele de ieri au fost anormale (de exemplu, eșecul POS). Întotdeauna implementați o verificare a sanității înainte de a introduce un nou model în producție.
- Creați încredere cu cumpărătorii: Sistemele ML eșuează adesea din cauza rezistenței organizatoric, nu pentru calitate tehnică. Arătați descompunerea modelului, explicați previziunile în limbajul de afaceri, lasă cumpărătorilor posibilitatea de a înlocui cu a buclă de feedback care îmbunătățește modelul.
- Măsurați continuu rentabilitatea investiției: calculați săptămânal economiile de deșeuri evitat, recuperarea de la reducere, beneficiul fiscal Gadda Legea. Faceți valoarea vizibilă creat și fundamental pentru menținerea angajamentului organizațional în timp.
Concluzii: Prognoza ca instrument de durabilitate
Prognoza cererii cu învățarea automată în sectorul alimentar este unul dintre cazurile de utilizare a AI cel mai bun raport impact/complexitate disponibil astăzi. Problema este bine definită, datele există (fiecare retailer are ani de istorie POS), valorile de succes sunt clare (MAPE, Waste Rate, ROI), iar impactul se extinde dincolo de afacere: reduce risipa alimentară și un imperativ economic, de reglementare și de mediu.
Calea sugerată pentru un comerț cu amănuntul italian la scară largă care dorește să înceapă în 2025 este pragmatică: începe cu Prophet sau XGBoost pe un subset de 50 de SKU-uri cu volum mare și perisabilitate mare, măsurați delta MAPE și Rata de Reducere a Deșeurilor după 30 de zile, construiți cazul de afaceri intern, și apoi scalați progresiv către TFT pentru SKU-uri de top și spre integrarea cu reduceri Motoare și sisteme ESL.
Scopul final nu este acela de a avea cel mai precis model, dar un ecosistem decizional care transformă previziunile în acțiuni: comenzi optime, prețuri adaptive, donații automată. În acest ecosistem, Machine Learning și infrastructura de abilitare, dar reducerea deșeurilor și rezultatul concret al afacerii care justifică fiecare investiție.
Continuați în seria FoodTech
În următoarele articole ale seriei, explorăm tehnologii complementare pentru prognoza cererii:
- Articolul 9 - Satelit API și Precision Agriculture: cum ar fi datele satelitare Modele de prognoză a randamentului agricol pentru furaje NDVI și Sentinel-2, închizând bucla de la producție la distribuție.
- Articolul 10 - ML Edge pentru monitorizarea culturilor: inferență încorporată pe Dispozitive IoT în teren pentru monitorizarea în timp real a stării culturilor, cu integrarea în conducta lanțului de aprovizionare.
Aflați mai multe despre tehnologiile MLOps care fac posibilă implementarea în producție a modelelor de prognoză, consultați și seria MLOps pentru afaceri: modele AI în producție cu MLflow.







