IoT Pipeline pro přesné zemědělství s Pythonem a MQTT
Pšeničné pole v Puglii. Tři tisíce hektarů, čtyřicet senzorů pohřbených v různých hloubkách, data z teplota, vlhkost půdy, pH a elektrická vodivost, které přicházejí každých třicet sekund. Bez strukturovaný datový kanál, to je jen digitální šum. Se správným potrubím se stává motorem rozhodnutí, která snižují spotřebu vody o 40 %, zvyšují výnos o 15 % a snižují náklady hnojiv o čtvrtinu.
Precizní zemědělství není příslibem do budoucna: je to průmyslová realita, která platí v roce 2025 14,77 miliardy dolarů globálně, s projekcemi, které trh dovedou 26,86 miliardy do roku 2030 při CAGR 12,7 %. V Itálii 28,5 % firem zemědělství již využívá techniky přesného zemědělství podle údajů ISTAT 2024 s vrcholy 41,1 % ve společnostech nad 100 hektarů UAA. Italský zemědělský sektor vytvořil hodnotu přidal 42,4 miliardy eur v roce 2024potvrzuje Itálii jako první v Evropě, a digitalizace je hlavním motorem tohoto výkonu.
Ale mezi senzorem v terénu a správným agronomickým rozhodnutím existuje složitá technická cesta: nízkoenergetické bezdrátové protokoly, MQTT broker, validace schématu, obohacovací potrubí, medailonová architektura na datovém jezeře, řídicí panel v reálném čase a výstražný systém. Tento článek pokrývá každý krok tohoto řetězce funkčním kódem Pythonu, skutečnými architekturami a osvědčenými postupy ověřeno ve výrobě.
Co se dozvíte v tomto článku
- End-to-end architektura systému IoT pro přesné zemědělství
- Porovnání typů zemědělských senzorů a bezdrátových protokolů: MQTT, LoRaWAN, Zigbee
- Hluboký ponor MQTT: QoS 0/1/2, uchované zprávy, poslední vůle, návrh tématu
- Dokončete implementaci Pythonu pomocí paho-mqtt: vydavatel a spotřebitelský kanál
- Ověření a vynucení schémat IoT dat pomocí Pydantic
- Integrace s InfluxDB pro časové řady a Apache Kafka pro zpracování streamů
- Architektura medailonu (bronz/stříbro/zlato) pro zemědělská data
- Dashboard Grafana a systém upozornění na kritické prahy
- Italský kontext: CAP, PNRR Transition 5.0, pobídky AgriTech 2025
FoodTech Series – všechny články
| # | Položka | Úroveň | Stát |
|---|---|---|---|
| 1 | IoT Pipeline pro přesné zemědělství (jste zde) | Moderní | Proud |
| 2 | ML Edge pro monitorování plodin: Počítačové vidění v polích | Moderní | Již brzy |
| 3 | Satelitní API a vegetační indexy: NDVI s Pythonem a Sentinel-2 | Střední | Již brzy |
| 4 | Sledovatelnost blockchainu v potravinách: od pole až po supermarket | Střední | Již brzy |
| 5 | Počítačová vize pro kontrolu kvality v potravinářském průmyslu | Moderní | Již brzy |
| 6 | FSMA a Digital Compliance: Automatizace regulačních procesů | Střední | Již brzy |
| 7 | Vertikální zemědělství: Kontrola životního prostředí s IoT a ML | Moderní | Již brzy |
| 8 | Prognóza poptávky pro maloobchod s potravinami s Prophet a LightGBM | Střední | Již brzy |
| 9 | Farm Intelligence Dashboard: Analýza v reálném čase s Grafanou | Střední | Již brzy |
| 10 | Optimalizace potravin v dodavatelském řetězci: ML pro snížení odpadu | Střední | Již brzy |
Trh AgriTech v roce 2025: Čísla a trendy
Během několika let se precizní zemědělství změnilo ze specializované technologie na strategickou hnací sílu konkurenceschopnost primárního sektoru. Čísla to jasně potvrzují: globální trh platí precizní zemědělství 14,77 miliardy dolarů v roce 2025 a bude to růst 26,86 miliardy do roku 2030. Ale celkový obraz AgriTech je ještě širší: segment expandoval, který zahrnuje software pro správu, zemědělské drony, robotiku a digitální bio-vstupy, přesahuje 30 miliard v roce 2025 s očekávanou CAGR 16-23 % do roku 2031 podle různých výzkumných zdrojů.
V Itálii znamenal rok 2024 zlom: podle ISTAT dosáhlo italské zemědělství první místo v Evropě za přidanou hodnotu s 42,4 miliardami eur (+9 % ve srovnání s rokem 2023). Zemědělská výroba vzrostla v objemu o 1,4 % a přidaná hodnota o 3,5 %. Přitom 28,5 % italských zemědělských společností již používá přesné techniky, s větší koncentrací na severovýchodě (33 %) a severozápadě (32,1 %) a u velkých operátorů (41,1 % ve společnostech s UAA větší než 100 hektarů).
Povolení technologií precizního zemědělství v roce 2025
| Technologie | Hlavní aplikace | Adopce Itálie | Průměrná návratnost investic |
|---|---|---|---|
| IoT půdní senzory | Variabilní zálivka, hnojení | Alta (severní Itálie) | 15-25% snížení vstupních nákladů |
| Zemědělské drony | Mapování, ošetření, analýza listů | Průměrný | 30-40% úspora pesticidů |
| Satelitní snímky | NDVI, vodní stres, předpokládané výnosy | Středně vysoká | 5-10% optimalizace výnosu |
| IoT meteostanice | Modely predikce onemocnění, zavlažování | Vysoký | 10-20% snížení léčby |
| Technologie proměnlivé sazby | Výsev, variabilní hnojení | Nízko-střední | 8-15% úspora na vstupu |
| Strojové učení na terénních datech | Prognóza výnosů, agronomická optimalizace | Nízký | +10-20% výtěžek, -15% vstup |
PNRR hrála při akceleraci rozhodující roli: Mise 2 „Zelená revoluce a Ekologický přechod“ vyčlenil 400 milionů eur na modernizaci vozového parku zemědělství směrem k technologiím 4.0. Plán přechodu 5.0 s celkovým rozpočtem ve výši 12,7 miliardy eur ve dvouletém období 2024-2025 (6,3 miliardy konkrétně pro Přechod 5.0) a vztahuje se také na zemědělství: zákon o rozpočtu na rok 2025 (L. 207/2024) rozšířila oblast působnosti a učinila zemědělské podniky způsobilými pro daňové úlevy pro investice do digitálních a energeticky účinných technologií.
IoT architektura pro zemědělství: Od senzoru k datovému jezeru
Před napsáním jediného řádku kódu je nezbytné porozumět architektuře systému kompletní. Chyba v návrhu na začátku vede k nákladným přepisům, když se škálujete 10 senzorů při 10 000. Architektura, kterou zde popisujeme, je architektura přijatá těmi hlavními operátory v odvětví a ověřené v reálném produkčním prostředí.
End-to-End architektura: Vrstvy a komponenty
┌─────────────────────────────────────────────────────────────────────┐
│ FIELD LAYER (Campo) │
│ [Sensore Suolo] [Stazione Meteo] [Sensore pH] [Drone Mapping] │
│ │ │ │ │ │
│ └────────────────┴────────────────┘ │ │
│ │ LoRaWAN / Zigbee / RS-485 │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ GATEWAY LAYER (Edge) │
│ [Gateway LoRaWAN / Raspberry Pi 4] │
│ - Aggregazione dati multi-sensore │
│ - Pre-elaborazione e filtro outlier │
│ - Buffer locale (offline tolerance) │
│ - Protocollo: MQTT publish su broker locale │
└────────────────────────┼────────────────────────────────────────────┘
│ MQTT / TLS
┌────────────────────────┼────────────────────────────────────────────┐
│ BROKER LAYER (Fog/Cloud) │
│ [EMQX / HiveMQ / Eclipse Mosquitto] │
│ - Topic management gerarchico │
│ - Autenticazione mTLS / JWT │
│ - QoS management e message persistence │
│ - Bridge verso cloud (AWS IoT / Azure IoT Hub) │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ PROCESSING LAYER (Cloud) │
│ [Apache Kafka] ──► [Stream Processor] ──► [InfluxDB] │
│ - Ingestion stream - Validazione schema - Time-series store │
│ - Partitioning - Enrichment - Retention policy │
│ - Consumer groups - Alerting real-time - Downsampling │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ DATA LAKE (Medallion Architecture) │
│ [Bronze: Raw S3] ──► [Silver: Cleaned] ──► [Gold: Analytics] │
│ - Dati grezzi MQTT - Schema validato - Aggregazioni │
│ - Immutabile - Outlier rimossi - ML features │
│ - Formato: Parquet - Formato: Delta/Iceberg - Formato: Parquet │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ SERVING LAYER (Applicazioni) │
│ [Grafana Dashboard] [Alert Manager] [ML Models] [Mobile App] │
│ - Real-time monitoring - SMS/Email/Push - Previsioni rese │
│ - Mappa campo - Soglie critiche - Ottimizzazione input │
└─────────────────────────────────────────────────────────────────────┘
Každá úroveň má odlišné a oddělené odpovědnosti. Toto oddělení a kritika: vám to umožňuje vyměňte součást (např. přepněte z Mosquitto na EMQX při změně měřítka) bez dotyku ostatní. A umožňuje vám implementovat offline toleranci: pokud dojde k výpadku připojení ke cloudu, brána nadále lokálně shromažďuje a ukládá data.
Zemědělské senzory: typy, protokoly a nasazení
Hlavní kategorie senzorů pro pole
Výběr správných senzorů závisí na plodině, půdních podmínkách a cílech agronomové. Zde jsou hlavní kategorie s technickými charakteristikami a orientačními náklady pro rok 2025:
Senzory pro přesné zemědělství
| Kategorie | Parametr | Technologie | Jednotkové náklady | Protokol |
|---|---|---|---|---|
| Vlhkost půdy (VWC) | Objemový obsah vody | FDR, TDR, kapacitní | 30-150 EUR | SDI-12, RS-485, LoRa |
| Teplota půdy | T broušené na 10/30/50 cm | PT100, NTC | 20-80 EUR | SDI-12, I2C, 1-Wire |
| pH půdy | Kyselost in situ | ISE elektroda | 80-300 EUR | RS-485, Modbus |
| Elektrická vodivost (EC) | Slanost, plodnost | Indukční, kontaktní | 60-200 EUR | SDI-12, RS-485 |
| Meteorologická stanice | T, HR, vítr, déšť, záření | Integrovaný multi-senzor | 200-800 EUR | RS-485, WiFi, LoRa |
| Listový senzor | Vlhkost listů, teplota | Kapacitní, IR | 40-120 EUR | SDI-12, I2C |
| Průtokoměr zavlažování | Rychlost průtoku vody | Ultrazvuk, vrtule | 80-350 EUR | Pulzní, RS-485 |
| Přenosný NDVI senzor | Vegetativní index | Multispektrální | 300-1500 EUR | Bluetooth, WiFi |
Bezdrátové komunikační protokoly: Nejlepší srovnání
Výběr bezdrátového protokolu je možná nejkritičtějším rozhodnutím v architektuře systému Zemědělský IoT. Hřiště mají extrémní vlastnosti: vzdálenosti do 10 km, fyzické překážky (řady, stromy, venkovské budovy), bez napájení ze sítě, teploty od -20 do +60 stupňů Celsia.
Srovnání bezdrátových protokolů pro IoT v zemědělství
| Protokol | Rozsah | Kapela | Bubny | Cena infra | Případ použití |
|---|---|---|---|---|---|
| LoRaWAN | 3-15 km | 0,3-50 kbps | 5-10 let | Střední (brána) | Půdní senzory, vzdálené polní počasí |
| NB-IoT | 10+ km | 20-250 kbps | 3-8 let | Nízká (SIM karta) | Oblasti s pokrytím 4G/5G |
| Zigbee | 10-100m | 250 kbps | 1-3 roky | Nízká (síťovina) | Skleníky, automatické závlahové systémy |
| WiFi 6 | 100-200 m | Vysoká (Gbps) | Hodiny/dny | střední (AP) | Kamerové systémy, analýza kvality videa |
| 4G/LTE | Neomezený | Vysoký | 1-5 let | Střední (SIM) | Zemědělská technika, mobilní brána |
| RS-485 (kabelové) | 1200 m | 10 Mbps | N/A | Baskytara (svody) | Řízené skleníky, pevné systémy |
Pro typickou italskou farmu (50-500 hektarů na otevřených polích) je nejběžnějším řešením v 2025 a hybridní architektura: LoRaWAN pro půdní senzory v odlehlých polích, NB-IoT pro zemědělské stroje v pohybu, např WiFi/drátové pro skleníky kde přesnost a vzorkovací frekvence jsou maximální. Centrální brána (často a Raspberry Pi 4 nebo průmyslová brána Dragino) vše agreguje a publikuje prostřednictvím MQTT do cloudu.
MQTT Deep Dive: Architektura, QoS a osvědčené postupy
MQTT (Message Queuing Telemetry Transport) je de facto protokol pro IoT. Vytvořeno společností IBM v V 90. letech se pro sledování ropovodů přes satelit stal standardem ISO/IEC 20922 a srdcem jakéhokoli seriózního systému IoT. Díky své jednoduchosti a výkonu je ideální pro prostředí s zařízení s omezenou šířkou pásma a nízkou spotřebou energie.
Model publikování/přihlášení
Na rozdíl od typického modelu požadavek/odpověď HTTP používá MQTT paradigma zveřejnit/přihlásit se k odběru: producenti dat (vydavatelé) nevědí, kdo je čte, a i spotřebitelé (předplatitelé) nevědí, kdo publikuje. Oddělení je celkové a zprostředkované a centrální složka tzv makléř.
Data jsou organizována v téma, hierarchické řetězce oddělené lomítky, že popsat povahu dat. Dobře navržené téma je klíčem ke škálovatelné architektuře. Pro farmu s více parcelami se doporučuje následující struktura:
# Struttura topic MQTT consigliata per agricoltura di precisione
# Pattern: azienda/appezzamento/dispositivo/tipo-sensore/metrica
# Esempi concreti:
farm/campo-nord/sensor-001/soil/moisture # Umidita suolo sensore 001
farm/campo-nord/sensor-001/soil/temperature # Temperatura suolo sensore 001
farm/campo-nord/sensor-001/soil/ph # pH suolo sensore 001
farm/campo-nord/sensor-001/soil/ec # Conducibilita elettrica
farm/campo-sud/weather-station/air/temperature # Temperatura aria stazione meteo
farm/campo-sud/weather-station/air/humidity # Umidita aria
farm/campo-sud/weather-station/wind/speed # Velocita vento
farm/campo-sud/weather-station/rain/mm # Precipitazioni
farm/+/+/soil/moisture # Wildcard: umidita suolo da TUTTI i campi e sensori
farm/campo-nord/# # Wildcard: TUTTI i dati dal campo nord
farm/# # Wildcard: TUTTI i dati dell'azienda
# Topic di sistema (prefisso $)
$SYS/brokers/emqx/connections/count # Statistiche broker
farm/campo-nord/sensor-001/$status # Status device (LWT)
farm/campo-nord/sensor-001/$command # Comandi al dispositivo
Quality of Service (QoS): Tři úrovně
MQTT QoS definuje záruku doručení zpráv mezi klientem a brokerem. Výběr správné úrovně přímo ovlivňuje baterii, šířku pásma a spolehlivost systému:
MQTT QoS: Podrobné srovnání
| Úroveň | Jméno | Záruka | Nad hlavou | Použití v zemědělství |
|---|---|---|---|---|
| QoS 0 | Maximálně jednou | Žádný (oheň a zapomeň) | Minimálně (1 balení) | Vysokofrekvenční telemetrie (T každých 5 sekund), přijatelná ztráta |
| QoS 1 | Alespoň jednou | Alespoň jedna dodávka (možné duplikáty) | Nízká (2 pakety, ACK) | Hodnoty senzoru vlhkosti/pH, protokol zavlažování |
| QoS 2 | Přesně Jednou | Garantované doručení pouze jednou | Vysoká (4 balení) | Ovládání zavlažovacích ventilů, kritické alarmy, dávkování hnojiva |
Zachovaná poselství a poslední vůle
Dvě funkce MQTT, které jsou zvláště užitečné v zemědělství:
- Uchované zprávy: Broker uloží poslední zprávu k tématu a hle doručuje okamžitě každému novému předplatiteli. Pro aktuální stav senzorů je kritické: řídicí panel, který se připojuje, přijímá nejnovější hodnoty okamžitě, aniž by čekal na další publikační cyklus.
- Poslední vůle (LWT): Při připojování každé zařízení může nakonfigurovat zprávu "závěť", kterou makléř automaticky zveřejní, pokud připojení abnormálně selhává. Nepostradatelné pro detekci offline senzorů bez dotazování aktivní: pokud se snímač správně neodpojí (slabá baterie, rušení), broker automaticky zveřejňuje stav „offline“ na téma status.
Kompletní implementace Pythonu: Sensor Publisher
Implementujeme vydavatele Pythonu, který simuluje realistický uzel zemědělských senzorů s vlhkostí
půda, teplota, pH a EC. Kód používá paho-mqtt 2.x (Moderní API se zpětnými voláními
aktualizováno) a zavádí všechny osvědčené postupy: LWT, uchované zprávy, vhodné QoS,
automatické opětovné připojení a strukturované schéma JSON.
# sensor_node.py
# Nodo sensore MQTT per agricoltura di precisione
# Dipendenze: pip install paho-mqtt pydantic
import paho.mqtt.client as mqtt
import json
import time
import random
import math
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
import ssl
# ── Configurazione ────────────────────────────────────────────────────────────
BROKER_HOST = "emqx.azienda-agricola.it"
BROKER_PORT = 8883 # TLS
KEEPALIVE = 60 # secondi
CLIENT_ID = "sensor-campo-nord-001"
FARM_ID = "farm-001"
FIELD_ID = "campo-nord"
SENSOR_ID = "sensor-001"
# Topic base
TOPIC_BASE = f"{FARM_ID}/{FIELD_ID}/{SENSOR_ID}"
TOPIC_SOIL = f"{TOPIC_BASE}/soil"
TOPIC_STATUS = f"{TOPIC_BASE}/$status"
TOPIC_COMMAND = f"{TOPIC_BASE}/$command"
# Intervallo di pubblicazione in secondi
PUBLISH_INTERVAL = 30
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
# ── Modello dati sensore ───────────────────────────────────────────────────────
@dataclass
class SoilReading:
"""Lettura completa da un nodo sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str # ISO 8601 UTC
latitude: float
longitude: float
depth_cm: int # Profondità di installazione
# Misurazioni suolo
moisture_pct: float # Umidita volumetrica (VWC) %
temperature_c: float # Temperatura suolo gradi C
ph: float # pH suolo (4.0 - 9.0)
ec_ds_m: float # Conducibilita elettrica dS/m
# Metadata dispositivo
battery_pct: int # Livello batteria
rssi_dbm: int # Signal strength in dBm
firmware_version: str
# Flag qualità
quality_flag: str # "OK", "WARN", "ERROR"
quality_notes: Optional[str] = None
def read_sensors_from_hardware() -> SoilReading:
"""
In produzione: legge i sensori reali via SDI-12 o RS-485.
Qui: simula dati realistici con variazione temporale.
"""
now = datetime.now(timezone.utc)
# Ciclo circadiano per temperatura (più alta nelle ore centrali)
hour = now.hour
temp_base = 18.0
temp_variation = 6.0 * math.sin(math.pi * (hour - 6) / 12) if 6 <= hour <= 18 else -2.0
temperature = temp_base + temp_variation + random.gauss(0, 0.3)
# Umidita: degrada lentamente senza pioggia, segue ciclo stagionale
base_moisture = 35.0 # % VWC campo (35% = campo saturo irrigazione)
moisture = base_moisture + random.gauss(0, 1.5)
moisture = max(5.0, min(60.0, moisture)) # clamp fisico
# pH relativamente stabile
ph = 6.8 + random.gauss(0, 0.1)
ph = max(4.0, min(9.0, ph))
# EC: correlata alla salinita e fertilizzazione
ec = 1.2 + random.gauss(0, 0.05)
ec = max(0.1, min(5.0, ec))
# Battery che decresce lentamente (simulazione)
battery = max(10, 95 - int(time.time() / 3600) % 85)
# Quality flag automatico
quality = "OK"
notes = None
if moisture < 10.0:
quality = "WARN"
notes = "Umidita sotto soglia minima critica"
elif moisture > 55.0:
quality = "WARN"
notes = "Umidita sopra soglia saturazione"
if battery < 15:
quality = "WARN"
notes = (notes or "") + " | Batteria in esaurimento"
return SoilReading(
sensor_id = SENSOR_ID,
farm_id = FARM_ID,
field_id = FIELD_ID,
timestamp = now.isoformat(),
latitude = 40.4164,
longitude = 17.9308,
depth_cm = 30,
moisture_pct = round(moisture, 2),
temperature_c = round(temperature, 2),
ph = round(ph, 2),
ec_ds_m = round(ec, 3),
battery_pct = battery,
rssi_dbm = random.randint(-95, -45),
firmware_version = "2.4.1",
quality_flag = quality,
quality_notes = notes,
)
# ── Client MQTT ───────────────────────────────────────────────────────────────
class AgriSensorNode:
"""Nodo sensore MQTT con auto-reconnect e LWT"""
def __init__(self):
self.client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
self._setup_auth()
self._setup_tls()
self._setup_callbacks()
self._setup_lwt()
self.connected = False
def _setup_auth(self):
self.client.username_pw_set(
username="sensor-user",
password="<TOKEN_SEGRETO>"
)
def _setup_tls(self):
"""TLS mutuo con certificato dispositivo"""
self.client.tls_set(
ca_certs = "/certs/ca.crt",
certfile = "/certs/sensor.crt",
keyfile = "/certs/sensor.key",
tls_version = ssl.PROTOCOL_TLS_CLIENT,
)
def _setup_callbacks(self):
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
def _setup_lwt(self):
"""Last Will Testament: pubblicato dal broker se la connessione cade"""
lwt_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "connection_lost",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.will_set(
topic = TOPIC_STATUS,
payload = lwt_payload,
qos = 1,
retain = True, # Retain: dashboard vede subito lo stato offline
)
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
log.error(f"Connessione fallita: {reason_code}")
return
log.info(f"Connesso al broker: {BROKER_HOST}")
self.connected = True
# Pubblica stato online (retained)
online_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "online",
"timestamp": datetime.now(timezone.utc).isoformat(),
"firmware": "2.4.1",
})
client.publish(TOPIC_STATUS, online_payload, qos=1, retain=True)
# Sottoscrivi ai comandi
client.subscribe(TOPIC_COMMAND, qos=2)
log.info(f"Sottoscritto a: {TOPIC_COMMAND}")
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
self.connected = False
log.warning(f"Disconnesso: {reason_code}. Tentativo riconnessione...")
def _on_message(self, client, userdata, message):
"""Gestione comandi ricevuti dal broker (es. cambio intervallo)"""
try:
payload = json.loads(message.payload.decode())
cmd = payload.get("command")
log.info(f"Comando ricevuto: {cmd}")
if cmd == "set_interval":
global PUBLISH_INTERVAL
PUBLISH_INTERVAL = int(payload.get("value", 30))
log.info(f"Intervallo aggiornato a {PUBLISH_INTERVAL}s")
elif cmd == "reboot":
log.warning("Comando reboot ricevuto")
# In produzione: riavvia il sistema
except Exception as e:
log.error(f"Errore parsing comando: {e}")
def _on_publish(self, client, userdata, mid, reason_code, properties):
log.debug(f"Messaggio {mid} pubblicato con successo")
def connect(self):
self.client.connect(
host = BROKER_HOST,
port = BROKER_PORT,
keepalive = KEEPALIVE,
)
self.client.loop_start() # Thread background per I/O
def publish_reading(self, reading: SoilReading):
"""Pubblica lettura sensore su topic appropriati"""
# Payload principale: lettura completa
payload_full = json.dumps(asdict(reading), default=str)
result = self.client.publish(
topic = TOPIC_SOIL,
payload = payload_full,
qos = 1,
retain = True, # Ultimo valore sempre disponibile
)
# Pubblica anche metriche singole per dashboard real-time
metrics = {
"moisture": (reading.moisture_pct, 1),
"temperature": (reading.temperature_c, 0),
"ph": (reading.ph, 1),
"ec": (reading.ec_ds_m, 0),
}
for metric, (value, qos) in metrics.items():
self.client.publish(
topic = f"{TOPIC_SOIL}/{metric}",
payload = str(value),
qos = qos,
retain = True,
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
log.info(
f"Pubblicato | Moisture: {reading.moisture_pct}% | "
f"Temp: {reading.temperature_c}C | pH: {reading.ph} | "
f"EC: {reading.ec_ds_m} dS/m | Quality: {reading.quality_flag}"
)
else:
log.error(f"Errore pubblicazione: {result.rc}")
def run(self):
"""Loop principale del nodo sensore"""
self.connect()
# Attendi connessione iniziale
timeout = 10
while not self.connected and timeout > 0:
time.sleep(1)
timeout -= 1
if not self.connected:
log.error("Impossibile connettersi al broker")
return
log.info(f"Nodo sensore avviato. Intervallo: {PUBLISH_INTERVAL}s")
try:
while True:
reading = read_sensors_from_hardware()
self.publish_reading(reading)
time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
log.info("Shutdown richiesto")
finally:
# Disconnessione pulita: il LWT NON viene inviato
offline_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "graceful_shutdown",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.publish(TOPIC_STATUS, offline_payload, qos=1, retain=True)
time.sleep(0.5)
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
node = AgriSensorNode()
node.run()
Consumer Pipeline: validace, obohacení a skladování
Spotřebitel je druhý konec systému. Přihlašuje se k odběru témat MQTT, ověřuje data přijatá pomocí Pydantic, obohacuje čtení o kontextová data (agronomické informace, upozornění na počasí) a směruje je do InfluxDB pro ukládání časových řad a do S3 pro datové jezero.
# pipeline_consumer.py
# Consumer MQTT + validazione Pydantic + storage InfluxDB
# Dipendenze: pip install paho-mqtt pydantic influxdb-client boto3
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field, field_validator, ValidationError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import boto3
import io
log = logging.getLogger(__name__)
# ── Schema di validazione Pydantic ────────────────────────────────────────────
class SoilReadingSchema(BaseModel):
"""Schema di validazione per letture sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str
latitude: float = Field(ge=-90, le=90)
longitude: float = Field(ge=-180, le=180)
depth_cm: int = Field(ge=0, le=200)
moisture_pct: float = Field(ge=0.0, le=100.0)
temperature_c: float = Field(ge=-40.0, le=80.0)
ph: float = Field(ge=0.0, le=14.0)
ec_ds_m: float = Field(ge=0.0, le=20.0)
battery_pct: int = Field(ge=0, le=100)
rssi_dbm: int = Field(ge=-150, le=0)
firmware_version: str
quality_flag: str = Field(pattern="^(OK|WARN|ERROR)$")
quality_notes: Optional[str] = None
@field_validator("timestamp")
@classmethod
def validate_timestamp(cls, v: str) -> str:
"""Verifica che il timestamp sia ISO 8601 valido e non nel futuro"""
try:
ts = datetime.fromisoformat(v)
if ts > datetime.now(timezone.utc):
raise ValueError("Timestamp nel futuro")
except ValueError as e:
raise ValueError(f"Timestamp non valido: {e}")
return v
@field_validator("ph")
@classmethod
def validate_ph_agronomico(cls, v: float) -> float:
"""pH fuori range agronomico (4.5-8.5) e anomalia"""
if v < 4.5 or v > 8.5:
log.warning(f"pH {v} fuori range agronomico tipico [4.5-8.5]")
return v
def has_critical_alert(self) -> bool:
"""Verifica se la lettura richiede un alert critico"""
return (
self.moisture_pct < 10.0 or
self.moisture_pct > 58.0 or
self.ph < 4.5 or
self.ph > 8.5 or
self.ec_ds_m > 4.0 or
self.battery_pct < 10
)
# ── Storage InfluxDB ──────────────────────────────────────────────────────────
class InfluxDBWriter:
"""Writer per time-series su InfluxDB 2.x"""
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
def write_soil_reading(self, reading: SoilReadingSchema):
"""Scrive una lettura suolo su InfluxDB con tags e fields ottimizzati"""
point = (
Point("soil_reading")
# Tags: usati per filtro e group-by (cardinalita limitata)
.tag("sensor_id", reading.sensor_id)
.tag("farm_id", reading.farm_id)
.tag("field_id", reading.field_id)
.tag("depth_cm", str(reading.depth_cm))
.tag("quality", reading.quality_flag)
# Fields: metriche numeriche
.field("moisture_pct", reading.moisture_pct)
.field("temperature_c", reading.temperature_c)
.field("ph", reading.ph)
.field("ec_ds_m", reading.ec_ds_m)
.field("battery_pct", float(reading.battery_pct))
.field("rssi_dbm", float(reading.rssi_dbm))
# Timestamp dalla lettura del sensore (non dall'arrivo)
.time(datetime.fromisoformat(reading.timestamp), WritePrecision.SECONDS)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
log.debug(f"Scritto su InfluxDB: {reading.sensor_id} @ {reading.timestamp}")
def close(self):
self.client.close()
# ── Bronze Layer su S3 ────────────────────────────────────────────────────────
class S3BronzeWriter:
"""Scrive dati grezzi su S3 (Bronze layer Medallion Architecture)"""
def __init__(self, bucket: str, region: str = "eu-south-1"):
self.s3 = boto3.client("s3", region_name=region)
self.bucket = bucket
def write_raw(self, raw_payload: str, topic: str, received_at: datetime):
"""
Salva il payload grezzo in formato NDJSON su S3.
Partitionamento per data: year/month/day/hour/
"""
prefix = received_at.strftime("year=%Y/month=%m/day=%d/hour=%H")
filename = f"{prefix}/{received_at.isoformat()}.json"
envelope = {
"topic": topic,
"received_at": received_at.isoformat(),
"payload": json.loads(raw_payload),
}
self.s3.put_object(
Bucket = self.bucket,
Key = filename,
Body = json.dumps(envelope).encode("utf-8"),
ContentType = "application/json",
)
log.debug(f"Bronze layer: scritto {filename}")
# ── Alert Manager ─────────────────────────────────────────────────────────────
class AlertManager:
"""Gestione alert critici con cooldown per evitare spam"""
def __init__(self):
self._last_alert: dict[str, datetime] = {}
self.cooldown_seconds = 300 # 5 minuti tra un alert e l'altro per sensor
def check_and_alert(self, reading: SoilReadingSchema):
if not reading.has_critical_alert():
return
sensor_key = reading.sensor_id
now = datetime.now(timezone.utc)
last = self._last_alert.get(sensor_key)
if last and (now - last).total_seconds() < self.cooldown_seconds:
return # In cooldown, skip
self._last_alert[sensor_key] = now
self._send_alert(reading)
def _send_alert(self, reading: SoilReadingSchema):
"""In produzione: invia SMS/email/push. Qui: log."""
alerts = []
if reading.moisture_pct < 10.0:
alerts.append(f"STRESS IDRICO: umidita {reading.moisture_pct}% sotto soglia critica 10%")
if reading.moisture_pct > 58.0:
alerts.append(f"SATURAZIONE: umidita {reading.moisture_pct}% sopra saturazione")
if reading.ph < 4.5:
alerts.append(f"pH CRITICO: {reading.ph} - suolo troppo acido")
if reading.ec_ds_m > 4.0:
alerts.append(f"SALINITA CRITICA: EC {reading.ec_ds_m} dS/m")
if reading.battery_pct < 10:
alerts.append(f"BATTERIA: {reading.battery_pct}% - sostituire")
for alert in alerts:
log.critical(f"ALERT [{reading.sensor_id}] {alert}")
# TODO: self.sms_client.send(...)
# TODO: self.email_client.send(...)
# ── Pipeline Consumer principale ──────────────────────────────────────────────
class AgriPipelineConsumer:
"""Consumer MQTT con validazione, storage e alerting integrati"""
def __init__(self):
self.influx = InfluxDBWriter(
url = "https://influxdb.azienda.it:8086",
token = "<INFLUX_TOKEN>",
org = "azienda-agricola",
bucket = "farm-sensors",
)
self.s3_bronze = S3BronzeWriter(bucket="farm-raw-data-bronze")
self.alert_mgr = AlertManager()
self.client = mqtt.Client(
client_id = "pipeline-consumer-001",
protocol = mqtt.MQTTv5,
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.stats = {"received": 0, "valid": 0, "errors": 0, "alerts": 0}
def _on_connect(self, client, userdata, flags, rc, props):
log.info("Consumer connesso al broker")
# Sottoscrive a TUTTI i sensori suolo di TUTTE le farm
client.subscribe("farm/+/+/soil", qos=1)
client.subscribe("farm/+/+/$status", qos=1)
log.info("Sottoscritto a: farm/+/+/soil e farm/+/+/$status")
def _on_message(self, client, userdata, message):
received_at = datetime.now(timezone.utc)
self.stats["received"] += 1
raw_payload = message.payload.decode("utf-8")
try:
# 1. Parse JSON
data = json.loads(raw_payload)
# 2. Salva Bronze layer (dato grezzo, prima di qualsiasi trasformazione)
self.s3_bronze.write_raw(raw_payload, message.topic, received_at)
# 3. Valida con Pydantic
reading = SoilReadingSchema(**data)
self.stats["valid"] += 1
# 4. Scrivi su InfluxDB (time-series)
self.influx.write_soil_reading(reading)
# 5. Check alert
self.alert_mgr.check_and_alert(reading)
if reading.has_critical_alert():
self.stats["alerts"] += 1
log.info(
f"Processed | {reading.sensor_id} | "
f"moisture={reading.moisture_pct}% | "
f"quality={reading.quality_flag}"
)
except json.JSONDecodeError as e:
self.stats["errors"] += 1
log.error(f"JSON non valido da {message.topic}: {e}")
except ValidationError as e:
self.stats["errors"] += 1
log.error(f"Validazione fallita per {message.topic}: {e}")
# Salva comunque su Bronze (dato anomalo ma registrato)
except Exception as e:
self.stats["errors"] += 1
log.exception(f"Errore imprevisto: {e}")
def run(self):
self.client.connect("emqx.azienda-agricola.it", 8883)
log.info("Pipeline consumer avviata. Ctrl+C per fermare.")
try:
self.client.loop_forever()
except KeyboardInterrupt:
pass
finally:
self.influx.close()
log.info(
f"Stats finali: {self.stats}"
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
consumer = AgriPipelineConsumer()
consumer.run()
Architektura medailonu pro zemědělská data: bronz, stříbro, zlato
The Medallion Architecture (zavedena společností Databricks, ale nyní je de facto standardem v datovém inženýrství) organizuje data do tří progresivních vrstev kvality. Aplikováno na zemědělská data IoT, řeší skutečné problémy: senzory odesílající odlehlé hodnoty, nesprávná časová razítka pro RTC drift, odečty duplikováno pro opakovaný přenos MQTT s QoS 1 a potřeba různých agregací na řídicí panel modely ML v reálném čase vs dlouhodobé.
Architektura medailonu pro IoT v zemědělství
| Vrstvy | Formát | Obsah | Operace | Udržení |
|---|---|---|---|---|
| bronz (surový) | JSON / Parkety | Syrové, neměnné užitečné zatížení MQTT | Žádná přeměna, jen sklad | 5 let (regulační) |
| Stříbro (vyčištěno) | Delta jezero / ledovce | Normalizovaný vzor, odlehlé hodnoty odstraněny | Deduplikace, filtr odlehlých hodnot, typové obsazení | 3 roky |
| zlato (Analytics) | Parkety | Hodinové/denní agregace, funkce ML | Agregace, spojení s počasím/satelitem | 10 let |
# medallion_pipeline.py
# Pipeline Medallion per dati agricoli IoT con PySpark / pandas
# In produzione: usa Databricks, AWS Glue, o dbt su Spark
# Qui: versione pandas per sviluppo locale e testing
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
DATA_ROOT = Path("/data/farm")
# ── BRONZE LAYER ──────────────────────────────────────────────────────────────
def load_bronze(date: str) -> pd.DataFrame:
"""
Carica dati grezzi dal Bronze layer (JSON NDJSON).
Nessuna trasformazione: solo lettura e schema enforcement minimo.
"""
bronze_path = DATA_ROOT / "bronze" / date
records = []
for f in bronze_path.glob("**/*.json"):
with open(f) as fp:
envelope = json.load(fp)
records.append(envelope)
if not records:
return pd.DataFrame()
# DataFrame con schema minimo garantito
df = pd.json_normalize(records, sep="_")
df["_bronze_loaded_at"] = datetime.utcnow().isoformat()
df["_source_file"] = [str(f) for f in bronze_path.glob("**/*.json")]
return df
# ── SILVER LAYER ──────────────────────────────────────────────────────────────
def transform_bronze_to_silver(df_bronze: pd.DataFrame) -> pd.DataFrame:
"""
Trasformazioni Bronze → Silver:
1. Parse e normalizza timestamp
2. Rimuovi duplicati (QoS 1 può consegnare più volte)
3. Filtra outlier fisicamente impossibili
4. Cast tipi corretti
5. Aggiungi colonne derivate
"""
if df_bronze.empty:
return pd.DataFrame()
df = df_bronze.copy()
# 1. Parse timestamp
df["ts"] = pd.to_datetime(df["payload_timestamp"], utc=True, errors="coerce")
df = df.dropna(subset=["ts"])
# 2. Deduplica: stesso sensor_id + timestamp = stesso dato
df["_dedup_key"] = df.apply(
lambda r: hashlib.md5(
f"{r.get('payload_sensor_id', '')}{r.get('payload_timestamp', '')}".encode()
).hexdigest(),
axis=1
)
df = df.drop_duplicates(subset=["_dedup_key"], keep="first")
# 3. Filtro outlier fisici
df = df[df["payload_moisture_pct"].between(0, 100)]
df = df[df["payload_temperature_c"].between(-40, 80)]
df = df[df["payload_ph"].between(0, 14)]
df = df[df["payload_ec_ds_m"].between(0, 20)]
# 4. Flag outlier agronomici (mantieni, ma etichetta)
df["is_agronomic_outlier"] = (
~df["payload_moisture_pct"].between(5, 60) |
~df["payload_ph"].between(4.5, 8.5) |
(df["payload_ec_ds_m"] > 4.0)
)
# 5. Colonne derivate
df["date"] = df["ts"].dt.date
df["hour"] = df["ts"].dt.hour
df["month"] = df["ts"].dt.month
# 6. Rename colonne per schema pulito
df = df.rename(columns={
"payload_sensor_id": "sensor_id",
"payload_farm_id": "farm_id",
"payload_field_id": "field_id",
"payload_moisture_pct": "moisture_pct",
"payload_temperature_c": "temperature_c",
"payload_ph": "ph",
"payload_ec_ds_m": "ec_ds_m",
"payload_battery_pct": "battery_pct",
"payload_quality_flag": "quality_flag",
})
# Seleziona solo colonne rilevanti
cols = [
"sensor_id", "farm_id", "field_id", "ts", "date", "hour", "month",
"moisture_pct", "temperature_c", "ph", "ec_ds_m", "battery_pct",
"quality_flag", "is_agronomic_outlier"
]
df = df[[c for c in cols if c in df.columns]]
df["_silver_processed_at"] = datetime.utcnow().isoformat()
return df
# ── GOLD LAYER ────────────────────────────────────────────────────────────────
def transform_silver_to_gold(df_silver: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""
Trasformazioni Silver → Gold:
Produce più tabelle di aggregazione per usi diversi.
"""
if df_silver.empty:
return {}
# Filtra outlier agronomici per analytics
df = df_silver[~df_silver["is_agronomic_outlier"]].copy()
gold_tables = {}
# ─── Aggregazione oraria per dashboard ───
hourly = df.groupby(["sensor_id", "farm_id", "field_id", "date", "hour"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_min = ("moisture_pct", "min"),
moisture_max = ("moisture_pct", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
readings_n = ("moisture_pct", "count"),
).reset_index()
hourly["moisture_avg"] = hourly["moisture_avg"].round(2)
gold_tables["hourly_aggregations"] = hourly
# ─── Aggregazione giornaliera per reporting agronomico ───
daily = df.groupby(["sensor_id", "farm_id", "field_id", "date"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_std = ("moisture_pct", "std"),
temp_min = ("temperature_c", "min"),
temp_max = ("temperature_c", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
stress_hours = ("moisture_pct", lambda x: (x < 20).sum()),
readings_n = ("moisture_pct", "count"),
).reset_index()
gold_tables["daily_agronomic"] = daily
# ─── Feature engineering per ML (predizione irrigazione) ───
ml_features = df.copy()
ml_features["moisture_lag_1h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(2)
ml_features["moisture_lag_3h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(6)
ml_features["moisture_trend"] = (
ml_features["moisture_pct"] - ml_features["moisture_lag_3h"]
)
ml_features["needs_irrigation"] = (ml_features["moisture_pct"] < 25.0).astype(int)
gold_tables["ml_irrigation_features"] = ml_features.dropna(
subset=["moisture_lag_1h", "moisture_lag_3h"]
)
return gold_tables
def run_medallion_pipeline(date: str):
"""Esegue la pipeline Medallion per una data specifica"""
log_prefix = f"[Medallion {date}]"
print(f"{log_prefix} Caricamento Bronze...")
bronze = load_bronze(date)
print(f"{log_prefix} Bronze: {len(bronze)} record")
print(f"{log_prefix} Trasformazione Silver...")
silver = transform_bronze_to_silver(bronze)
print(f"{log_prefix} Silver: {len(silver)} record (dopo dedup e filtro outlier)")
# Salva Silver
silver_path = DATA_ROOT / "silver" / date / "soil_readings.parquet"
silver_path.parent.mkdir(parents=True, exist_ok=True)
silver.to_parquet(silver_path, index=False)
print(f"{log_prefix} Trasformazione Gold...")
gold_tables = transform_silver_to_gold(silver)
for table_name, df in gold_tables.items():
gold_path = DATA_ROOT / "gold" / date / f"{table_name}.parquet"
gold_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(gold_path, index=False)
print(f"{log_prefix} Gold {table_name}: {len(df)} record")
print(f"{log_prefix} Pipeline completata.")
if __name__ == "__main__":
today = datetime.utcnow().strftime("%Y-%m-%d")
run_medallion_pipeline(today)
Integrace s InfluxDB a Apache Kafka
InfluxDB pro zemědělské časové řady
InfluxDB je nejvhodnější databáze pro vysokofrekvenční časové řady IoT. Na rozdíl od PostgreSQL
nebo MySQL a optimalizované pro masivní zápisy v časové posloupnosti, automatickou kompresi dat
historické (downsampling) a dotazy na časové intervaly s nativními agregačními funkcemi
(mean(), max(), moving_average()).
Pro farmu s 50 senzory, které každých 30 sekund odesílá zprávy, to InfluxDB zvládá pohodlně 100 zápisů za sekundu na spotřebním hardwaru se zásadami uchovávání automatické: nezpracovaná data za 30 dní, hodinové agregace za 1 rok, denní agregace na 10 let. To vše v několika MB úložiště za rok na senzor.
# influxdb_queries.py
# Query Flux per analytics agricole su InfluxDB 2.x
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
client = InfluxDBClient(
url = "https://influxdb.azienda.it:8086",
token = "<TOKEN>",
org = "azienda-agricola",
)
query_api = client.query_api()
# ─── Query 1: Media umidita ultime 24 ore per campo ───────────────────────────
QUERY_MOISTURE_24H = """
from(bucket: "farm-sensors")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r.field_id == "campo-nord")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> yield(name: "hourly_moisture")
"""
# ─── Query 2: Alert stress idrico (moisture < 20% nelle ultime 6h) ────────────
QUERY_DROUGHT_ALERT = """
from(bucket: "farm-sensors")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r._value < 20.0)
|> group(columns: ["sensor_id", "field_id"])
|> count()
|> filter(fn: (r) => r._value > 3)
|> yield(name: "drought_sensors")
"""
# ─── Query 3: Trend temperatura settimanale con banda statistica ──────────────
QUERY_TEMP_WEEKLY = """
from(bucket: "farm-sensors")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "temperature_c")
|> aggregateWindow(every: 6h, fn: mean)
|> movingAverage(n: 4)
|> yield(name: "temp_trend_7d")
"""
# ─── Query 4: Stato batterie sensori (per manutenzione preventiva) ────────────
QUERY_BATTERY_STATUS = """
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
|> filter(fn: (r) => r._value < 20)
|> sort(columns: ["_value"])
|> yield(name: "low_battery_sensors")
"""
def run_analytics():
print("=== Dashboard Analytics Agricole ===")
# Esegui query umidita
tables = query_api.query(QUERY_MOISTURE_24H)
for table in tables:
for record in table.records:
print(f" {record.get_time()} | "
f"sensor={record.values.get('sensor_id', 'N/A')} | "
f"moisture={record.get_value():.1f}%")
# Check alert stress idrico
drought_tables = query_api.query(QUERY_DROUGHT_ALERT)
for table in drought_tables:
for record in table.records:
print(f" ALERT STRESS IDRICO: sensor {record.values.get('sensor_id')} "
f"in field {record.values.get('field_id')}")
if __name__ == "__main__":
run_analytics()
Apache Kafka pro High-Scale Stream Processing
Když počet senzorů překročí stovky nebo když jsou vyžadovány subsekundové latence, Samotný MQTT jako distribuční systém nestačí. Apache Kafka přichází jako sběrnice zpráv podnik mezi brokerem MQTT a vrstvou zpracování. Standardní vzor je: MQTT Broker → Kafka Connect (MQTT Source Connector) → Kafka Topics → Consumer Groups.
Kafka přináší v tomto kontextu klíčové výhody: přehrávání zpráv (pokud má spotřebitel chybu, vše je znovu zpracováno), rozdělení podle oboru/geografické oblasti, více nezávislých spotřebitelů (Zapisovací programy InfluxDB, motory výstrah, ML inference, zapisovače datového jezera čtou stejná data bez zasahovat) a přesně jednou zaručuje s Kafka Transactions.
# kafka_consumer.py
# Consumer Kafka per dati sensori agricoli
# Dipendenze: pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import logging
log = logging.getLogger(__name__)
KAFKA_CONFIG = {
"bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"group.id": "farm-analytics-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Commit manuale per exactly-once
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "farm-consumer",
"sasl.password": "<KAFKA_PASSWORD>",
}
TOPICS = ["farm.soil.readings", "farm.weather.readings"]
def process_soil_message(data: dict) -> bool:
"""Elabora un messaggio sensore suolo"""
sensor_id = data.get("sensor_id", "unknown")
moisture = data.get("moisture_pct", 0)
# Routing logica: stress idrico → alert immediato
if moisture < 15.0:
log.warning(f"STRESS IDRICO critico: sensor {sensor_id} = {moisture}%")
# In produzione: pubblica su topic alert
return True
log.info(f"OK | sensor={sensor_id} | moisture={moisture}%")
return True
def run_kafka_consumer():
consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe(TOPICS)
log.info(f"Consumer Kafka avviato su topics: {TOPICS}")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
log.debug(f"Raggiunto EOF: {msg.topic()}[{msg.partition()}]")
else:
raise KafkaException(msg.error())
continue
# Processa messaggio
try:
data = json.loads(msg.value().decode("utf-8"))
success = process_soil_message(data)
if success:
consumer.commit(message=msg) # Commit solo se processato correttamente
except json.JSONDecodeError as e:
log.error(f"JSON non valido: {e}")
consumer.commit(message=msg) # Commit anche errori per non bloccare
except KeyboardInterrupt:
log.info("Consumer fermato")
finally:
consumer.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_kafka_consumer()
Grafana Dashboard and Alerting System
Grafana je standardní nástroj pro vizualizaci dat IoT v reálném čase. Integruje se nativně s InfluxDB 2.x prostřednictvím zdroje dat a umožňuje vám vytvářet interaktivní řídicí panely s geografickými mapami, grafy časových řad a stavové panely bez psaní kódu frontendu.
Pro zemědělský podnik standardní přístrojová deska obsahuje:
- Mapa pole: Plugin geomap s překryvy senzorů, barvy pro vodní stres
- Časová řada vlhkosti: 24hodinový graf s barevnými prahy (červená < 15 %, žlutá < 25 %)
- Ukazatel baterie: Stav baterie všech senzorů pro plánování údržby
- pH teplotní mapy: Tepelná mapa pH na pitch pro rozhodnutí nakopnout
- Předpověď zavlažování: Výstup modelu ML příštích 48 hodin
# grafana_alert_rules.yaml
# Regole di alerting Grafana per agricoltura di precisione
apiVersion: 1
groups:
- name: farm-critical-alerts
interval: 1m
rules:
# Alert stress idrico critico
- uid: drought-critical
title: "Stress Idrico Critico"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15.0]
type: lt
query: { params: ["A"] }
noDataState: Alerting
execErrState: Alerting
for: 10m
annotations:
summary: "Sensore {{ $labels.sensor_id }} in stress idrico"
description: "Umidita {{ $values.A }}% sotto soglia critica 15%"
labels:
severity: critical
team: agronomist
# Alert batteria scarica
- uid: battery-low
title: "Batteria Sensore in Esaurimento"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15]
type: lt
query: { params: ["A"] }
for: 5m
annotations:
summary: "Batteria bassa su {{ $labels.sensor_id }}"
labels:
severity: warning
team: maintenance
Anti-Pattern: Univerzální pevné prahové hodnoty
Častou chybou v raných implementacích je použití stejných prahových hodnot vlhkosti pro všechny plodiny a všechny druhy půdy. Pro půdu je správný práh kritické vlhkosti 20 %. hlinitá půda s rajčaty, ale úplně špatná pro písčitou půdu s vinnou révou nebo pro a zahradnická školka. Prahové hodnoty musí být konfigurovatelné pro každý senzor, kulturu a fázi fenologické. Implementuje systém konfigurace dynamického prahu, který není pevně zakódován do výstražný kód.
Nasazení, zabezpečení a škálovatelnost
Docker Compose pro místní rozvoj
Chcete-li vyvinout a otestovat celý zásobník lokálně bez cloudových závislostí, a
docker-compose.yml která zahrnuje makléře MQTT, InfluxDB a Grafana:
# docker-compose.yml
# Stack IoT agricolo completo per sviluppo locale
version: "3.9"
services:
# Broker MQTT - Eclipse Mosquitto
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT plain (solo sviluppo)
- "8883:8883" # MQTT TLS
- "9001:9001" # WebSocket
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./certs:/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
restart: unless-stopped
# Time-series database
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: changeme123!
DOCKER_INFLUXDB_INIT_ORG: azienda-agricola
DOCKER_INFLUXDB_INIT_BUCKET: farm-sensors
DOCKER_INFLUXDB_INIT_RETENTION: 30d
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-token
volumes:
- influxdb_data:/var/lib/influxdb2
restart: unless-stopped
# Dashboard e alerting
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
GF_INSTALL_PLUGINS: grafana-worldmap-panel,grafana-clock-panel
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
# Pipeline consumer (il nostro codice Python)
pipeline-consumer:
build:
context: .
dockerfile: Dockerfile.consumer
environment:
MQTT_HOST: mosquitto
MQTT_PORT: "1883"
INFLUX_URL: http://influxdb:8086
INFLUX_TOKEN: my-super-secret-token
INFLUX_ORG: azienda-agricola
INFLUX_BUCKET: farm-sensors
depends_on:
- mosquitto
- influxdb
restart: unless-stopped
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
grafana_data:
Nejlepší bezpečnostní postupy pro IoT v zemědělství
Bezpečnost zemědělských IoT systémů je často podceňována. Poškozený senzor se může změnit odečty a způsobit anomální zavlažování, plýtvání nebo poškození plodin. Naprosté minimum:
Kontrolní seznam zabezpečení IoT v zemědělství
- Vzájemné TLS (mTLS): Každé zařízení má jedinečný klientský certifikát. makléř ověřuje zařízení pomocí certifikátu, nikoli pouze uživatelského jména/hesla.
-
Tématické seznamy ACL: Každý senzor může publikovat POUZE na svá vlastní témata
(
farm/campo-nord/sensor-001/#) a nečtěte témata z jiných senzorů. - Podepisování firmwaru: Aktualizace OTA (over-the-air) musí být podepsány kryptograficky, aby se zabránilo škodlivému firmwaru.
- Střídání pověření: Tokeny a certifikáty s platností 1 rok, rotace automaticky prostřednictvím ACME nebo AWS IoT Certificate Rotation.
- Segmentace sítě: Zařízení IoT ve vyhrazené VLAN, izolované od sítě firemní. Přístup k brokerovi pouze z brány, nikdy ne přímo z internetu.
- Omezení sazby: Broker omezuje počet zpráv za sekundu zařízení, které zabrání náhodnému zaplavení nebo útokům.
Precizní zemědělství v Itálii: příležitosti a pobídky 2025
Itálie má jedinečnou pozici v evropském agrotech panoramatu: má potravinářský sektor nejlepší na světě (DOP, IGP, DOC), roztříštěná výrobní struktura (středně velké firmy 11 hektarů ve srovnání s průměrem EU 33), ale s velmi silnou agronomickou odborností a je první v Evropa pro zemědělskou přidanou hodnotu s 42,4 miliardami eur v roce 2024 (+9 % meziročně).
Úkolem je přinést IoT a AI technologie také do zemědělských malých a středních podniků, které tvoří 95 % produktivní tkaniny. Zde jsou finanční nástroje dostupné v roce 2025:
Pobídky AgriTech a IoT v Itálii (2025)
| Opatření | Částka/Výhoda | Příjemci | Vypršení platnosti |
|---|---|---|---|
| Plán přechodu 5.0 | 35–45% sleva na dani na investice do digitálních technologií + energie | Všechny podniky (včetně zemědělství) | 31/12/2025 |
| Mise PNRR 2.3 – Mechanizace | 400 mil. EUR nevratné příspěvky | Zemědělské podniky pro stroje 4.0 | Vyprodáno (očekávají se nové hovory) |
| Daňový kredit 4.0 (dříve Průmysl 4.0) | 20 % na technologické kapitálové statky | Společnosti se zemědělským ATECO | Prodlouženo 2025 |
| Oznámení o zemědělství INAIL ISI | Až 65 % nevratných | Zemědělské podniky a družstva | Roční (jarní otevření) |
| PSP (Strategický plán SZP 2023–2027) | Opatření ekologického systému s prémiemi za precizní zemědělství | Společnosti s registrovaným UAA | Roční (aplikace SZP) |
| PNRR Agrivoltaic Call | 1,5 MILIARDY EUR na fotovoltaické systémy integrované v zemědělství | Zemědělské podniky | 2025–2026 |
Pro farmu investující do kompletního systému IoT (senzory, brány, software, připojení) s celkovými náklady 50 000 EUR, Plán přechodu 5.0 může pokrýt až 22 500 EUR (45 % nákladů) jako slevu na dani, návratnost investice za 2-3 roky díky úspoře vody a hnojiv a pracovní síly.
Případová studie: Masseria Pugliese 500 hektarů – olivovník a pšenice
Reálný případ (agregovaná a anonymizovaná data) apulské zemědělské společnosti, která má implementovala IoT potrubí se 120 senzory půdní vlhkosti, 8 meteostanicemi a a automatický zavlažovací systém:
- Počáteční investice: 85 000 EUR (hardware + software + instalace)
- Příspěvky PNRR a Transition 4.0: 38 000 EUR (44,7 %)
- Čistá investice: 47 000 EUR
- Úspora vody v roce 1: -42 % (-18 000 EUR/rok na účtu za zavlažování)
- Úspora hnojiva: -22 % (-8 500 EUR/rok)
- Zvýšení výnosu obilí: +11 % (+15 000 EUR/rok obratu)
- Doba návratnosti: 2,3 roku
- 5letá návratnost investic: 342 %
Doporučené postupy a anti-vzorce, kterým je třeba se vyhnout
Konsolidované osvědčené postupy
-
Evoluční diagram s verzováním: Vždy uveďte pole
schema_versionv užitečném zatížení MQTT. Když přidáte nová pole, zvyšte verzi. Spotřebitel řídí různé verze bez porušení změn. - Časové razítko ze zařízení, nikoli ze serveru: Časové razítko musí být stejné senzoru v okamžiku odečtu, nikoli v okamžiku, kdy dorazí k brokerovi. Latence sítě může být variabilní a falšovat časové řady.
- QoS 1 standardně, QoS 2 pouze pro kritické příkazy: QoS 2 čtyřnásobek síťový provoz. Použijte jej pouze pro ovládací příkazy (otevření ventilů, spouštění čerpadel) kde je duplikace nepřijatelná.
- Povinná místní vyrovnávací paměť: Každá brána musí mít místní vyrovnávací paměť (soubory SQLite, NDJSON), abyste mohli pokračovat ve shromažďování dat během odpojení sítě. V zemědělství jsou spojení nespolehlivá.
- Pravidelná kalibrace senzoru: Čidla půdní vlhkosti jsou odvozena v průběhu času (obvykle 2–5 % ročně). Naplánujte pololetní kalibrace a implementujte automatická detekce driftu porovnáním sousedních senzorů.
- Zásady uchování InfluxDB: Konfigurace automatického převzorkování: nezpracovaná data 30 dní, hodinové agregace 1 rok, neomezené denní agregace. Uložit úložiště bez ztráty granularity potřebné pro dlouhodobou analýzu.
Kritické anti-vzorce, kterým je třeba se vyhnout
-
Ploché téma MQTT (bez hierarchie): Témata jako
sensor_001_moisturemístofarm/campo-nord/sensor-001/soil/moistureznemožnit použití zástupný znak pro agregované předplatné a škálování na stovky senzorů. - Dotazování namísto událostí řízeného: Neptejte se brokera MQTT každých N sekund „požádat“ o nová data. MQTT a push-based: vydavatel odešle, pokud existují data obdrží účastník okamžitě. Polling neguje všechny výhody protokolu.
- Žádné ověření schématu: Přijměte jakýkoli JSON od brokera bez validace schématu. Vadný senzor, který posílá nuly, řetězce místo čísel popř hodnoty mimo rozsah (teplota -999) znečišťuje celé datové jezero bez ověření.
- Přímé psaní od bronzu ke zlatu: Přeskočte vrstvu Silver a pište nezpracovaná data přímo do agregací. Když objevíte chybu v logice transformaci, musíte to celé znovu zpracovat, protože nezpracovaná data nebyla zachovalé.
- Veřejný makléř bez ověření: Používejte veřejné MQTT brokery (test.mosquitto.org) nebo soukromí brokeři bez TLS/autentizace. Zemědělská data jsou citlivá aktiva společnosti; soutěžící se může přihlásit k odběru vašich témat a číst výrobní situaci vaší společnosti v reálném čase.
- Jediný bod selhání u brokera: Jediný MQTT broker bez clustering nebo převzetí služeb při selhání. EMQX a HiveMQ podporují nativní clustering; Mosquit vyžaduje externí řešení pro HA. Broker dolů blokuje veškerý sběr dat.
Škálovatelnost: Od 10 do 10 000 senzorů
Popsaná architektura se lineárně škáluje až na několik tisíc senzorů bez úprav architektonický. Zde jsou typická čísla pro hardwarovou konfiguraci brokera MQTT:
Schopnost MQTT Broker pro konfiguraci
| Konfigurace | Makléř | Senzory Max | Zpráva/s | Měsíční náklady |
|---|---|---|---|---|
| Vývoj/Testování | Mosquitto na Raspberry Pi 4 | 100 | 500 | 0 (vlastněný hardware) |
| MSP (50–500 ha) | EMQX na VPS 4 vCPU/8GB | 1 000 | 5 000 | 40-80 EUR |
| Velká společnost (500+ ha) | Uzly EMQX Cluster 3 | 10 000 | 50 000 | 200-400 EUR |
| Družstvo/okres | HiveMQ Enterprise / AWS IoT Core | 100 000+ | Neomezený | Platba za použití |
AWS IoT Core a Azure IoT Hub jsou cloudové nativní volby pro národní nebo multipodnikové měřítka: automaticky spravují škálovatelnost, nabízejí 99,99% SLA a integrují se nativně s jejich příslušnými ekosystémy (AWS Timestream, Lambda, S3 pro AWS; Azure Data Explorer, Stream Analytics, Data Lake pro Azure). Cena je obvykle 1-5 dolarů za milion zpráv, s velkorysou bezplatnou úrovní pro první testy.
Závěry a další kroky
Postavili jsme celý IoT potrubí pro přesné zemědělství: ze senzorů na poli do strukturovaného datového jezera prostřednictvím MQTT s vhodnou QoS, Pydantic validací, Úložiště InfluxDB pro časové řady a architekturu Medallion pro datové jezero. Kód Pythonu zobrazený je připraven k výrobě a pokrývá základní vzory nalezené ve skutečných systémech.
Klíčové body, které si s sebou vezměte:
- MQTT je správný protokol pro zemědělský IoT: lehký, asynchronní, s podporou nestabilní konektivity
- LoRaWAN pro otevřené pole, WiFi/RS-485 pro skleníky: neexistuje jediný univerzální bezdrátový protokol
- Ověření schématu (Pydantic) není volitelné: senzory odesílají špatná data častěji, než si myslíte
- Architektura medailonu (bronzová/stříbrná/zlatá) zaručuje znovuzpracovatelnost a progresivní kvalitu dat
- InfluxDB + Grafana je minimální zásobník pro monitorování v reálném čase bez licenčních nákladů
- Pobídky PNRR a Transition 5.0 pokrývají až 45 % investic italských zemědělských podniků
V dalším článku ze série FoodTech prozkoumáme, jak aplikovat šablony hrana strojového učení pro monitorování plodin přímo na bráně, snížení latence z minut na milisekundy a umožnění reakcí na kritické události v reálném čase jako jsou náhlé mrazy nebo napadení houbami v počáteční fázi.
Zdroje a statistiky
- Oficiální dokumentace paho-mqtt: eclipse.dev/paho
- EMQX Broker (open source, připravený na cluster): emqx.io
- Dokumentace InfluxDB 2.x: docs.influxdata.com
- Plán přechodu 5.0 MIMIT: mimit.gov.it
- ISTAT Precision Agriculture 2024: istat.it
- Architektura medailonu Databricks: databricks.com
Související články v jiných sériích
- MLOps: Jak nasadit modely ML predikce výnosů do produkce
- Data & AI Business – AI ve výrobě: Vzory IoT aplikované na průmysl pomocí Kafka a OPC-UA
- AI inženýrství: RAG na agronomickou dokumentaci pro virtuální asistenty pro agronomy







