Rurociąg IoT dla rolnictwa precyzyjnego z Pythonem i MQTT
Pole pszenicy w Apulii. Trzy tysiące hektarów, czterdzieści czujników zakopanych na różnych głębokościach, dane z temperatura, wilgotność gleby, pH i przewodność elektryczna, które docierają co trzydzieści sekund. Bez ustrukturyzowany potok danych, to po prostu szum cyfrowy. Przy odpowiednim rurociągu staje się silnikiem decyzji, które zmniejszają zużycie wody o 40%, zwiększają wydajność o 15% i obniżają koszty nawozów o jedną czwartą.
Rolnictwo precyzyjne nie jest obietnicą przyszłości: to rzeczywistość przemysłowa, która będzie obowiązywać w roku 2025 14,77 miliardów dolarów globalnie, z prognozami dotyczącymi rynku 26,86 miliardów do 2030 r przy CAGR na poziomie 12,7%. We Włoszech 28,5% firm rolnictwo wykorzystuje już techniki rolnictwa precyzyjnego zgodnie z danymi ISTAT 2024, przy czym szczytowe wartości wynoszą: 41,1% w spółkach posiadających powyżej 100 ha UR. Włoski sektor rolniczy wygenerował wartość dodany przez 42,4 mld euro w 2024 r, potwierdzając, że Włochy zajmują pierwsze miejsce w Europie, a cyfryzacja jest głównym motorem tych wyników.
Ale pomiędzy czujnikiem na polu a właściwą decyzją agronomiczną istnieje złożona ścieżka techniczna: protokoły bezprzewodowe małej mocy, broker MQTT, walidacja schematu, potok wzbogacania, architektura medalionowa na jeziorze danych, pulpit nawigacyjny w czasie rzeczywistym i system ostrzegania. Ten artykuł obejmuje każdy etap tego łańcucha za pomocą działającego kodu Pythona, rzeczywistych architektur i najlepszych praktyk sprawdzone w produkcji.
Czego dowiesz się w tym artykule
- Kompleksowa architektura systemu IoT dla rolnictwa precyzyjnego
- Porównanie typów czujników rolniczych i protokołów bezprzewodowych: MQTT, LoRaWAN, Zigbee
- Głębokie nurkowanie MQTT: QoS 0/1/2, zachowane wiadomości, ostatnia wola, projekt tematu
- Ukończ implementację języka Python za pomocą paho-mqtt: wydawca czujników i potok konsumencki
- Walidacja i egzekwowanie schematu danych IoT za pomocą Pydantic
- Integracja z InfluxDB dla szeregów czasowych i Apache Kafka dla przetwarzania strumieniowego
- Architektura medalionowa (brązowa/srebrna/złota) dla danych rolniczych
- Panel Grafana i system ostrzegania o progach krytycznych
- Kontekst włoski: WPR, PNRR Przejście 5.0, zachęty AgriTech 2025
Seria FoodTech - Wszystkie artykuły
| # | Przedmiot | Poziom | Państwo |
|---|---|---|---|
| 1 | Rurociąg IoT dla rolnictwa precyzyjnego (tutaj jesteś) | Zaawansowany | Aktualny |
| 2 | ML Edge do monitorowania upraw: wizja komputerowa na polach | Zaawansowany | Już wkrótce |
| 3 | Satelitarne API i wskaźniki roślinności: NDVI z Pythonem i Sentinel-2 | Mediator | Już wkrótce |
| 4 | Identyfikowalność Blockchain w żywności: od pola do supermarketu | Mediator | Już wkrótce |
| 5 | Wizja komputerowa w kontroli jakości w przemyśle spożywczym | Zaawansowany | Już wkrótce |
| 6 | FSMA i zgodność cyfrowa: automatyzacja procesów regulacyjnych | Mediator | Już wkrótce |
| 7 | Rolnictwo pionowe: kontrola środowiska za pomocą IoT i ML | Zaawansowany | Już wkrótce |
| 8 | Prognozowanie popytu na sprzedaż detaliczną żywności za pomocą Prophet i LightGBM | Mediator | Już wkrótce |
| 9 | Pulpit nawigacyjny Farm Intelligence: analityka w czasie rzeczywistym za pomocą Grafany | Mediator | Już wkrótce |
| 10 | Optymalizacja żywności w łańcuchu dostaw: ML na rzecz redukcji odpadów | Mediator | Już wkrótce |
Rynek AgriTech w 2025 roku: Liczby i trendy
W ciągu zaledwie kilku lat rolnictwo precyzyjne z technologii niszowej stało się strategiczną siłą napędową konkurencyjność sektora pierwotnego. Liczby wyraźnie to potwierdzają: światowy rynek zastosowanie ma rolnictwo precyzyjne 14,77 miliardów dolarów w 2025 roku i będzie rosło 26,86 miliardów do 2030 r. Jednak ogólny obraz AgriTech jest jeszcze szerszy: segment rozszerzony, który obejmuje oprogramowanie zarządzające, drony rolnicze, robotykę i cyfrowe biowejścia, przekracza Według różnych źródeł badawczych, w 2025 r. będzie to 30 miliardów, przy oczekiwanym CAGR na poziomie 16–23% do 2031 r.
We Włoszech rok 2024 był punktem zwrotnym: według ISTAT włoskie rolnictwo osiągnęło już szczyt pierwsze miejsce w Europie pod względem wartości dodanej z 42,4 mld euro (+9% w porównaniu do 2023 r.). Produkcja rolna wzrosła wolumenowo o 1,4%, a wartość dodana o 3,5%. Jednocześnie 28,5% włoskich przedsiębiorstw rolniczych stosuje już techniki precyzyjne, z większą koncentracją na północnym wschodzie (33%) i północno-zachodnim (32,1%) oraz u dużych operatorów (41,1% w firmach o powierzchni użytków rolnych powyżej 100 ha).
Wdrożenie precyzyjnych technologii rolnictwa w 2025 r
| Technologia | Główna aplikacja | Adopcja Włochy | Średni zwrot z inwestycji |
|---|---|---|---|
| Czujniki gleby IoT | Zmienne podlewanie, nawożenie | Alta (północne Włochy) | Redukcja kosztów wejściowych o 15-25%. |
| Drony rolnicze | Mapowanie, zabiegi, analiza liści | Przeciętny | 30-40% oszczędności na pestycydach |
| Zdjęcia satelitarne | NDVI, stres wodny, przewidywane plony | Średnio-wysoki | Optymalizacja wydajności 5-10%. |
| Stacje pogodowe IoT | Modele przewidywania chorób, nawadnianie | Wysoki | 10-20% zniżki na zabiegi |
| Technologia zmiennej dawki | Siew, nawożenie zmienne | Nisko-średni | 8-15% oszczędności na wejściu |
| Uczenie maszynowe na danych terenowych | Prognoza plonów, optymalizacja agronomiczna | Niski | +10-20% wydajności, -15% wejścia |
PNRR odegrało decydującą rolę w akceleracji: Misja 2 „Zielona Rewolucja i Ekologiczna transformacja” przeznaczyła 400 milionów euro na modernizację floty pojazdów rolnictwo w stronę technologii 4.0. Plan Przejściowy 5.0, którego całkowity budżet wynosi 12,7 mld euro w dwuletnim okresie 2024-2025 (6,3 miliarda specjalnie dla Przejście 5.0), a także mające zastosowanie do rolnictwa: ustawa budżetowa na rok 2025 (L. 207/2024) rozszerzyło zakres stosowania, umożliwiając przedsiębiorstwom rolniczym korzystanie z ulg podatkowych na inwestycje w technologie cyfrowe i energooszczędne.
Architektura IoT dla rolnictwa: od czujnika do jeziora danych
Przed napisaniem choćby jednej linii kodu konieczne jest zrozumienie architektury systemu kompletny. Błąd projektowy na początku prowadzi do kosztownych przeróbek podczas skalowania 10 czujników na 10 000. Architektura, którą tutaj opisujemy, jest tą przyjętą przez główne operatorów w branży i zweryfikowane w rzeczywistych środowiskach produkcyjnych.
Architektura kompleksowa: warstwy i komponenty
┌─────────────────────────────────────────────────────────────────────┐
│ FIELD LAYER (Campo) │
│ [Sensore Suolo] [Stazione Meteo] [Sensore pH] [Drone Mapping] │
│ │ │ │ │ │
│ └────────────────┴────────────────┘ │ │
│ │ LoRaWAN / Zigbee / RS-485 │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ GATEWAY LAYER (Edge) │
│ [Gateway LoRaWAN / Raspberry Pi 4] │
│ - Aggregazione dati multi-sensore │
│ - Pre-elaborazione e filtro outlier │
│ - Buffer locale (offline tolerance) │
│ - Protocollo: MQTT publish su broker locale │
└────────────────────────┼────────────────────────────────────────────┘
│ MQTT / TLS
┌────────────────────────┼────────────────────────────────────────────┐
│ BROKER LAYER (Fog/Cloud) │
│ [EMQX / HiveMQ / Eclipse Mosquitto] │
│ - Topic management gerarchico │
│ - Autenticazione mTLS / JWT │
│ - QoS management e message persistence │
│ - Bridge verso cloud (AWS IoT / Azure IoT Hub) │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ PROCESSING LAYER (Cloud) │
│ [Apache Kafka] ──► [Stream Processor] ──► [InfluxDB] │
│ - Ingestion stream - Validazione schema - Time-series store │
│ - Partitioning - Enrichment - Retention policy │
│ - Consumer groups - Alerting real-time - Downsampling │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ DATA LAKE (Medallion Architecture) │
│ [Bronze: Raw S3] ──► [Silver: Cleaned] ──► [Gold: Analytics] │
│ - Dati grezzi MQTT - Schema validato - Aggregazioni │
│ - Immutabile - Outlier rimossi - ML features │
│ - Formato: Parquet - Formato: Delta/Iceberg - Formato: Parquet │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ SERVING LAYER (Applicazioni) │
│ [Grafana Dashboard] [Alert Manager] [ML Models] [Mobile App] │
│ - Real-time monitoring - SMS/Email/Push - Previsioni rese │
│ - Mappa campo - Soglie critiche - Ottimizzazione input │
└─────────────────────────────────────────────────────────────────────┘
Each level has distinct and decoupled responsibilities. To oddzielenie i krytyka: pozwala replace a component (e.g. switch from Mosquitto to EMQX when scaling) without touching inni. And it allows you to implement offline tolerance: if the connection to the cloud drops, the gateway nadal gromadzi i buforuje dane lokalnie.
Czujniki rolnicze: rodzaje, protokoły i zastosowanie
Główne kategorie czujników terenowych
Wybór odpowiednich czujników zależy od uprawy, warunków glebowych i celów agronomowie. Oto główne kategorie z charakterystyką techniczną i orientacyjnymi kosztami na rok 2025:
Czujniki dla rolnictwa precyzyjnego
| Kategoria | Parametr | Technologia | Koszt jednostkowy | Protokół |
|---|---|---|---|---|
| Wilgotność gleby (VWC) | Objętościowa zawartość wody | FDR, TDR, pojemnościowy | 30-150 EUR | SDI-12, RS-485, LoRa |
| Temperatura gleby | T uziemiony na poziomie 10/30/50 cm | PT100, NTC | 20-80 EUR | SDI-12, I2C, 1-przewodowy |
| pH gleby | Kwasowość na miejscu | Elektroda ISE | 80-300 EUR | RS-485, Modbus |
| Przewodność elektryczna (EC) | Zasolenie, płodność | Indukcyjny, kontaktowy | 60-200 EUR | SDI-12, RS-485 |
| Stacja pogodowa | T, HR, wiatr, deszcz, promieniowanie | Zintegrowany multisensor | 200-800 EUR | RS-485, WiFi, LoRa |
| Czujnik liści | Wilgotność liści, temperatura | Pojemnościowy, IR | 40-120 EUR | SDI-12, I2C |
| Przepływomierz do nawadniania | Natężenie przepływu wody | Ultradźwiękowy, śmigło | 80-350 EUR | Impuls, RS-485 |
| Przenośny czujnik NDVI | Indeks wegetatywny | Wielospektralny | 300-1500 EUR | Bluetooth, Wi-Fi |
Protokoły komunikacji bezprzewodowej: najlepsze porównanie
Wybór protokołu bezprzewodowego jest prawdopodobnie najważniejszą decyzją w architekturze systemu Rolniczy Internet Rzeczy. Pola mają ekstremalną charakterystykę: odległości do 10 km, przeszkody fizyczne (rzędy, drzewa, zabudowa wiejska), brak zasilania sieciowego, temperatury od -20 do +60 stopni Celsjusza.
Porównanie protokołów bezprzewodowych dla rolniczego Internetu Rzeczy
| Protokół | Zakres | Zespół | Bębny | Koszt infrastruktury | Użyj przypadku |
|---|---|---|---|---|---|
| LoRaWAN | 3-15 km | 0,3–50 kb/s | 5-10 lat | Średni (brama) | Czujniki gleby, odległa pogoda w terenie |
| NB-IoT | Ponad 10 km | 20-250 kb/s | 3-8 lat | Niski (karta SIM) | Obszary objęte zasięgiem 4G/5G |
| Zigbee | 10-100 m | 250 kb/s | 1-3 lata | Niska (siatka) | Szklarnie, automatyczne systemy nawadniające |
| Wi-Fi 6 | 100-200 m | Wysoka (Gb/s) | Godziny/dni | Średni (AP) | Systemy kamer, analiza jakości obrazu |
| 4G/LTE | Nieograniczony | Wysoki | 1-5 lat | Średni (karta SIM) | Maszyny rolnicze, bramka mobilna |
| RS-485 (przewodowy) | 1200 m | 10Mbps | Nie dotyczy | Bas (przewody) | Kontrolowane szklarnie, systemy stałe |
W typowym włoskim gospodarstwie (50-500 hektarów na otwartych polach) najczęściej spotykanym rozwiązaniem jest tzw 2025 i architektura hybrydowa: LoRaWAN dla czujników gleby na odległych polach, NB-IoT dla maszyn rolniczych w ruchu, tj WiFi/przewodowe dla szklarni gdzie precyzja i częstotliwość próbkowania są maksymalne. Centralna brama (często a Raspberry Pi 4 lub bramka przemysłowa Dragino) agreguje wszystko i publikuje poprzez MQTT w chmurze.
Głębokie nurkowanie MQTT: architektura, QoS i najlepsze praktyki
MQTT (Message Queuing Telemetry Transport) to de facto protokół dla IoT. Stworzony przez IBM w XX wieku monitorowanie rurociągów naftowych za pośrednictwem satelity, stało się normą ISO/IEC 20922 i sercem każdego poważnego systemu IoT. Jego prostota i moc sprawiają, że idealnie nadaje się do środowisk z urządzenia o ograniczonej przepustowości i niskim zużyciu energii.
Model publikowania/subskrypcji
W przeciwieństwie do typowego modelu żądania/odpowiedzi protokołu HTTP, MQTT wykorzystuje paradygmat publikuj/subskrybuj: producenci danych (wydawcy) nie wiedzą, kto je czyta, oraz tj konsumenci (abonenci) nie wiedzą, kto publikuje. Oddzielenie jest całkowite i pośredniczy a centralny element tzw pośrednik.
Dane są zorganizowane w temat, hierarchiczne ciągi znaków oddzielone ukośnikami opisać charakter danych. Dobrze zaprojektowany temat jest kluczem do skalowalnej architektury. Dla gospodarstwa wielodziałkowego zaleca się następującą strukturę:
# Struttura topic MQTT consigliata per agricoltura di precisione
# Pattern: azienda/appezzamento/dispositivo/tipo-sensore/metrica
# Esempi concreti:
farm/campo-nord/sensor-001/soil/moisture # Umidita suolo sensore 001
farm/campo-nord/sensor-001/soil/temperature # Temperatura suolo sensore 001
farm/campo-nord/sensor-001/soil/ph # pH suolo sensore 001
farm/campo-nord/sensor-001/soil/ec # Conducibilita elettrica
farm/campo-sud/weather-station/air/temperature # Temperatura aria stazione meteo
farm/campo-sud/weather-station/air/humidity # Umidita aria
farm/campo-sud/weather-station/wind/speed # Velocita vento
farm/campo-sud/weather-station/rain/mm # Precipitazioni
farm/+/+/soil/moisture # Wildcard: umidita suolo da TUTTI i campi e sensori
farm/campo-nord/# # Wildcard: TUTTI i dati dal campo nord
farm/# # Wildcard: TUTTI i dati dell'azienda
# Topic di sistema (prefisso $)
$SYS/brokers/emqx/connections/count # Statistiche broker
farm/campo-nord/sensor-001/$status # Status device (LWT)
farm/campo-nord/sensor-001/$command # Comandi al dispositivo
Jakość usług (QoS): trzy poziomy
MQTT QoS definiuje gwarancję dostarczenia komunikatu pomiędzy klientem a brokerem. Wybór odpowiedniego poziomu ma bezpośredni wpływ na baterię, przepustowość i niezawodność systemu:
MQTT QoS: szczegółowe porównanie
| Poziom | Nazwa | Gwarancja | Nad głową | Zastosowanie w rolnictwie |
|---|---|---|---|---|
| Jakość usług 0 | Co najwyżej raz | Brak (odpal i zapomnij) | Minimalna (1 opakowanie) | Telemetria wysokiej częstotliwości (T co 5 sekund), akceptowalna strata |
| Jakość usług 1 | Przynajmniej raz | Przynajmniej jedna dostawa (możliwe duplikaty) | Niski (2 pakiety, ACK) | Odczyty czujnika wilgotności/pH, dziennik nawadniania |
| Jakość usług 2 | Dokładnie raz | Gwarantowana dostawa tylko raz | Wysoka (4 opakowania) | Sterowanie zaworami nawadniającymi, alarmy krytyczne, dozowanie nawozów |
Zachowane przesłania i testament ostatniej woli
Dwie funkcje MQTT, które są szczególnie przydatne w rolnictwie:
- Zachowane wiadomości: Broker przechowuje ostatnią wiadomość na dany temat i lo dostarcza natychmiast każdemu nowemu abonentowi. Krytyczne dla aktualnego stanu czujników: dashboard, który się łączy, natychmiast otrzymuje najnowsze wartości, bez czekania na kolejne cykl wydawniczy.
- Testament ostatniej woli (LWT): Podczas podłączania każde urządzenie może skonfigurować wiadomość „testamentową”, którą broker automatycznie opublikuje, jeśli połączenie nie działa nieprawidłowo. Niezbędny do wykrywania czujników offline bez odpytywania aktywny: jeśli czujnik nie rozłącza się prawidłowo (słaba bateria, zakłócenia), broker automatycznie publikuje status „offline” w temacie statusu.
Kompletna implementacja Pythona: Sensor Publisher
We implement a Python publisher that simulates a realistic agricultural sensor node with humidity
gleba, temperatura, pH i EC. Kod wykorzystuje paho-mqtt 2.x (Nowoczesne API z wywołaniami zwrotnymi
updated) and implements all best practices: LWT, retained messages, appropriate QoS,
automatic reconnection and structured JSON schema.
# sensor_node.py
# Nodo sensore MQTT per agricoltura di precisione
# Dipendenze: pip install paho-mqtt pydantic
import paho.mqtt.client as mqtt
import json
import time
import random
import math
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
import ssl
# ── Configurazione ────────────────────────────────────────────────────────────
BROKER_HOST = "emqx.azienda-agricola.it"
BROKER_PORT = 8883 # TLS
KEEPALIVE = 60 # secondi
CLIENT_ID = "sensor-campo-nord-001"
FARM_ID = "farm-001"
FIELD_ID = "campo-nord"
SENSOR_ID = "sensor-001"
# Topic base
TOPIC_BASE = f"{FARM_ID}/{FIELD_ID}/{SENSOR_ID}"
TOPIC_SOIL = f"{TOPIC_BASE}/soil"
TOPIC_STATUS = f"{TOPIC_BASE}/$status"
TOPIC_COMMAND = f"{TOPIC_BASE}/$command"
# Intervallo di pubblicazione in secondi
PUBLISH_INTERVAL = 30
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
# ── Modello dati sensore ───────────────────────────────────────────────────────
@dataclass
class SoilReading:
"""Lettura completa da un nodo sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str # ISO 8601 UTC
latitude: float
longitude: float
depth_cm: int # Profondità di installazione
# Misurazioni suolo
moisture_pct: float # Umidita volumetrica (VWC) %
temperature_c: float # Temperatura suolo gradi C
ph: float # pH suolo (4.0 - 9.0)
ec_ds_m: float # Conducibilita elettrica dS/m
# Metadata dispositivo
battery_pct: int # Livello batteria
rssi_dbm: int # Signal strength in dBm
firmware_version: str
# Flag qualità
quality_flag: str # "OK", "WARN", "ERROR"
quality_notes: Optional[str] = None
def read_sensors_from_hardware() -> SoilReading:
"""
In produzione: legge i sensori reali via SDI-12 o RS-485.
Qui: simula dati realistici con variazione temporale.
"""
now = datetime.now(timezone.utc)
# Ciclo circadiano per temperatura (più alta nelle ore centrali)
hour = now.hour
temp_base = 18.0
temp_variation = 6.0 * math.sin(math.pi * (hour - 6) / 12) if 6 <= hour <= 18 else -2.0
temperature = temp_base + temp_variation + random.gauss(0, 0.3)
# Umidita: degrada lentamente senza pioggia, segue ciclo stagionale
base_moisture = 35.0 # % VWC campo (35% = campo saturo irrigazione)
moisture = base_moisture + random.gauss(0, 1.5)
moisture = max(5.0, min(60.0, moisture)) # clamp fisico
# pH relativamente stabile
ph = 6.8 + random.gauss(0, 0.1)
ph = max(4.0, min(9.0, ph))
# EC: correlata alla salinita e fertilizzazione
ec = 1.2 + random.gauss(0, 0.05)
ec = max(0.1, min(5.0, ec))
# Battery che decresce lentamente (simulazione)
battery = max(10, 95 - int(time.time() / 3600) % 85)
# Quality flag automatico
quality = "OK"
notes = None
if moisture < 10.0:
quality = "WARN"
notes = "Umidita sotto soglia minima critica"
elif moisture > 55.0:
quality = "WARN"
notes = "Umidita sopra soglia saturazione"
if battery < 15:
quality = "WARN"
notes = (notes or "") + " | Batteria in esaurimento"
return SoilReading(
sensor_id = SENSOR_ID,
farm_id = FARM_ID,
field_id = FIELD_ID,
timestamp = now.isoformat(),
latitude = 40.4164,
longitude = 17.9308,
depth_cm = 30,
moisture_pct = round(moisture, 2),
temperature_c = round(temperature, 2),
ph = round(ph, 2),
ec_ds_m = round(ec, 3),
battery_pct = battery,
rssi_dbm = random.randint(-95, -45),
firmware_version = "2.4.1",
quality_flag = quality,
quality_notes = notes,
)
# ── Client MQTT ───────────────────────────────────────────────────────────────
class AgriSensorNode:
"""Nodo sensore MQTT con auto-reconnect e LWT"""
def __init__(self):
self.client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
self._setup_auth()
self._setup_tls()
self._setup_callbacks()
self._setup_lwt()
self.connected = False
def _setup_auth(self):
self.client.username_pw_set(
username="sensor-user",
password="<TOKEN_SEGRETO>"
)
def _setup_tls(self):
"""TLS mutuo con certificato dispositivo"""
self.client.tls_set(
ca_certs = "/certs/ca.crt",
certfile = "/certs/sensor.crt",
keyfile = "/certs/sensor.key",
tls_version = ssl.PROTOCOL_TLS_CLIENT,
)
def _setup_callbacks(self):
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
def _setup_lwt(self):
"""Last Will Testament: pubblicato dal broker se la connessione cade"""
lwt_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "connection_lost",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.will_set(
topic = TOPIC_STATUS,
payload = lwt_payload,
qos = 1,
retain = True, # Retain: dashboard vede subito lo stato offline
)
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
log.error(f"Connessione fallita: {reason_code}")
return
log.info(f"Connesso al broker: {BROKER_HOST}")
self.connected = True
# Pubblica stato online (retained)
online_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "online",
"timestamp": datetime.now(timezone.utc).isoformat(),
"firmware": "2.4.1",
})
client.publish(TOPIC_STATUS, online_payload, qos=1, retain=True)
# Sottoscrivi ai comandi
client.subscribe(TOPIC_COMMAND, qos=2)
log.info(f"Sottoscritto a: {TOPIC_COMMAND}")
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
self.connected = False
log.warning(f"Disconnesso: {reason_code}. Tentativo riconnessione...")
def _on_message(self, client, userdata, message):
"""Gestione comandi ricevuti dal broker (es. cambio intervallo)"""
try:
payload = json.loads(message.payload.decode())
cmd = payload.get("command")
log.info(f"Comando ricevuto: {cmd}")
if cmd == "set_interval":
global PUBLISH_INTERVAL
PUBLISH_INTERVAL = int(payload.get("value", 30))
log.info(f"Intervallo aggiornato a {PUBLISH_INTERVAL}s")
elif cmd == "reboot":
log.warning("Comando reboot ricevuto")
# In produzione: riavvia il sistema
except Exception as e:
log.error(f"Errore parsing comando: {e}")
def _on_publish(self, client, userdata, mid, reason_code, properties):
log.debug(f"Messaggio {mid} pubblicato con successo")
def connect(self):
self.client.connect(
host = BROKER_HOST,
port = BROKER_PORT,
keepalive = KEEPALIVE,
)
self.client.loop_start() # Thread background per I/O
def publish_reading(self, reading: SoilReading):
"""Pubblica lettura sensore su topic appropriati"""
# Payload principale: lettura completa
payload_full = json.dumps(asdict(reading), default=str)
result = self.client.publish(
topic = TOPIC_SOIL,
payload = payload_full,
qos = 1,
retain = True, # Ultimo valore sempre disponibile
)
# Pubblica anche metriche singole per dashboard real-time
metrics = {
"moisture": (reading.moisture_pct, 1),
"temperature": (reading.temperature_c, 0),
"ph": (reading.ph, 1),
"ec": (reading.ec_ds_m, 0),
}
for metric, (value, qos) in metrics.items():
self.client.publish(
topic = f"{TOPIC_SOIL}/{metric}",
payload = str(value),
qos = qos,
retain = True,
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
log.info(
f"Pubblicato | Moisture: {reading.moisture_pct}% | "
f"Temp: {reading.temperature_c}C | pH: {reading.ph} | "
f"EC: {reading.ec_ds_m} dS/m | Quality: {reading.quality_flag}"
)
else:
log.error(f"Errore pubblicazione: {result.rc}")
def run(self):
"""Loop principale del nodo sensore"""
self.connect()
# Attendi connessione iniziale
timeout = 10
while not self.connected and timeout > 0:
time.sleep(1)
timeout -= 1
if not self.connected:
log.error("Impossibile connettersi al broker")
return
log.info(f"Nodo sensore avviato. Intervallo: {PUBLISH_INTERVAL}s")
try:
while True:
reading = read_sensors_from_hardware()
self.publish_reading(reading)
time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
log.info("Shutdown richiesto")
finally:
# Disconnessione pulita: il LWT NON viene inviato
offline_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "graceful_shutdown",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.publish(TOPIC_STATUS, offline_payload, qos=1, retain=True)
time.sleep(0.5)
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
node = AgriSensorNode()
node.run()
Rurociąg konsumencki: walidacja, wzbogacanie i przechowywanie
Konsument to drugi koniec systemu. Subskrybuje tematy MQTT, sprawdza otrzymane dane Pydantic, wzbogaca odczyty o dane kontekstowe (informacje agronomiczne, alerty pogodowe) i kieruje je do InfluxDB w celu przechowywania szeregów czasowych oraz do S3 w celu gromadzenia danych.
# pipeline_consumer.py
# Consumer MQTT + validazione Pydantic + storage InfluxDB
# Dipendenze: pip install paho-mqtt pydantic influxdb-client boto3
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field, field_validator, ValidationError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import boto3
import io
log = logging.getLogger(__name__)
# ── Schema di validazione Pydantic ────────────────────────────────────────────
class SoilReadingSchema(BaseModel):
"""Schema di validazione per letture sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str
latitude: float = Field(ge=-90, le=90)
longitude: float = Field(ge=-180, le=180)
depth_cm: int = Field(ge=0, le=200)
moisture_pct: float = Field(ge=0.0, le=100.0)
temperature_c: float = Field(ge=-40.0, le=80.0)
ph: float = Field(ge=0.0, le=14.0)
ec_ds_m: float = Field(ge=0.0, le=20.0)
battery_pct: int = Field(ge=0, le=100)
rssi_dbm: int = Field(ge=-150, le=0)
firmware_version: str
quality_flag: str = Field(pattern="^(OK|WARN|ERROR)$")
quality_notes: Optional[str] = None
@field_validator("timestamp")
@classmethod
def validate_timestamp(cls, v: str) -> str:
"""Verifica che il timestamp sia ISO 8601 valido e non nel futuro"""
try:
ts = datetime.fromisoformat(v)
if ts > datetime.now(timezone.utc):
raise ValueError("Timestamp nel futuro")
except ValueError as e:
raise ValueError(f"Timestamp non valido: {e}")
return v
@field_validator("ph")
@classmethod
def validate_ph_agronomico(cls, v: float) -> float:
"""pH fuori range agronomico (4.5-8.5) e anomalia"""
if v < 4.5 or v > 8.5:
log.warning(f"pH {v} fuori range agronomico tipico [4.5-8.5]")
return v
def has_critical_alert(self) -> bool:
"""Verifica se la lettura richiede un alert critico"""
return (
self.moisture_pct < 10.0 or
self.moisture_pct > 58.0 or
self.ph < 4.5 or
self.ph > 8.5 or
self.ec_ds_m > 4.0 or
self.battery_pct < 10
)
# ── Storage InfluxDB ──────────────────────────────────────────────────────────
class InfluxDBWriter:
"""Writer per time-series su InfluxDB 2.x"""
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
def write_soil_reading(self, reading: SoilReadingSchema):
"""Scrive una lettura suolo su InfluxDB con tags e fields ottimizzati"""
point = (
Point("soil_reading")
# Tags: usati per filtro e group-by (cardinalita limitata)
.tag("sensor_id", reading.sensor_id)
.tag("farm_id", reading.farm_id)
.tag("field_id", reading.field_id)
.tag("depth_cm", str(reading.depth_cm))
.tag("quality", reading.quality_flag)
# Fields: metriche numeriche
.field("moisture_pct", reading.moisture_pct)
.field("temperature_c", reading.temperature_c)
.field("ph", reading.ph)
.field("ec_ds_m", reading.ec_ds_m)
.field("battery_pct", float(reading.battery_pct))
.field("rssi_dbm", float(reading.rssi_dbm))
# Timestamp dalla lettura del sensore (non dall'arrivo)
.time(datetime.fromisoformat(reading.timestamp), WritePrecision.SECONDS)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
log.debug(f"Scritto su InfluxDB: {reading.sensor_id} @ {reading.timestamp}")
def close(self):
self.client.close()
# ── Bronze Layer su S3 ────────────────────────────────────────────────────────
class S3BronzeWriter:
"""Scrive dati grezzi su S3 (Bronze layer Medallion Architecture)"""
def __init__(self, bucket: str, region: str = "eu-south-1"):
self.s3 = boto3.client("s3", region_name=region)
self.bucket = bucket
def write_raw(self, raw_payload: str, topic: str, received_at: datetime):
"""
Salva il payload grezzo in formato NDJSON su S3.
Partitionamento per data: year/month/day/hour/
"""
prefix = received_at.strftime("year=%Y/month=%m/day=%d/hour=%H")
filename = f"{prefix}/{received_at.isoformat()}.json"
envelope = {
"topic": topic,
"received_at": received_at.isoformat(),
"payload": json.loads(raw_payload),
}
self.s3.put_object(
Bucket = self.bucket,
Key = filename,
Body = json.dumps(envelope).encode("utf-8"),
ContentType = "application/json",
)
log.debug(f"Bronze layer: scritto {filename}")
# ── Alert Manager ─────────────────────────────────────────────────────────────
class AlertManager:
"""Gestione alert critici con cooldown per evitare spam"""
def __init__(self):
self._last_alert: dict[str, datetime] = {}
self.cooldown_seconds = 300 # 5 minuti tra un alert e l'altro per sensor
def check_and_alert(self, reading: SoilReadingSchema):
if not reading.has_critical_alert():
return
sensor_key = reading.sensor_id
now = datetime.now(timezone.utc)
last = self._last_alert.get(sensor_key)
if last and (now - last).total_seconds() < self.cooldown_seconds:
return # In cooldown, skip
self._last_alert[sensor_key] = now
self._send_alert(reading)
def _send_alert(self, reading: SoilReadingSchema):
"""In produzione: invia SMS/email/push. Qui: log."""
alerts = []
if reading.moisture_pct < 10.0:
alerts.append(f"STRESS IDRICO: umidita {reading.moisture_pct}% sotto soglia critica 10%")
if reading.moisture_pct > 58.0:
alerts.append(f"SATURAZIONE: umidita {reading.moisture_pct}% sopra saturazione")
if reading.ph < 4.5:
alerts.append(f"pH CRITICO: {reading.ph} - suolo troppo acido")
if reading.ec_ds_m > 4.0:
alerts.append(f"SALINITA CRITICA: EC {reading.ec_ds_m} dS/m")
if reading.battery_pct < 10:
alerts.append(f"BATTERIA: {reading.battery_pct}% - sostituire")
for alert in alerts:
log.critical(f"ALERT [{reading.sensor_id}] {alert}")
# TODO: self.sms_client.send(...)
# TODO: self.email_client.send(...)
# ── Pipeline Consumer principale ──────────────────────────────────────────────
class AgriPipelineConsumer:
"""Consumer MQTT con validazione, storage e alerting integrati"""
def __init__(self):
self.influx = InfluxDBWriter(
url = "https://influxdb.azienda.it:8086",
token = "<INFLUX_TOKEN>",
org = "azienda-agricola",
bucket = "farm-sensors",
)
self.s3_bronze = S3BronzeWriter(bucket="farm-raw-data-bronze")
self.alert_mgr = AlertManager()
self.client = mqtt.Client(
client_id = "pipeline-consumer-001",
protocol = mqtt.MQTTv5,
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.stats = {"received": 0, "valid": 0, "errors": 0, "alerts": 0}
def _on_connect(self, client, userdata, flags, rc, props):
log.info("Consumer connesso al broker")
# Sottoscrive a TUTTI i sensori suolo di TUTTE le farm
client.subscribe("farm/+/+/soil", qos=1)
client.subscribe("farm/+/+/$status", qos=1)
log.info("Sottoscritto a: farm/+/+/soil e farm/+/+/$status")
def _on_message(self, client, userdata, message):
received_at = datetime.now(timezone.utc)
self.stats["received"] += 1
raw_payload = message.payload.decode("utf-8")
try:
# 1. Parse JSON
data = json.loads(raw_payload)
# 2. Salva Bronze layer (dato grezzo, prima di qualsiasi trasformazione)
self.s3_bronze.write_raw(raw_payload, message.topic, received_at)
# 3. Valida con Pydantic
reading = SoilReadingSchema(**data)
self.stats["valid"] += 1
# 4. Scrivi su InfluxDB (time-series)
self.influx.write_soil_reading(reading)
# 5. Check alert
self.alert_mgr.check_and_alert(reading)
if reading.has_critical_alert():
self.stats["alerts"] += 1
log.info(
f"Processed | {reading.sensor_id} | "
f"moisture={reading.moisture_pct}% | "
f"quality={reading.quality_flag}"
)
except json.JSONDecodeError as e:
self.stats["errors"] += 1
log.error(f"JSON non valido da {message.topic}: {e}")
except ValidationError as e:
self.stats["errors"] += 1
log.error(f"Validazione fallita per {message.topic}: {e}")
# Salva comunque su Bronze (dato anomalo ma registrato)
except Exception as e:
self.stats["errors"] += 1
log.exception(f"Errore imprevisto: {e}")
def run(self):
self.client.connect("emqx.azienda-agricola.it", 8883)
log.info("Pipeline consumer avviata. Ctrl+C per fermare.")
try:
self.client.loop_forever()
except KeyboardInterrupt:
pass
finally:
self.influx.close()
log.info(
f"Stats finali: {self.stats}"
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
consumer = AgriPipelineConsumer()
consumer.run()
Architektura medalionów dla danych rolniczych: brąz, srebro, złoto
Architektura medalionu (wprowadzona przez Databricks, ale obecnie de facto standard w inżynierii danych) organizuje dane w trzy progresywne warstwy jakości. Zastosowanie do danych rolniczych IoT rozwiązuje problem prawdziwe problemy: czujniki wysyłające wartości odstające, nieprawidłowe znaczniki czasu dla dryftu RTC, odczyty duplikowane na potrzeby retransmisji MQTT z QoS 1 i wymagają różnych agregacji na panelu kontrolnym modele uczenia maszynowego w czasie rzeczywistym a modele długoterminowe.
Architektura medalionowa dla rolniczego Internetu Rzeczy
| Warstwy | Format | Treść | Operacje | Zatrzymanie |
|---|---|---|---|---|
| Brąz (surowy) | JSON / Parkiet | Surowy, niezmienny ładunek MQTT | Żadnej transformacji, po prostu przechowuj | 5 lat (przepisowe) |
| Srebro (wyczyszczone) | Jezioro Delta / Góry Lodowe | Znormalizowany wzór, usunięte wartości odstające | Deduplikacja, filtr wartości odstających, rzutowanie typów | 3 lata |
| Złoto (analityka) | Parkiet | Agregacje godzinowe/dzienne, funkcje ML | Agregacja, łączenie z pogodą/satelitą | 10 lat |
# medallion_pipeline.py
# Pipeline Medallion per dati agricoli IoT con PySpark / pandas
# In produzione: usa Databricks, AWS Glue, o dbt su Spark
# Qui: versione pandas per sviluppo locale e testing
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
DATA_ROOT = Path("/data/farm")
# ── BRONZE LAYER ──────────────────────────────────────────────────────────────
def load_bronze(date: str) -> pd.DataFrame:
"""
Carica dati grezzi dal Bronze layer (JSON NDJSON).
Nessuna trasformazione: solo lettura e schema enforcement minimo.
"""
bronze_path = DATA_ROOT / "bronze" / date
records = []
for f in bronze_path.glob("**/*.json"):
with open(f) as fp:
envelope = json.load(fp)
records.append(envelope)
if not records:
return pd.DataFrame()
# DataFrame con schema minimo garantito
df = pd.json_normalize(records, sep="_")
df["_bronze_loaded_at"] = datetime.utcnow().isoformat()
df["_source_file"] = [str(f) for f in bronze_path.glob("**/*.json")]
return df
# ── SILVER LAYER ──────────────────────────────────────────────────────────────
def transform_bronze_to_silver(df_bronze: pd.DataFrame) -> pd.DataFrame:
"""
Trasformazioni Bronze → Silver:
1. Parse e normalizza timestamp
2. Rimuovi duplicati (QoS 1 può consegnare più volte)
3. Filtra outlier fisicamente impossibili
4. Cast tipi corretti
5. Aggiungi colonne derivate
"""
if df_bronze.empty:
return pd.DataFrame()
df = df_bronze.copy()
# 1. Parse timestamp
df["ts"] = pd.to_datetime(df["payload_timestamp"], utc=True, errors="coerce")
df = df.dropna(subset=["ts"])
# 2. Deduplica: stesso sensor_id + timestamp = stesso dato
df["_dedup_key"] = df.apply(
lambda r: hashlib.md5(
f"{r.get('payload_sensor_id', '')}{r.get('payload_timestamp', '')}".encode()
).hexdigest(),
axis=1
)
df = df.drop_duplicates(subset=["_dedup_key"], keep="first")
# 3. Filtro outlier fisici
df = df[df["payload_moisture_pct"].between(0, 100)]
df = df[df["payload_temperature_c"].between(-40, 80)]
df = df[df["payload_ph"].between(0, 14)]
df = df[df["payload_ec_ds_m"].between(0, 20)]
# 4. Flag outlier agronomici (mantieni, ma etichetta)
df["is_agronomic_outlier"] = (
~df["payload_moisture_pct"].between(5, 60) |
~df["payload_ph"].between(4.5, 8.5) |
(df["payload_ec_ds_m"] > 4.0)
)
# 5. Colonne derivate
df["date"] = df["ts"].dt.date
df["hour"] = df["ts"].dt.hour
df["month"] = df["ts"].dt.month
# 6. Rename colonne per schema pulito
df = df.rename(columns={
"payload_sensor_id": "sensor_id",
"payload_farm_id": "farm_id",
"payload_field_id": "field_id",
"payload_moisture_pct": "moisture_pct",
"payload_temperature_c": "temperature_c",
"payload_ph": "ph",
"payload_ec_ds_m": "ec_ds_m",
"payload_battery_pct": "battery_pct",
"payload_quality_flag": "quality_flag",
})
# Seleziona solo colonne rilevanti
cols = [
"sensor_id", "farm_id", "field_id", "ts", "date", "hour", "month",
"moisture_pct", "temperature_c", "ph", "ec_ds_m", "battery_pct",
"quality_flag", "is_agronomic_outlier"
]
df = df[[c for c in cols if c in df.columns]]
df["_silver_processed_at"] = datetime.utcnow().isoformat()
return df
# ── GOLD LAYER ────────────────────────────────────────────────────────────────
def transform_silver_to_gold(df_silver: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""
Trasformazioni Silver → Gold:
Produce più tabelle di aggregazione per usi diversi.
"""
if df_silver.empty:
return {}
# Filtra outlier agronomici per analytics
df = df_silver[~df_silver["is_agronomic_outlier"]].copy()
gold_tables = {}
# ─── Aggregazione oraria per dashboard ───
hourly = df.groupby(["sensor_id", "farm_id", "field_id", "date", "hour"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_min = ("moisture_pct", "min"),
moisture_max = ("moisture_pct", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
readings_n = ("moisture_pct", "count"),
).reset_index()
hourly["moisture_avg"] = hourly["moisture_avg"].round(2)
gold_tables["hourly_aggregations"] = hourly
# ─── Aggregazione giornaliera per reporting agronomico ───
daily = df.groupby(["sensor_id", "farm_id", "field_id", "date"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_std = ("moisture_pct", "std"),
temp_min = ("temperature_c", "min"),
temp_max = ("temperature_c", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
stress_hours = ("moisture_pct", lambda x: (x < 20).sum()),
readings_n = ("moisture_pct", "count"),
).reset_index()
gold_tables["daily_agronomic"] = daily
# ─── Feature engineering per ML (predizione irrigazione) ───
ml_features = df.copy()
ml_features["moisture_lag_1h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(2)
ml_features["moisture_lag_3h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(6)
ml_features["moisture_trend"] = (
ml_features["moisture_pct"] - ml_features["moisture_lag_3h"]
)
ml_features["needs_irrigation"] = (ml_features["moisture_pct"] < 25.0).astype(int)
gold_tables["ml_irrigation_features"] = ml_features.dropna(
subset=["moisture_lag_1h", "moisture_lag_3h"]
)
return gold_tables
def run_medallion_pipeline(date: str):
"""Esegue la pipeline Medallion per una data specifica"""
log_prefix = f"[Medallion {date}]"
print(f"{log_prefix} Caricamento Bronze...")
bronze = load_bronze(date)
print(f"{log_prefix} Bronze: {len(bronze)} record")
print(f"{log_prefix} Trasformazione Silver...")
silver = transform_bronze_to_silver(bronze)
print(f"{log_prefix} Silver: {len(silver)} record (dopo dedup e filtro outlier)")
# Salva Silver
silver_path = DATA_ROOT / "silver" / date / "soil_readings.parquet"
silver_path.parent.mkdir(parents=True, exist_ok=True)
silver.to_parquet(silver_path, index=False)
print(f"{log_prefix} Trasformazione Gold...")
gold_tables = transform_silver_to_gold(silver)
for table_name, df in gold_tables.items():
gold_path = DATA_ROOT / "gold" / date / f"{table_name}.parquet"
gold_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(gold_path, index=False)
print(f"{log_prefix} Gold {table_name}: {len(df)} record")
print(f"{log_prefix} Pipeline completata.")
if __name__ == "__main__":
today = datetime.utcnow().strftime("%Y-%m-%d")
run_medallion_pipeline(today)
Integracja z InfluxDB i Apache Kafka
InfluxDB dla rolniczych szeregów czasowych
InfluxDB to najbardziej odpowiednia baza danych dla szeregów czasowych IoT o wysokiej częstotliwości. W przeciwieństwie do PostgreSQL
lub MySQL i zoptymalizowany pod kątem masowych zapisów w sekwencji czasowej, automatycznej kompresji danych
dane historyczne (próbkowanie w dół) i zapytania dotyczące przedziałów czasowych z natywnymi funkcjami agregującymi
(mean(), max(), moving_average()).
W przypadku farmy z 50 czujnikami wysyłającymi dane co 30 sekund radzi sobie z tym InfluxDB wygodnie 100 zapisów na sekundę na sprzęcie konsumenckim, z polityką przechowywania automatyczne: dane surowe z 30 dni, agregacje godzinowe przez 1 rok, agregacje dzienne przez 10 lat. Wszystko to w ilości kilku MB pamięci rocznie na czujnik.
# influxdb_queries.py
# Query Flux per analytics agricole su InfluxDB 2.x
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
client = InfluxDBClient(
url = "https://influxdb.azienda.it:8086",
token = "<TOKEN>",
org = "azienda-agricola",
)
query_api = client.query_api()
# ─── Query 1: Media umidita ultime 24 ore per campo ───────────────────────────
QUERY_MOISTURE_24H = """
from(bucket: "farm-sensors")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r.field_id == "campo-nord")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> yield(name: "hourly_moisture")
"""
# ─── Query 2: Alert stress idrico (moisture < 20% nelle ultime 6h) ────────────
QUERY_DROUGHT_ALERT = """
from(bucket: "farm-sensors")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r._value < 20.0)
|> group(columns: ["sensor_id", "field_id"])
|> count()
|> filter(fn: (r) => r._value > 3)
|> yield(name: "drought_sensors")
"""
# ─── Query 3: Trend temperatura settimanale con banda statistica ──────────────
QUERY_TEMP_WEEKLY = """
from(bucket: "farm-sensors")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "temperature_c")
|> aggregateWindow(every: 6h, fn: mean)
|> movingAverage(n: 4)
|> yield(name: "temp_trend_7d")
"""
# ─── Query 4: Stato batterie sensori (per manutenzione preventiva) ────────────
QUERY_BATTERY_STATUS = """
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
|> filter(fn: (r) => r._value < 20)
|> sort(columns: ["_value"])
|> yield(name: "low_battery_sensors")
"""
def run_analytics():
print("=== Dashboard Analytics Agricole ===")
# Esegui query umidita
tables = query_api.query(QUERY_MOISTURE_24H)
for table in tables:
for record in table.records:
print(f" {record.get_time()} | "
f"sensor={record.values.get('sensor_id', 'N/A')} | "
f"moisture={record.get_value():.1f}%")
# Check alert stress idrico
drought_tables = query_api.query(QUERY_DROUGHT_ALERT)
for table in drought_tables:
for record in table.records:
print(f" ALERT STRESS IDRICO: sensor {record.values.get('sensor_id')} "
f"in field {record.values.get('field_id')}")
if __name__ == "__main__":
run_analytics()
Apache Kafka do przetwarzania strumieniowego na dużą skalę
Gdy liczba czujników przekracza setki lub gdy wymagane są opóźnienia poniżej sekundy, Sam MQTT nie wystarczy jako system dystrybucji. Apache Kafka pojawia się jako magistrala wiadomości przedsiębiorstwa pomiędzy brokerem MQTT a warstwą przetwarzania. Standardowy wzór to: Broker MQTT → Kafka Connect (łącznik źródłowy MQTT) → Tematy Kafki → Grupy konsumenckie.
Kafka przynosi w tym kontekście kluczowe korzyści: ponowne odtwarzanie komunikatu (jeśli konsument ma błąd, wszystko jest ponownie przetwarzane), podział według pola/obszaru geograficznego, wielu niezależnych konsumentów (Programy piszące InfluxDB, silniki alertów, wnioskowanie ML, moduły zapisujące jezioro danych odczytują te same dane bez przeszkadzać) i dokładnie jednorazowe gwarancje w ramach transakcji Kafka.
# kafka_consumer.py
# Consumer Kafka per dati sensori agricoli
# Dipendenze: pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import logging
log = logging.getLogger(__name__)
KAFKA_CONFIG = {
"bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"group.id": "farm-analytics-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Commit manuale per exactly-once
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "farm-consumer",
"sasl.password": "<KAFKA_PASSWORD>",
}
TOPICS = ["farm.soil.readings", "farm.weather.readings"]
def process_soil_message(data: dict) -> bool:
"""Elabora un messaggio sensore suolo"""
sensor_id = data.get("sensor_id", "unknown")
moisture = data.get("moisture_pct", 0)
# Routing logica: stress idrico → alert immediato
if moisture < 15.0:
log.warning(f"STRESS IDRICO critico: sensor {sensor_id} = {moisture}%")
# In produzione: pubblica su topic alert
return True
log.info(f"OK | sensor={sensor_id} | moisture={moisture}%")
return True
def run_kafka_consumer():
consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe(TOPICS)
log.info(f"Consumer Kafka avviato su topics: {TOPICS}")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
log.debug(f"Raggiunto EOF: {msg.topic()}[{msg.partition()}]")
else:
raise KafkaException(msg.error())
continue
# Processa messaggio
try:
data = json.loads(msg.value().decode("utf-8"))
success = process_soil_message(data)
if success:
consumer.commit(message=msg) # Commit solo se processato correttamente
except json.JSONDecodeError as e:
log.error(f"JSON non valido: {e}")
consumer.commit(message=msg) # Commit anche errori per non bloccare
except KeyboardInterrupt:
log.info("Consumer fermato")
finally:
consumer.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_kafka_consumer()
Pulpit nawigacyjny i system ostrzegania Grafana
Grafana to standardowe narzędzie do wizualizacji danych IoT w czasie rzeczywistym. Integruje się natywnie z InfluxDB 2.x poprzez źródło danych i umożliwia budowanie interaktywnych dashboardów z mapami geograficznymi, wykresy szeregów czasowych i panele stanu bez konieczności pisania kodu frontendowego.
W przypadku firmy rolniczej standardowy pulpit nawigacyjny zawiera:
- Mapa pola: Wtyczka Geomap z nakładkami czujników, kolorami stresu wodnego
- Szereg czasowy wilgotności: Wykres 24-godzinny z kolorowymi progami (czerwony < 15%, żółty < 25%)
- Wskaźnik baterii: Stan baterii wszystkich czujników do planowania konserwacji
- Mapa cieplna pH: Mapa ciepła pH na boisko w celu podjęcia decyzji o kopnięciu
- Prognoza podlewania: Wyjście modelu ML w ciągu następnych 48 godzin
# grafana_alert_rules.yaml
# Regole di alerting Grafana per agricoltura di precisione
apiVersion: 1
groups:
- name: farm-critical-alerts
interval: 1m
rules:
# Alert stress idrico critico
- uid: drought-critical
title: "Stress Idrico Critico"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15.0]
type: lt
query: { params: ["A"] }
noDataState: Alerting
execErrState: Alerting
for: 10m
annotations:
summary: "Sensore {{ $labels.sensor_id }} in stress idrico"
description: "Umidita {{ $values.A }}% sotto soglia critica 15%"
labels:
severity: critical
team: agronomist
# Alert batteria scarica
- uid: battery-low
title: "Batteria Sensore in Esaurimento"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15]
type: lt
query: { params: ["A"] }
for: 5m
annotations:
summary: "Batteria bassa su {{ $labels.sensor_id }}"
labels:
severity: warning
team: maintenance
Anti-Pattern: Uniwersalne stałe progi
Częstym błędem we wczesnych wdrożeniach jest stosowanie dla wszystkich identycznych progów wilgotności uprawy i wszystkie rodzaje gleby. Krytyczny próg wilgotności wynoszący 20% jest prawidłowy dla gleby gleba gliniasta z pomidorami, ale całkowicie nieodpowiednia dla gleby piaszczystej z winoroślą lub dla a szkółka ogrodnicza. Progi muszą być konfigurowalne dla każdego czujnika, kultury i fazy fenologiczny. Implementuje dynamiczny system konfiguracji progów, który nie jest zakodowany na stałe w pliku kod ostrzegawczy.
Wdrożenie, bezpieczeństwo i skalowalność
Docker Compose dla rozwoju lokalnego
Aby opracować i przetestować cały stos lokalnie, bez zależności od chmury, a
docker-compose.yml w skład którego wchodzą brokerzy MQTT, InfluxDB i Grafana:
# docker-compose.yml
# Stack IoT agricolo completo per sviluppo locale
version: "3.9"
services:
# Broker MQTT - Eclipse Mosquitto
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT plain (solo sviluppo)
- "8883:8883" # MQTT TLS
- "9001:9001" # WebSocket
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./certs:/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
restart: unless-stopped
# Time-series database
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: changeme123!
DOCKER_INFLUXDB_INIT_ORG: azienda-agricola
DOCKER_INFLUXDB_INIT_BUCKET: farm-sensors
DOCKER_INFLUXDB_INIT_RETENTION: 30d
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-token
volumes:
- influxdb_data:/var/lib/influxdb2
restart: unless-stopped
# Dashboard e alerting
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
GF_INSTALL_PLUGINS: grafana-worldmap-panel,grafana-clock-panel
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
# Pipeline consumer (il nostro codice Python)
pipeline-consumer:
build:
context: .
dockerfile: Dockerfile.consumer
environment:
MQTT_HOST: mosquitto
MQTT_PORT: "1883"
INFLUX_URL: http://influxdb:8086
INFLUX_TOKEN: my-super-secret-token
INFLUX_ORG: azienda-agricola
INFLUX_BUCKET: farm-sensors
depends_on:
- mosquitto
- influxdb
restart: unless-stopped
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
grafana_data:
Najlepsze praktyki w zakresie bezpieczeństwa w rolniczym IoT
Bezpieczeństwo systemów IoT w rolnictwie jest często niedoceniane. Uszkodzony czujnik może ulec zmianie odczytów i powodować nieprawidłowe nawadnianie, marnotrawstwo lub uszkodzenie upraw. Absolutne minimum:
Lista kontrolna bezpieczeństwa IoT w rolnictwie
- Wzajemny TLS (mTLS): Każde urządzenie posiada unikalny certyfikat klienta. Broker uwierzytelnia urządzenia za pomocą certyfikatu, a nie tylko nazwy użytkownika/hasła.
-
Tematyczne listy ACL: Każdy czujnik może publikować TYLKO we własnych tematach
(
farm/campo-nord/sensor-001/#) i nie czytaj tematów z innych sensorów. - Podpisywanie oprogramowania: Aktualizacje OTA (over-the-air) muszą być podpisane kryptograficznie, aby zapobiec złośliwemu oprogramowaniu sprzętowemu.
- Rotacja danych uwierzytelniających: Tokeny i certyfikaty z datą ważności 1 rok, rotacja automatycznie poprzez rotację certyfikatu ACME lub AWS IoT.
- Segmentacja sieci: Urządzenia IoT w dedykowanej sieci VLAN, odizolowane od sieci korporacyjny. Dostęp do brokera wyłącznie z poziomu bramki, nigdy bezpośrednio z Internetu.
- Ograniczenie szybkości: Broker ogranicza liczbę komunikatów na sekundę urządzenie zabezpieczające przed przypadkowym zalaniem lub atakami.
Rolnictwo precyzyjne we Włoszech: możliwości i zachęty 2025
Włochy zajmują wyjątkową pozycję w europejskiej panoramie technologii agrotechnicznej: mają sektor spożywczy najlepszych na świecie (DOP, IGP, DOC), rozdrobniona struktura produkcyjna (przedsiębiorstwa średniej wielkości 11 hektarów w porównaniu ze średnią UE wynoszącą 33), ale z bardzo dużą wiedzą agronomiczną i jest pierwszym na świecie Europa na rzecz rolnej wartości dodanej wynoszącej 42,4 miliarda euro w 2024 r. (+9% rok do roku).
Wyzwanie polega na udostępnieniu technologii IoT i sztucznej inteligencji także MŚP z branży rolniczej, które stanowią 95% tkanki produkcyjnej. Oto narzędzia finansowania dostępne w 2025 roku:
Zachęty w zakresie AgriTech i IoT we Włoszech (2025)
| Mierzyć | Kwota/Korzyść | Beneficjenci | Wygaśnięcie |
|---|---|---|---|
| Plan przejścia 5.0 | 35-45% ulgi podatkowej na inwestycje cyfrowe + energetyczne | Wszystkie firmy (w tym rolne) | 31.12.2025 |
| Misja PNRR 2.3 – Mechanizacja | Bezzwrotne składki o wartości 400 mln EUR | Przedsiębiorstwa rolnicze dla maszyn 4.0 | Wyprzedane (oczekiwane nowe połączenia) |
| Ulga podatkowa 4.0 (dawniej Przemysł 4.0) | 20% na technologiczne dobra inwestycyjne | Spółki z branży rolniczej ATECO | Przedłużony 2025 |
| INAIL ISI Zawiadomienie dotyczące rolnictwa | Do 65% bezzwrotnej | Przedsiębiorstwa i spółdzielnie rolnicze | Roczne (otwarcie wiosenne) |
| PSP (Plan strategiczny WPR na lata 2023-2027) | Działania w ramach ekoprogramu z premiami dla rolnictwa precyzyjnego | Firmy z zarejestrowanymi UAA | Roczne (stosowanie WPR) |
| Wezwanie agrowoltaiczne PNRR | 1,5 MLD EURO na systemy fotowoltaiczne zintegrowane w rolnictwie | Przedsiębiorstwa rolnicze | 2025-2026 |
Dla gospodarstwa inwestującego w kompletny system IoT (czujniki, bramki, oprogramowanie, łączność) o całkowitym koszcie ok 50 000 EUR, Plan Przejścia 5.0 może zakryć do 22 500 EUR (45% kosztów) jako ulga podatkowa, zwrot inwestycji w ciągu 2-3 lat dzięki oszczędnościom na wodzie i nawozach i siła robocza.
Studium przypadku: Masseria Pugliese 500 hektarów – drzewo oliwne i pszenica
Prawdziwy przypadek (dane zagregowane i anonimowe) apulijskiej firmy rolniczej, która to zrobiła wdrożył rurociąg IoT ze 120 czujnikami wilgotności gleby, 8 stacjami pogodowymi i a automatyczny system nawadniania:
- Inwestycja początkowa: 85 000 EUR (sprzęt + oprogramowanie + instalacja)
- Wkłady PNRR i Przejścia 4.0: 38 000 EUR (44,7%)
- Inwestycja netto: 47 000 EUR
- Oszczędzanie wody w roku 1: -42% (-18 000 EUR/rok na rachunku za nawadnianie)
- Oszczędności nawozów: -22% (-8500 EUR/rok)
- Wzrost plonu ziarna: +11% (+15 000 EUR/rok obrotu)
- Okres zwrotu: 2,3 roku
- 5-letni zwrot z inwestycji: 342%
Najlepsze praktyki i antywzorce, których należy unikać
Skonsolidowane najlepsze praktyki
-
Diagram ewolucji z wersjonowaniem: Zawsze dołączaj pole
schema_versionw ładunkach MQTT. Dodając nowe pola, zwiększ wersję. Konsument zarządza różne wersje bez zakłócania zmian. - Sygnatura czasowa z urządzenia, a nie serwera: Znacznik czasu musi być taki sam czujnika w momencie odczytu, a nie w momencie dotarcia do brokera. Opóźnienie sieci może być zmienna i fałszować szeregi czasowe.
- Domyślnie QoS 1, QoS 2 tylko dla poleceń krytycznych: QoS 2 czterokrotnie ruch sieciowy. Używaj go tylko do poleceń wykonawczych (otwieranie zaworów, uruchamianie pomp) gdzie powielanie jest niedopuszczalne.
- Obowiązkowy bufor lokalny: Każda brama musi mieć lokalny bufor (pliki SQLite, NDJSON), aby kontynuować gromadzenie danych podczas rozłączeń sieci. W rolnictwie połączenia są zawodne.
- Okresowa kalibracja czujnika: Wyprowadzono czujniki wilgotności gleby w miarę upływu czasu (zwykle 2–5% rocznie). Zaplanuj półroczne kalibracje i wykonaj je automatyczne wykrywanie dryfu poprzez porównanie sąsiadujących czujników.
- Polityka przechowywania InfluxDB: Skonfiguruj automatyczne próbkowanie w dół: surowe dane 30 dni, agregacje godzinowe 1 rok, nieograniczona agregacja dzienna. Zapisz przechowywania danych bez utraty szczegółowości potrzebnej do długoterminowych analiz.
Krytyczne anty-wzorce, których należy unikać
-
Płaski temat MQTT (bez hierarchii): Tematy takie jak
sensor_001_moisturezamiastfarm/campo-nord/sensor-001/soil/moistureuniemożliwiają korzystanie symbol wieloznaczny dla zagregowanych subskrypcji i skalowania do setek czujników. - Sondowanie zamiast oparte na zdarzeniach: Nie wysyłaj zapytań do brokera MQTT co N sekund na „zapytanie” o nowe dane. MQTT i push: wydawca wysyła, gdy są dane, abonent otrzymuje natychmiast. Odpytywanie neguje wszystkie zalety protokołu.
- Brak sprawdzania poprawności schematu: Zaakceptuj dowolny JSON od brokera bez weryfikacja schematu. Wadliwy czujnik, który wysyła wartości zerowe, ciągi znaków zamiast liczb lub wartości poza zakresem (temperatura -999) zanieczyszczają całe jezioro danych bez sprawdzania poprawności.
- Bezpośrednie pisanie od brązu do złota: Pomiń warstwę Silver i pisz surowe dane bezpośrednio do agregacji. Kiedy odkryjesz błąd w logice transformacji, trzeba to wszystko przetworzyć od nowa, bo surowych danych nie było zachowane.
- Broker publiczny bez uwierzytelnienia: Skorzystaj z publicznych brokerów MQTT (test.mosquitto.org) lub prywatni brokerzy bez TLS/uwierzytelniania. Dane rolnicze są wrażliwe aktywa firmy; konkurencja może subskrybować Twoje tematy i czytać sytuację produkcyjną Twojej firmy w czasie rzeczywistym.
- Pojedynczy punkt awarii u brokera: Pojedynczy broker MQTT bez klastrowanie lub przełączanie awaryjne. EMQX i HiveMQ obsługują natywne klastrowanie; Komar wymaga rozwiązania zewnętrzne dla HA. Awaria brokera blokuje gromadzenie wszystkich danych.
Skalowalność: od 10 do 10 000 czujników
Opisana architektura skaluje się liniowo do kilku tysięcy czujników bez modyfikacji architektoniczny. Oto typowe liczby dla konfiguracji sprzętowej brokera MQTT:
Możliwość konfiguracji brokera MQTT
| Konfiguracja | Pośrednik | Czujniki Maks | Wiadomości/sek | Koszt miesięczny |
|---|---|---|---|---|
| Rozwój/testowanie | Mosquitto na Raspberry Pi 4 | 100 | 500 | 0 (posiadany sprzęt) |
| MŚP (50-500 ha) | EMQX na VPS 4 vCPU/8 GB | 1000 | 5000 | 40-80 EUR |
| Duża firma (500+ ha) | Węzły klastra EMQX 3 | 10 000 | 50 000 | 200-400 EUR |
| Spółdzielnia/Okręg | HiveMQ Enterprise / AWS IoT Core | 100 000+ | Nieograniczony | Płać za użycie |
AWS IoT Core i Azure IoT Hub to natywne rozwiązania chmurowe dla skal krajowych lub obejmujących wiele przedsiębiorstw: automatycznie zarządzają skalowalnością, oferują SLA na poziomie 99,99% i integrują się natywnie z odpowiednimi ekosystemami (AWS Timestream, Lambda, S3 dla AWS; Azure Data Explorer, Stream Analytics, Data Lake dla platformy Azure). Koszt wynosi zazwyczaj 1-5 dolarów za milion wiadomości, z hojnym bezpłatnym poziomem na pierwsze testy.
Wnioski i dalsze kroki
Zbudowaliśmy cały rurociąg IoT dla rolnictwa precyzyjnego: od czujników w terenie do ustrukturyzowanego jeziora danych, poprzez MQTT z odpowiednim QoS, walidacją Pydantic, Magazyn InfluxDB dla szeregów czasowych i architektura Medalion dla jeziora danych. Kod Pythona pokazany jest gotowy do produkcji i obejmuje podstawowe wzorce występujące w rzeczywistych systemach.
Najważniejsze punkty, które należy zabrać ze sobą:
- MQTT to właściwy protokół dla rolniczego IoT: lekki, asynchroniczny, z obsługą niestabilnej łączności
- LoRaWAN na otwartej przestrzeni, WiFi/RS-485 w szklarniach: nie ma jednego, uniwersalnego protokołu bezprzewodowego
- Walidacja schematu (Pydantic) nie jest opcjonalna: czujniki wysyłają błędne dane częściej niż myślisz
- Architektura medalionu (brązowy/srebrny/złoty) gwarantuje możliwość ponownego przetwarzania i progresywną jakość danych
- InfluxDB + Grafana to minimalny stos do monitorowania w czasie rzeczywistym bez kosztów licencji
- Zachęty PNRR i Przejście 5.0 pokrywają do 45% inwestycji włoskich przedsiębiorstw rolniczych
W kolejnym artykule z serii FoodTech omówimy, jak zastosować szablony Przewaga uczenia maszynowego do monitorowania upraw bezpośrednio na bramce, redukując opóźnienia z minut do milisekund i umożliwiając reakcję w czasie rzeczywistym na krytyczne zdarzenia jak nagłe przymrozki lub ataki grzybów w początkowej fazie.
Zasoby i spostrzeżenia
- Oficjalna dokumentacja paho-mqtt: http://eclipse.dev/paho
- Broker EMQX (open source, gotowy do pracy w klastrze): emqx.io
- Dokumentacja InfluxDB 2.x: docs.influxdata.com
- Plan przejścia 5.0 MIMIT: mimit.gov.it
- ISTAT Rolnictwo precyzyjne 2024: ist.it
- Architektura medalionu datakostek: databricks.com
Powiązane artykuły z innych serii
- MLOps: Jak wdrożyć modele ML prognozujące plony w środowisku produkcyjnym
- Biznes danych i sztucznej inteligencji - sztuczna inteligencja w produkcji: Wzorce IoT stosowane w przemyśle za pomocą Kafki i OPC-UA
- Inżynieria sztucznej inteligencji: RAG w sprawie dokumentacji agronomicznej dla wirtualnych asystentów agronomów







