Python ve MQTT ile Hassas Tarım için IoT Pipeline
Puglia'da bir buğday tarlası. Üç bin hektar, farklı derinliklere gömülü kırk sensör, veriler her otuz saniyede bir gelen sıcaklık, toprak nemi, pH ve elektrik iletkenliği. olmadan yapılandırılmış bir veri hattı, bu sadece dijital gürültü. Doğru boru hattıyla motor haline gelir Su tüketimini %40 azaltan, verimi %15 artıran ve maliyetleri azaltan kararların sayısı gübrelerin dörtte biri oranında.
Hassas tarım bir gelecek vaadi değil: 2025 yılında geçerli olacak endüstriyel bir gerçekliktir 14,77 milyar dolar küresel olarak piyasayı ileriye taşıyacak tahminlerle 2030'a kadar 26,86 milyar %12,7'lik bir Bileşik Büyüme Oranı ile. İtalya'da şirketlerin %28,5'i Tarımda hali hazırda ISTAT 2024 verilerine göre hassas tarım teknikleri kullanılıyor; 100 hektarın üzerinde UAA alanına sahip şirketlerde %41,1. İtalyan tarım sektörü değer üretti tarafından eklendi 2024'te 42,4 milyar euroİtalya'nın Avrupa'da birinci olduğunu teyit ederek, ve dijitalleşme bu performansın ana itici gücüdür.
Ancak tarladaki sensör ile doğru tarımsal karar arasında karmaşık bir teknik yol vardır: düşük güçlü kablosuz protokoller, MQTT komisyoncusu, şema doğrulama, zenginleştirme hattı, veri gölünde madalyon mimarisi, gerçek zamanlı kontrol paneli ve uyarı sistemi. Bu makale çalışan Python kodu, gerçek mimariler ve en iyi uygulamalarla bu zincirin her adımını kapsar üretimde doğrulanmıştır.
Bu Makalede Neler Öğreneceksiniz?
- Hassas tarım için uçtan uca IoT sisteminin mimarisi
- Tarımsal sensör türleri ve kablosuz protokoller: MQTT, LoRaWAN, Zigbee karşılaştırıldı
- MQTT'nin ayrıntılı incelemesi: QoS 0/1/2, saklanan mesajlar, son istek, konu tasarımı
- Python uygulamasını paho-mqtt ile tamamlayın: sensör yayıncısı ve tüketici hattı
- Pydantic ile IoT verilerinin doğrulanması ve şema uygulaması
- Zaman serileri için InfluxDB ve akış işleme için Apache Kafka ile entegrasyon
- Tarımsal veriler için madalyon mimarisi (Bronz/Gümüş/Altın)
- Grafana kontrol paneli ve kritik eşik uyarı sistemi
- İtalya bağlamı: CAP, PNRR Geçiş 5.0, AgriTech 2025 teşvikleri
FoodTech Serisi - Tüm Makaleler
| # | Öğe | Seviye | Durum |
|---|---|---|---|
| 1 | Hassas Tarım için IoT Boru Hattı (şu anda buradasınız) | Gelişmiş | Akım |
| 2 | Mahsul İzleme için ML Edge: Tarlalarda Bilgisayarlı Görme | Gelişmiş | Yakında gelecek |
| 3 | Uydu API ve Bitki Örtüsü Endeksleri: Python ve Sentinel-2 ile NDVI | Orta seviye | Yakında gelecek |
| 4 | Gıdada Blockchain izlenebilirliği: Tarladan süpermarkete | Orta seviye | Yakında gelecek |
| 5 | Gıda Endüstrisinde Kalite Kontrol için Bilgisayarlı Görme | Gelişmiş | Yakında gelecek |
| 6 | FSMA ve Dijital Uyumluluk: Düzenleyici Süreçlerin Otomasyonu | Orta seviye | Yakında gelecek |
| 7 | Dikey Tarım: IoT ve ML ile Çevresel Kontrol | Gelişmiş | Yakında gelecek |
| 8 | Prophet ve LightGBM ile Gıda Perakendesinde Talep Tahmini | Orta seviye | Yakında gelecek |
| 9 | Çiftlik Zekası Kontrol Paneli: Grafana ile Gerçek Zamanlı Analiz | Orta seviye | Yakında gelecek |
| 10 | Tedarik Zinciri Gıda Optimizasyonu: Atıkların Azaltılması için ML | Orta seviye | Yakında gelecek |
2025'te AgriTech Pazarı: Rakamlar ve Trendler
Sadece birkaç yıl içinde hassas tarım, niş bir teknolojiden stratejik bir itici güç haline geldi. Birincil sektörün rekabet gücü. Rakamlar bunu açıkça doğruluyor: küresel pazar hassas tarım uygulanır 2025'te 14,77 milyar dolar ve büyüyecek 2030 yılına kadar 26,86 milyar. Ancak AgriTech'in genel resmi daha da geniştir: segment Yönetim yazılımı, tarımsal dronlar, robot teknolojisi ve dijital biyo-girdileri içeren genişletilmiş Çeşitli araştırma kaynaklarına göre 2025'te 30 milyar, 2031'e kadar ise %16-23'lük bir CAGR bekleniyor.
İtalya'da 2024 yılı bir dönüm noktası oldu: ISTAT'a göre İtalyan tarımı, 42,4 milyar euro (2023'e kıyasla +%9) ile katma değer açısından Avrupa'da birinci sırada yer alıyor. Tarımsal üretim miktar olarak %1,4, katma değer ise %3,5 arttı. Aynı zamanda, İtalyan tarım şirketlerinin %28,5'i halihazırda hassas teknikleri kullanıyor, Kuzey-Doğu (%33) ve Kuzey-Batı (%32,1) bölgelerinde ve büyük operatörlerde daha fazla yoğunlaşma görülmektedir. (UAA'sı 100 hektarın üzerinde olan şirketlerde %41,1).
2025'te Hassas Tarım Teknolojilerinin Etkinleştirilmesi
| Teknoloji | Ana Uygulama | Evlat Edinme İtalya | Ortalama yatırım getirisi |
|---|---|---|---|
| IoT toprak sensörleri | Değişken sulama, gübreleme | Alta (Kuzey İtalya) | Girdi maliyetlerinde %15-25 azalma |
| Tarımsal dronlar | Haritalama, tedaviler, yaprak analizi | Ortalama | Pestisitlerden %30-40 tasarruf |
| Uydu görüntüleri | NDVI, su stresi, öngörülen verimler | Orta-yüksek | %5-10 verim optimizasyonu |
| IoT hava durumu istasyonları | Hastalık tahmin modelleri, sulama | Yüksek | Tedavilerde %10-20 azalma |
| Değişken Oran Teknolojisi | Ekim, değişken gübreleme | Düşük-orta | %8-15 girdi tasarrufu |
| Saha verilerinde makine öğrenimi | Verim tahmini, tarımsal optimizasyon | Düşük | +%10-20 verim, -%15 girdi |
PNRR, hızlanmada belirleyici bir rol oynadı: Misyon 2 "Yeşil Devrim ve Ekolojik Geçiş", araç filosunun modernizasyonu için 400 milyon euro ayırdı Tarım 4.0 teknolojilerine doğru. Toplam bütçesi 5.0 olan Geçiş Planı 2024-2025 iki yıllık dönemde 12,7 milyar euro (6,3 milyar özellikle Geçiş 5.0) ve tarıma da uygulanabilir: 2025 Bütçe Kanunu (L. 207/2024) Uygulama kapsamı genişletilerek tarım işletmeleri vergi kredisine uygun hale getirildi Dijital ve enerji verimli teknolojilere yatırımlar için.
Tarım için IoT Mimarisi: Sensörden Veri Gölüne
Tek bir satır kod yazmadan önce sistem mimarisini anlamak önemlidir. tamamlandı. Başlangıçta yapılan bir tasarım hatası, ölçeklendirdiğinizde maliyetli yeniden yazma işlemlerine yol açar 10.000'de 10 sensör. Burada tanımladığımız mimari, ana mimarilerin benimsediği mimaridir. sektördeki operatörler tarafından gerçek üretim ortamlarında doğrulanmıştır.
Uçtan Uca Mimari: Katmanlar ve Bileşenler
┌─────────────────────────────────────────────────────────────────────┐
│ FIELD LAYER (Campo) │
│ [Sensore Suolo] [Stazione Meteo] [Sensore pH] [Drone Mapping] │
│ │ │ │ │ │
│ └────────────────┴────────────────┘ │ │
│ │ LoRaWAN / Zigbee / RS-485 │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ GATEWAY LAYER (Edge) │
│ [Gateway LoRaWAN / Raspberry Pi 4] │
│ - Aggregazione dati multi-sensore │
│ - Pre-elaborazione e filtro outlier │
│ - Buffer locale (offline tolerance) │
│ - Protocollo: MQTT publish su broker locale │
└────────────────────────┼────────────────────────────────────────────┘
│ MQTT / TLS
┌────────────────────────┼────────────────────────────────────────────┐
│ BROKER LAYER (Fog/Cloud) │
│ [EMQX / HiveMQ / Eclipse Mosquitto] │
│ - Topic management gerarchico │
│ - Autenticazione mTLS / JWT │
│ - QoS management e message persistence │
│ - Bridge verso cloud (AWS IoT / Azure IoT Hub) │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ PROCESSING LAYER (Cloud) │
│ [Apache Kafka] ──► [Stream Processor] ──► [InfluxDB] │
│ - Ingestion stream - Validazione schema - Time-series store │
│ - Partitioning - Enrichment - Retention policy │
│ - Consumer groups - Alerting real-time - Downsampling │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ DATA LAKE (Medallion Architecture) │
│ [Bronze: Raw S3] ──► [Silver: Cleaned] ──► [Gold: Analytics] │
│ - Dati grezzi MQTT - Schema validato - Aggregazioni │
│ - Immutabile - Outlier rimossi - ML features │
│ - Formato: Parquet - Formato: Delta/Iceberg - Formato: Parquet │
└────────────────────────┼────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────────────────────────┐
│ SERVING LAYER (Applicazioni) │
│ [Grafana Dashboard] [Alert Manager] [ML Models] [Mobile App] │
│ - Real-time monitoring - SMS/Email/Push - Previsioni rese │
│ - Mappa campo - Soglie critiche - Ottimizzazione input │
└─────────────────────────────────────────────────────────────────────┘
Her seviyenin farklı ve ayrılmış sorumlulukları vardır. Bu ayrılık ve eleştiri: şunları yapmanızı sağlar: bir bileşeni dokunmadan değiştirin (örn. ölçeklendirme sırasında Mosquitto'dan EMQX'e geçiş yapın) diğerleri. Ayrıca çevrimdışı toleransı uygulamanıza olanak tanır: Bulut bağlantısı kesilirse ağ geçidi verileri yerel olarak toplamaya ve ara belleğe almaya devam eder.
Tarımsal Sensörler: Türleri, Protokolleri ve Dağıtımı
Sahaya Yönelik Sensörlerin Ana Kategorileri
Doğru sensörlerin seçilmesi mahsule, toprak koşullarına ve hedeflere bağlıdır tarım uzmanları. 2025 yılı için teknik özellikleri ve gösterge maliyetleri içeren ana kategoriler şunlardır:
Hassas Tarıma Yönelik Sensörler
| Kategori | Parametre | Teknoloji | Birim Maliyet | Protokol |
|---|---|---|---|---|
| Toprak nemi (VWC) | Hacimsel su içeriği | FDR, TDR, Kapasitif | 30-150 Avro | SDI-12, RS-485, LoRa |
| Toprak sıcaklığı | T 10/30/50cm'de topraklanmış | PT100, NTC | 20-80 Avro | SDI-12, I2C, 1 Kablolu |
| Toprak pH'ı | Yerinde asitlik | ISE elektrodu | 80-300 Avro | RS-485, Modbus |
| Elektrik iletkenliği (EC) | Tuzluluk, doğurganlık | Endüktif, kontak | 60-200 Avro | SDI-12, RS-485 |
| Hava istasyonu | T, HR, rüzgar, yağmur, radyasyon | Entegre çoklu sensör | 200-800 Avro | RS-485, Wi-Fi, LoRa |
| Yaprak sensörü | Yaprak nemi, sıcaklık | Kapasitif, IR | 40-120 Avro | SDI-12, I2C |
| Sulama debimetresi | Su akış hızı | Ultrasonik, pervane | 80-350 Avro | Darbe, RS-485 |
| Taşınabilir NDVI sensörü | Bitkisel indeks | Multispektral | 300-1500 Avro | Bluetooth, Wi-Fi |
Kablosuz İletişim Protokolleri: En İyi Karşılaştırma
Kablosuz protokol seçimi sistem mimarisindeki belki de en kritik karardır Tarımsal IoT. Sahalar aşırı özelliklere sahiptir: 10 km'ye kadar mesafeler, fiziksel engeller (sıralar, ağaçlar, kırsal binalar), şebeke elektriği yok, sıcaklıklar -20 ile +60 santigrat derece arasında.
Tarımsal IoT için Kablosuz Protokollerin Karşılaştırılması
| Protokol | Menzil | Bant | Davul | Altyapı maliyeti | Kullanım örneği |
|---|---|---|---|---|---|
| LoRaWAN | 3-15 km | 0,3-50kbps | 5-10 yıl | Orta (ağ geçidi) | Toprak sensörleri, uzak saha hava durumu |
| NB-IoT | 10+ kilometre | 20-250kbps | 3-8 yıl | Düşük (SIM kart) | 4G/5G kapsama alanına sahip alanlar |
| Zigbee | 10-100m | 250kbps | 1-3 yıl | Düşük (örgü) | Seralar, otomatik sulama sistemleri |
| Wi-Fi 6 | 100-200m | Yüksek (Gbps) | Saat/gün | Orta (AP) | Kamera sistemleri, video kalite analizi |
| 4G/LTE | Sınırsız | Yüksek | 1-5 yıl | Orta (SIM) | Tarım makineleri, mobil ağ geçidi |
| RS-485 (kablolu) | 1200 m | 10 Mb/sn | Yok | Bas (yol açar) | Kontrollü seralar, sabit sistemler |
Tipik bir İtalyan çiftliği için (açık alanda 50-500 hektar), en yaygın çözüm 2025 ve hibrit mimari: Toprak sensörleri için LoRaWAN uzak alanlarda, Tarım makineleri için NB-IoT hareket halinde, e Seralar için WiFi/kablolu Hassasiyet ve örnekleme oranının maksimum olduğu yer. Merkezi ağ geçidi (genellikle bir Raspberry Pi 4 veya Dragino endüstriyel ağ geçidi) her şeyi toplar ve MQTT aracılığıyla bulutta yayınlar.
MQTT Derinlemesine İnceleme: Mimari, Hizmet Kalitesi ve En İyi Uygulamalar
MQTT (Message Queuing Telemetri Transport), IoT'nin fiili protokolüdür. IBM tarafından oluşturuldu 1990'lı yıllarda petrol boru hatlarının uydu aracılığıyla izlenmesi, ISO/IEC 20922 standardı ve kalbi haline geldi. herhangi bir ciddi IoT sisteminin Sadeliği ve gücü onu ortamlar için ideal kılar. sınırlı bant genişliği ve düşük güç tüketimi cihazları.
Yayınlama/Abone Olma Modeli
HTTP'nin tipik istek/yanıt modelinden farklı olarak MQTT şu paradigmayı kullanır: yayınla/abone ol: veri üreticileri (yayıncılar) bunları kimin okuduğunu bilmiyor ve ben tüketiciler (aboneler) kimin yayınladığını bilmiyor. Ayrışma tamdır ve bir aracılık eder. Merkezi bileşen adı verilen komisyoncu.
Veriler şu şekilde düzenlenmiştir: başlıkeğik çizgilerle ayrılmış hiyerarşik dizeler Verilerin doğasını açıklayın. İyi tasarlanmış bir konu, ölçeklenebilir bir mimarinin anahtarıdır. Çok parselli bir çiftlik için aşağıdaki yapı önerilir:
# Struttura topic MQTT consigliata per agricoltura di precisione
# Pattern: azienda/appezzamento/dispositivo/tipo-sensore/metrica
# Esempi concreti:
farm/campo-nord/sensor-001/soil/moisture # Umidita suolo sensore 001
farm/campo-nord/sensor-001/soil/temperature # Temperatura suolo sensore 001
farm/campo-nord/sensor-001/soil/ph # pH suolo sensore 001
farm/campo-nord/sensor-001/soil/ec # Conducibilita elettrica
farm/campo-sud/weather-station/air/temperature # Temperatura aria stazione meteo
farm/campo-sud/weather-station/air/humidity # Umidita aria
farm/campo-sud/weather-station/wind/speed # Velocita vento
farm/campo-sud/weather-station/rain/mm # Precipitazioni
farm/+/+/soil/moisture # Wildcard: umidita suolo da TUTTI i campi e sensori
farm/campo-nord/# # Wildcard: TUTTI i dati dal campo nord
farm/# # Wildcard: TUTTI i dati dell'azienda
# Topic di sistema (prefisso $)
$SYS/brokers/emqx/connections/count # Statistiche broker
farm/campo-nord/sensor-001/$status # Status device (LWT)
farm/campo-nord/sensor-001/$command # Comandi al dispositivo
Hizmet Kalitesi (QoS): Üç Düzey
MQTT QoS, istemci ve komisyoncu arasındaki mesaj teslim garantisini tanımlar. Doğru seviyenin seçilmesi pili, bant genişliğini ve sistem güvenilirliğini doğrudan etkiler:
MQTT QoS: Ayrıntılı Karşılaştırma
| Seviye | İsim | Garanti | Tepegöz | Tarımda Kullanım |
|---|---|---|---|---|
| Hizmet Kalitesi 0 | En fazla bir kez | Yok (ateş et ve unut) | Minimum (1 paket) | Yüksek frekanslı telemetri (her 5 saniyede bir T), kabul edilebilir kayıp |
| Hizmet Kalitesi 1 | En az bir kez | En az bir teslimat (olası kopyalar) | Düşük (2 paket, ACK) | Nem/pH sensörü okumaları, sulama günlüğü |
| Hizmet Kalitesi 2 | Tam olarak bir kez | Yalnızca bir kez garantili teslimat | Yüksek (4 paket) | Sulama vanası kontrolleri, kritik alarmlar, gübre dozajlaması |
Saklanan Mesajlar ve Son Vasiyetname
Tarımda özellikle yararlı olan iki MQTT özelliği:
- Saklanan Mesajlar: Aracı, bir konudaki son mesajı saklar ve her yeni aboneye anında ulaştırılır. Sensörlerin mevcut durumu açısından kritik: bağlanan bir kontrol paneli, bir sonrakini beklemeden en güncel değerleri anında alır yayın döngüsü.
- Son Vasiyetname (LWT): Bağlanırken her cihaz durumunda komisyoncunun otomatik olarak yayınlayacağı bir "vasiyet" mesajı yapılandırabilir. bağlantı anormal şekilde başarısız oluyor. Yoklama olmadan çevrimdışı sensörleri tespit etmek için vazgeçilmez aktif: sensörün bağlantısı doğru şekilde kesilmiyorsa (düşük pil, parazit), aracı, durum konusunda "çevrimdışı" durumunu otomatik olarak yayınlar.
Tam Python Uygulaması: Sensor Publisher
Nemli gerçekçi bir tarımsal sensör düğümünü simüle eden bir Python yayıncısı uyguluyoruz
toprak, sıcaklık, pH ve EC. Kodun kullandığı paho-mqtt 2.x (Geri aramalara sahip modern API
güncellendi) ve tüm en iyi uygulamaları hayata geçiriyor: LWT, saklanan mesajlar, uygun QoS,
otomatik yeniden bağlanma ve yapılandırılmış JSON şeması.
# sensor_node.py
# Nodo sensore MQTT per agricoltura di precisione
# Dipendenze: pip install paho-mqtt pydantic
import paho.mqtt.client as mqtt
import json
import time
import random
import math
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
import ssl
# ── Configurazione ────────────────────────────────────────────────────────────
BROKER_HOST = "emqx.azienda-agricola.it"
BROKER_PORT = 8883 # TLS
KEEPALIVE = 60 # secondi
CLIENT_ID = "sensor-campo-nord-001"
FARM_ID = "farm-001"
FIELD_ID = "campo-nord"
SENSOR_ID = "sensor-001"
# Topic base
TOPIC_BASE = f"{FARM_ID}/{FIELD_ID}/{SENSOR_ID}"
TOPIC_SOIL = f"{TOPIC_BASE}/soil"
TOPIC_STATUS = f"{TOPIC_BASE}/$status"
TOPIC_COMMAND = f"{TOPIC_BASE}/$command"
# Intervallo di pubblicazione in secondi
PUBLISH_INTERVAL = 30
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger(__name__)
# ── Modello dati sensore ───────────────────────────────────────────────────────
@dataclass
class SoilReading:
"""Lettura completa da un nodo sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str # ISO 8601 UTC
latitude: float
longitude: float
depth_cm: int # Profondità di installazione
# Misurazioni suolo
moisture_pct: float # Umidita volumetrica (VWC) %
temperature_c: float # Temperatura suolo gradi C
ph: float # pH suolo (4.0 - 9.0)
ec_ds_m: float # Conducibilita elettrica dS/m
# Metadata dispositivo
battery_pct: int # Livello batteria
rssi_dbm: int # Signal strength in dBm
firmware_version: str
# Flag qualità
quality_flag: str # "OK", "WARN", "ERROR"
quality_notes: Optional[str] = None
def read_sensors_from_hardware() -> SoilReading:
"""
In produzione: legge i sensori reali via SDI-12 o RS-485.
Qui: simula dati realistici con variazione temporale.
"""
now = datetime.now(timezone.utc)
# Ciclo circadiano per temperatura (più alta nelle ore centrali)
hour = now.hour
temp_base = 18.0
temp_variation = 6.0 * math.sin(math.pi * (hour - 6) / 12) if 6 <= hour <= 18 else -2.0
temperature = temp_base + temp_variation + random.gauss(0, 0.3)
# Umidita: degrada lentamente senza pioggia, segue ciclo stagionale
base_moisture = 35.0 # % VWC campo (35% = campo saturo irrigazione)
moisture = base_moisture + random.gauss(0, 1.5)
moisture = max(5.0, min(60.0, moisture)) # clamp fisico
# pH relativamente stabile
ph = 6.8 + random.gauss(0, 0.1)
ph = max(4.0, min(9.0, ph))
# EC: correlata alla salinita e fertilizzazione
ec = 1.2 + random.gauss(0, 0.05)
ec = max(0.1, min(5.0, ec))
# Battery che decresce lentamente (simulazione)
battery = max(10, 95 - int(time.time() / 3600) % 85)
# Quality flag automatico
quality = "OK"
notes = None
if moisture < 10.0:
quality = "WARN"
notes = "Umidita sotto soglia minima critica"
elif moisture > 55.0:
quality = "WARN"
notes = "Umidita sopra soglia saturazione"
if battery < 15:
quality = "WARN"
notes = (notes or "") + " | Batteria in esaurimento"
return SoilReading(
sensor_id = SENSOR_ID,
farm_id = FARM_ID,
field_id = FIELD_ID,
timestamp = now.isoformat(),
latitude = 40.4164,
longitude = 17.9308,
depth_cm = 30,
moisture_pct = round(moisture, 2),
temperature_c = round(temperature, 2),
ph = round(ph, 2),
ec_ds_m = round(ec, 3),
battery_pct = battery,
rssi_dbm = random.randint(-95, -45),
firmware_version = "2.4.1",
quality_flag = quality,
quality_notes = notes,
)
# ── Client MQTT ───────────────────────────────────────────────────────────────
class AgriSensorNode:
"""Nodo sensore MQTT con auto-reconnect e LWT"""
def __init__(self):
self.client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
self._setup_auth()
self._setup_tls()
self._setup_callbacks()
self._setup_lwt()
self.connected = False
def _setup_auth(self):
self.client.username_pw_set(
username="sensor-user",
password="<TOKEN_SEGRETO>"
)
def _setup_tls(self):
"""TLS mutuo con certificato dispositivo"""
self.client.tls_set(
ca_certs = "/certs/ca.crt",
certfile = "/certs/sensor.crt",
keyfile = "/certs/sensor.key",
tls_version = ssl.PROTOCOL_TLS_CLIENT,
)
def _setup_callbacks(self):
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_publish = self._on_publish
def _setup_lwt(self):
"""Last Will Testament: pubblicato dal broker se la connessione cade"""
lwt_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "connection_lost",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.will_set(
topic = TOPIC_STATUS,
payload = lwt_payload,
qos = 1,
retain = True, # Retain: dashboard vede subito lo stato offline
)
def _on_connect(self, client, userdata, flags, reason_code, properties):
if reason_code.is_failure:
log.error(f"Connessione fallita: {reason_code}")
return
log.info(f"Connesso al broker: {BROKER_HOST}")
self.connected = True
# Pubblica stato online (retained)
online_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "online",
"timestamp": datetime.now(timezone.utc).isoformat(),
"firmware": "2.4.1",
})
client.publish(TOPIC_STATUS, online_payload, qos=1, retain=True)
# Sottoscrivi ai comandi
client.subscribe(TOPIC_COMMAND, qos=2)
log.info(f"Sottoscritto a: {TOPIC_COMMAND}")
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
self.connected = False
log.warning(f"Disconnesso: {reason_code}. Tentativo riconnessione...")
def _on_message(self, client, userdata, message):
"""Gestione comandi ricevuti dal broker (es. cambio intervallo)"""
try:
payload = json.loads(message.payload.decode())
cmd = payload.get("command")
log.info(f"Comando ricevuto: {cmd}")
if cmd == "set_interval":
global PUBLISH_INTERVAL
PUBLISH_INTERVAL = int(payload.get("value", 30))
log.info(f"Intervallo aggiornato a {PUBLISH_INTERVAL}s")
elif cmd == "reboot":
log.warning("Comando reboot ricevuto")
# In produzione: riavvia il sistema
except Exception as e:
log.error(f"Errore parsing comando: {e}")
def _on_publish(self, client, userdata, mid, reason_code, properties):
log.debug(f"Messaggio {mid} pubblicato con successo")
def connect(self):
self.client.connect(
host = BROKER_HOST,
port = BROKER_PORT,
keepalive = KEEPALIVE,
)
self.client.loop_start() # Thread background per I/O
def publish_reading(self, reading: SoilReading):
"""Pubblica lettura sensore su topic appropriati"""
# Payload principale: lettura completa
payload_full = json.dumps(asdict(reading), default=str)
result = self.client.publish(
topic = TOPIC_SOIL,
payload = payload_full,
qos = 1,
retain = True, # Ultimo valore sempre disponibile
)
# Pubblica anche metriche singole per dashboard real-time
metrics = {
"moisture": (reading.moisture_pct, 1),
"temperature": (reading.temperature_c, 0),
"ph": (reading.ph, 1),
"ec": (reading.ec_ds_m, 0),
}
for metric, (value, qos) in metrics.items():
self.client.publish(
topic = f"{TOPIC_SOIL}/{metric}",
payload = str(value),
qos = qos,
retain = True,
)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
log.info(
f"Pubblicato | Moisture: {reading.moisture_pct}% | "
f"Temp: {reading.temperature_c}C | pH: {reading.ph} | "
f"EC: {reading.ec_ds_m} dS/m | Quality: {reading.quality_flag}"
)
else:
log.error(f"Errore pubblicazione: {result.rc}")
def run(self):
"""Loop principale del nodo sensore"""
self.connect()
# Attendi connessione iniziale
timeout = 10
while not self.connected and timeout > 0:
time.sleep(1)
timeout -= 1
if not self.connected:
log.error("Impossibile connettersi al broker")
return
log.info(f"Nodo sensore avviato. Intervallo: {PUBLISH_INTERVAL}s")
try:
while True:
reading = read_sensors_from_hardware()
self.publish_reading(reading)
time.sleep(PUBLISH_INTERVAL)
except KeyboardInterrupt:
log.info("Shutdown richiesto")
finally:
# Disconnessione pulita: il LWT NON viene inviato
offline_payload = json.dumps({
"sensor_id": SENSOR_ID,
"status": "offline",
"reason": "graceful_shutdown",
"timestamp": datetime.now(timezone.utc).isoformat(),
})
self.client.publish(TOPIC_STATUS, offline_payload, qos=1, retain=True)
time.sleep(0.5)
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
node = AgriSensorNode()
node.run()
Tüketici İşlem Hattı: Doğrulama, Zenginleştirme ve Depolama
Tüketici sistemin diğer ucudur. MQTT konularına abone olur, alınan verileri doğrular Pydantic, bağlamsal verilerle (tarımsal bilgiler, hava durumu uyarıları) okumaları zenginleştirir ve bunları zaman serisi depolama için InfluxDB'ye ve veri gölü için S3'e yönlendirir.
# pipeline_consumer.py
# Consumer MQTT + validazione Pydantic + storage InfluxDB
# Dipendenze: pip install paho-mqtt pydantic influxdb-client boto3
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, Field, field_validator, ValidationError
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import boto3
import io
log = logging.getLogger(__name__)
# ── Schema di validazione Pydantic ────────────────────────────────────────────
class SoilReadingSchema(BaseModel):
"""Schema di validazione per letture sensore suolo"""
sensor_id: str
farm_id: str
field_id: str
timestamp: str
latitude: float = Field(ge=-90, le=90)
longitude: float = Field(ge=-180, le=180)
depth_cm: int = Field(ge=0, le=200)
moisture_pct: float = Field(ge=0.0, le=100.0)
temperature_c: float = Field(ge=-40.0, le=80.0)
ph: float = Field(ge=0.0, le=14.0)
ec_ds_m: float = Field(ge=0.0, le=20.0)
battery_pct: int = Field(ge=0, le=100)
rssi_dbm: int = Field(ge=-150, le=0)
firmware_version: str
quality_flag: str = Field(pattern="^(OK|WARN|ERROR)$")
quality_notes: Optional[str] = None
@field_validator("timestamp")
@classmethod
def validate_timestamp(cls, v: str) -> str:
"""Verifica che il timestamp sia ISO 8601 valido e non nel futuro"""
try:
ts = datetime.fromisoformat(v)
if ts > datetime.now(timezone.utc):
raise ValueError("Timestamp nel futuro")
except ValueError as e:
raise ValueError(f"Timestamp non valido: {e}")
return v
@field_validator("ph")
@classmethod
def validate_ph_agronomico(cls, v: float) -> float:
"""pH fuori range agronomico (4.5-8.5) e anomalia"""
if v < 4.5 or v > 8.5:
log.warning(f"pH {v} fuori range agronomico tipico [4.5-8.5]")
return v
def has_critical_alert(self) -> bool:
"""Verifica se la lettura richiede un alert critico"""
return (
self.moisture_pct < 10.0 or
self.moisture_pct > 58.0 or
self.ph < 4.5 or
self.ph > 8.5 or
self.ec_ds_m > 4.0 or
self.battery_pct < 10
)
# ── Storage InfluxDB ──────────────────────────────────────────────────────────
class InfluxDBWriter:
"""Writer per time-series su InfluxDB 2.x"""
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
def write_soil_reading(self, reading: SoilReadingSchema):
"""Scrive una lettura suolo su InfluxDB con tags e fields ottimizzati"""
point = (
Point("soil_reading")
# Tags: usati per filtro e group-by (cardinalita limitata)
.tag("sensor_id", reading.sensor_id)
.tag("farm_id", reading.farm_id)
.tag("field_id", reading.field_id)
.tag("depth_cm", str(reading.depth_cm))
.tag("quality", reading.quality_flag)
# Fields: metriche numeriche
.field("moisture_pct", reading.moisture_pct)
.field("temperature_c", reading.temperature_c)
.field("ph", reading.ph)
.field("ec_ds_m", reading.ec_ds_m)
.field("battery_pct", float(reading.battery_pct))
.field("rssi_dbm", float(reading.rssi_dbm))
# Timestamp dalla lettura del sensore (non dall'arrivo)
.time(datetime.fromisoformat(reading.timestamp), WritePrecision.SECONDS)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
log.debug(f"Scritto su InfluxDB: {reading.sensor_id} @ {reading.timestamp}")
def close(self):
self.client.close()
# ── Bronze Layer su S3 ────────────────────────────────────────────────────────
class S3BronzeWriter:
"""Scrive dati grezzi su S3 (Bronze layer Medallion Architecture)"""
def __init__(self, bucket: str, region: str = "eu-south-1"):
self.s3 = boto3.client("s3", region_name=region)
self.bucket = bucket
def write_raw(self, raw_payload: str, topic: str, received_at: datetime):
"""
Salva il payload grezzo in formato NDJSON su S3.
Partitionamento per data: year/month/day/hour/
"""
prefix = received_at.strftime("year=%Y/month=%m/day=%d/hour=%H")
filename = f"{prefix}/{received_at.isoformat()}.json"
envelope = {
"topic": topic,
"received_at": received_at.isoformat(),
"payload": json.loads(raw_payload),
}
self.s3.put_object(
Bucket = self.bucket,
Key = filename,
Body = json.dumps(envelope).encode("utf-8"),
ContentType = "application/json",
)
log.debug(f"Bronze layer: scritto {filename}")
# ── Alert Manager ─────────────────────────────────────────────────────────────
class AlertManager:
"""Gestione alert critici con cooldown per evitare spam"""
def __init__(self):
self._last_alert: dict[str, datetime] = {}
self.cooldown_seconds = 300 # 5 minuti tra un alert e l'altro per sensor
def check_and_alert(self, reading: SoilReadingSchema):
if not reading.has_critical_alert():
return
sensor_key = reading.sensor_id
now = datetime.now(timezone.utc)
last = self._last_alert.get(sensor_key)
if last and (now - last).total_seconds() < self.cooldown_seconds:
return # In cooldown, skip
self._last_alert[sensor_key] = now
self._send_alert(reading)
def _send_alert(self, reading: SoilReadingSchema):
"""In produzione: invia SMS/email/push. Qui: log."""
alerts = []
if reading.moisture_pct < 10.0:
alerts.append(f"STRESS IDRICO: umidita {reading.moisture_pct}% sotto soglia critica 10%")
if reading.moisture_pct > 58.0:
alerts.append(f"SATURAZIONE: umidita {reading.moisture_pct}% sopra saturazione")
if reading.ph < 4.5:
alerts.append(f"pH CRITICO: {reading.ph} - suolo troppo acido")
if reading.ec_ds_m > 4.0:
alerts.append(f"SALINITA CRITICA: EC {reading.ec_ds_m} dS/m")
if reading.battery_pct < 10:
alerts.append(f"BATTERIA: {reading.battery_pct}% - sostituire")
for alert in alerts:
log.critical(f"ALERT [{reading.sensor_id}] {alert}")
# TODO: self.sms_client.send(...)
# TODO: self.email_client.send(...)
# ── Pipeline Consumer principale ──────────────────────────────────────────────
class AgriPipelineConsumer:
"""Consumer MQTT con validazione, storage e alerting integrati"""
def __init__(self):
self.influx = InfluxDBWriter(
url = "https://influxdb.azienda.it:8086",
token = "<INFLUX_TOKEN>",
org = "azienda-agricola",
bucket = "farm-sensors",
)
self.s3_bronze = S3BronzeWriter(bucket="farm-raw-data-bronze")
self.alert_mgr = AlertManager()
self.client = mqtt.Client(
client_id = "pipeline-consumer-001",
protocol = mqtt.MQTTv5,
callback_api_version = mqtt.CallbackAPIVersion.VERSION2,
)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.stats = {"received": 0, "valid": 0, "errors": 0, "alerts": 0}
def _on_connect(self, client, userdata, flags, rc, props):
log.info("Consumer connesso al broker")
# Sottoscrive a TUTTI i sensori suolo di TUTTE le farm
client.subscribe("farm/+/+/soil", qos=1)
client.subscribe("farm/+/+/$status", qos=1)
log.info("Sottoscritto a: farm/+/+/soil e farm/+/+/$status")
def _on_message(self, client, userdata, message):
received_at = datetime.now(timezone.utc)
self.stats["received"] += 1
raw_payload = message.payload.decode("utf-8")
try:
# 1. Parse JSON
data = json.loads(raw_payload)
# 2. Salva Bronze layer (dato grezzo, prima di qualsiasi trasformazione)
self.s3_bronze.write_raw(raw_payload, message.topic, received_at)
# 3. Valida con Pydantic
reading = SoilReadingSchema(**data)
self.stats["valid"] += 1
# 4. Scrivi su InfluxDB (time-series)
self.influx.write_soil_reading(reading)
# 5. Check alert
self.alert_mgr.check_and_alert(reading)
if reading.has_critical_alert():
self.stats["alerts"] += 1
log.info(
f"Processed | {reading.sensor_id} | "
f"moisture={reading.moisture_pct}% | "
f"quality={reading.quality_flag}"
)
except json.JSONDecodeError as e:
self.stats["errors"] += 1
log.error(f"JSON non valido da {message.topic}: {e}")
except ValidationError as e:
self.stats["errors"] += 1
log.error(f"Validazione fallita per {message.topic}: {e}")
# Salva comunque su Bronze (dato anomalo ma registrato)
except Exception as e:
self.stats["errors"] += 1
log.exception(f"Errore imprevisto: {e}")
def run(self):
self.client.connect("emqx.azienda-agricola.it", 8883)
log.info("Pipeline consumer avviata. Ctrl+C per fermare.")
try:
self.client.loop_forever()
except KeyboardInterrupt:
pass
finally:
self.influx.close()
log.info(
f"Stats finali: {self.stats}"
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
consumer = AgriPipelineConsumer()
consumer.run()
Tarımsal Veriler için Madalyon Mimarisi: Bronz, Gümüş, Altın
Madalyon Mimarisi (Databricks tarafından tanıtıldı ancak artık veri mühendisliğinde fiili bir standart) Verileri üç aşamalı kalite katmanı halinde düzenler. Tarımsal IoT verilerine uygulandığında şu sorunları çözer: gerçek sorunlar: aykırı değerler gönderen sensörler, RTC sapması için yanlış zaman damgaları, okumalar QoS 1 ile MQTT yeniden iletimi için kopyalanır ve kontrol paneli başına farklı toplamalara ihtiyaç duyulur gerçek zamanlı ve uzun vadeli makine öğrenimi modelleri.
Tarımsal IoT için Madalyon Mimarisi
| Katmanlar | Biçim | İçerik | Operasyonlar | Tutulma |
|---|---|---|---|---|
| Bronz (Ham) | JSON / Parke | Ham, değişmez MQTT yükü | Dönüşüm yok, sadece depola | 5 yıl (düzenleyici) |
| Gümüş (Temizlenmiş) | Delta Gölü / Buzdağları | Normalleştirilmiş model, aykırı değerler kaldırıldı | Tekilleştirme, aykırı değer filtresi, tür dönüşümü | 3 yıl |
| Altın (Analitik) | Parke | Saatlik/günlük toplamalar, makine öğrenimi özellikleri | Toplama, hava durumu/uydu ile birleştirme | 10 yıl |
# medallion_pipeline.py
# Pipeline Medallion per dati agricoli IoT con PySpark / pandas
# In produzione: usa Databricks, AWS Glue, o dbt su Spark
# Qui: versione pandas per sviluppo locale e testing
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import json
import hashlib
DATA_ROOT = Path("/data/farm")
# ── BRONZE LAYER ──────────────────────────────────────────────────────────────
def load_bronze(date: str) -> pd.DataFrame:
"""
Carica dati grezzi dal Bronze layer (JSON NDJSON).
Nessuna trasformazione: solo lettura e schema enforcement minimo.
"""
bronze_path = DATA_ROOT / "bronze" / date
records = []
for f in bronze_path.glob("**/*.json"):
with open(f) as fp:
envelope = json.load(fp)
records.append(envelope)
if not records:
return pd.DataFrame()
# DataFrame con schema minimo garantito
df = pd.json_normalize(records, sep="_")
df["_bronze_loaded_at"] = datetime.utcnow().isoformat()
df["_source_file"] = [str(f) for f in bronze_path.glob("**/*.json")]
return df
# ── SILVER LAYER ──────────────────────────────────────────────────────────────
def transform_bronze_to_silver(df_bronze: pd.DataFrame) -> pd.DataFrame:
"""
Trasformazioni Bronze → Silver:
1. Parse e normalizza timestamp
2. Rimuovi duplicati (QoS 1 può consegnare più volte)
3. Filtra outlier fisicamente impossibili
4. Cast tipi corretti
5. Aggiungi colonne derivate
"""
if df_bronze.empty:
return pd.DataFrame()
df = df_bronze.copy()
# 1. Parse timestamp
df["ts"] = pd.to_datetime(df["payload_timestamp"], utc=True, errors="coerce")
df = df.dropna(subset=["ts"])
# 2. Deduplica: stesso sensor_id + timestamp = stesso dato
df["_dedup_key"] = df.apply(
lambda r: hashlib.md5(
f"{r.get('payload_sensor_id', '')}{r.get('payload_timestamp', '')}".encode()
).hexdigest(),
axis=1
)
df = df.drop_duplicates(subset=["_dedup_key"], keep="first")
# 3. Filtro outlier fisici
df = df[df["payload_moisture_pct"].between(0, 100)]
df = df[df["payload_temperature_c"].between(-40, 80)]
df = df[df["payload_ph"].between(0, 14)]
df = df[df["payload_ec_ds_m"].between(0, 20)]
# 4. Flag outlier agronomici (mantieni, ma etichetta)
df["is_agronomic_outlier"] = (
~df["payload_moisture_pct"].between(5, 60) |
~df["payload_ph"].between(4.5, 8.5) |
(df["payload_ec_ds_m"] > 4.0)
)
# 5. Colonne derivate
df["date"] = df["ts"].dt.date
df["hour"] = df["ts"].dt.hour
df["month"] = df["ts"].dt.month
# 6. Rename colonne per schema pulito
df = df.rename(columns={
"payload_sensor_id": "sensor_id",
"payload_farm_id": "farm_id",
"payload_field_id": "field_id",
"payload_moisture_pct": "moisture_pct",
"payload_temperature_c": "temperature_c",
"payload_ph": "ph",
"payload_ec_ds_m": "ec_ds_m",
"payload_battery_pct": "battery_pct",
"payload_quality_flag": "quality_flag",
})
# Seleziona solo colonne rilevanti
cols = [
"sensor_id", "farm_id", "field_id", "ts", "date", "hour", "month",
"moisture_pct", "temperature_c", "ph", "ec_ds_m", "battery_pct",
"quality_flag", "is_agronomic_outlier"
]
df = df[[c for c in cols if c in df.columns]]
df["_silver_processed_at"] = datetime.utcnow().isoformat()
return df
# ── GOLD LAYER ────────────────────────────────────────────────────────────────
def transform_silver_to_gold(df_silver: pd.DataFrame) -> dict[str, pd.DataFrame]:
"""
Trasformazioni Silver → Gold:
Produce più tabelle di aggregazione per usi diversi.
"""
if df_silver.empty:
return {}
# Filtra outlier agronomici per analytics
df = df_silver[~df_silver["is_agronomic_outlier"]].copy()
gold_tables = {}
# ─── Aggregazione oraria per dashboard ───
hourly = df.groupby(["sensor_id", "farm_id", "field_id", "date", "hour"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_min = ("moisture_pct", "min"),
moisture_max = ("moisture_pct", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
readings_n = ("moisture_pct", "count"),
).reset_index()
hourly["moisture_avg"] = hourly["moisture_avg"].round(2)
gold_tables["hourly_aggregations"] = hourly
# ─── Aggregazione giornaliera per reporting agronomico ───
daily = df.groupby(["sensor_id", "farm_id", "field_id", "date"]).agg(
moisture_avg = ("moisture_pct", "mean"),
moisture_std = ("moisture_pct", "std"),
temp_min = ("temperature_c", "min"),
temp_max = ("temperature_c", "max"),
temp_avg = ("temperature_c", "mean"),
ph_avg = ("ph", "mean"),
ec_avg = ("ec_ds_m", "mean"),
stress_hours = ("moisture_pct", lambda x: (x < 20).sum()),
readings_n = ("moisture_pct", "count"),
).reset_index()
gold_tables["daily_agronomic"] = daily
# ─── Feature engineering per ML (predizione irrigazione) ───
ml_features = df.copy()
ml_features["moisture_lag_1h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(2)
ml_features["moisture_lag_3h"] = ml_features.groupby("sensor_id")["moisture_pct"].shift(6)
ml_features["moisture_trend"] = (
ml_features["moisture_pct"] - ml_features["moisture_lag_3h"]
)
ml_features["needs_irrigation"] = (ml_features["moisture_pct"] < 25.0).astype(int)
gold_tables["ml_irrigation_features"] = ml_features.dropna(
subset=["moisture_lag_1h", "moisture_lag_3h"]
)
return gold_tables
def run_medallion_pipeline(date: str):
"""Esegue la pipeline Medallion per una data specifica"""
log_prefix = f"[Medallion {date}]"
print(f"{log_prefix} Caricamento Bronze...")
bronze = load_bronze(date)
print(f"{log_prefix} Bronze: {len(bronze)} record")
print(f"{log_prefix} Trasformazione Silver...")
silver = transform_bronze_to_silver(bronze)
print(f"{log_prefix} Silver: {len(silver)} record (dopo dedup e filtro outlier)")
# Salva Silver
silver_path = DATA_ROOT / "silver" / date / "soil_readings.parquet"
silver_path.parent.mkdir(parents=True, exist_ok=True)
silver.to_parquet(silver_path, index=False)
print(f"{log_prefix} Trasformazione Gold...")
gold_tables = transform_silver_to_gold(silver)
for table_name, df in gold_tables.items():
gold_path = DATA_ROOT / "gold" / date / f"{table_name}.parquet"
gold_path.parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(gold_path, index=False)
print(f"{log_prefix} Gold {table_name}: {len(df)} record")
print(f"{log_prefix} Pipeline completata.")
if __name__ == "__main__":
today = datetime.utcnow().strftime("%Y-%m-%d")
run_medallion_pipeline(today)
InfluxDB ve Apache Kafka ile entegrasyon
Tarımsal Zaman Serisi için InfluxDB
InfluxDB, yüksek frekanslı IoT zaman serileri için en uygun veritabanıdır. PostgreSQL'den farklı olarak
veya MySQL ve zamansal sırayla çok sayıda yazma işlemi için optimize edilmiş, otomatik veri sıkıştırma
yerel toplama işlevleriyle tarihsel (altörnekleme) ve zaman aralıklarındaki sorgular
(mean(), max(), moving_average()).
Her 30 saniyede bir 50 sensörün gönderildiği bir çiftlik için InfluxDB bunu hallediyor rahatça Saniyede 100 yazma tüketici donanımı üzerinde, saklama politikasıyla otomatik: 30 günlük ham veriler, 1 yıllık saatlik toplamalar, günlük toplamalar 10 yıldır. Hepsi sensör başına yılda birkaç MB'lık depolama alanında.
# influxdb_queries.py
# Query Flux per analytics agricole su InfluxDB 2.x
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
client = InfluxDBClient(
url = "https://influxdb.azienda.it:8086",
token = "<TOKEN>",
org = "azienda-agricola",
)
query_api = client.query_api()
# ─── Query 1: Media umidita ultime 24 ore per campo ───────────────────────────
QUERY_MOISTURE_24H = """
from(bucket: "farm-sensors")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r.field_id == "campo-nord")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> yield(name: "hourly_moisture")
"""
# ─── Query 2: Alert stress idrico (moisture < 20% nelle ultime 6h) ────────────
QUERY_DROUGHT_ALERT = """
from(bucket: "farm-sensors")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> filter(fn: (r) => r._value < 20.0)
|> group(columns: ["sensor_id", "field_id"])
|> count()
|> filter(fn: (r) => r._value > 3)
|> yield(name: "drought_sensors")
"""
# ─── Query 3: Trend temperatura settimanale con banda statistica ──────────────
QUERY_TEMP_WEEKLY = """
from(bucket: "farm-sensors")
|> range(start: -7d)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "temperature_c")
|> aggregateWindow(every: 6h, fn: mean)
|> movingAverage(n: 4)
|> yield(name: "temp_trend_7d")
"""
# ─── Query 4: Stato batterie sensori (per manutenzione preventiva) ────────────
QUERY_BATTERY_STATUS = """
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
|> filter(fn: (r) => r._value < 20)
|> sort(columns: ["_value"])
|> yield(name: "low_battery_sensors")
"""
def run_analytics():
print("=== Dashboard Analytics Agricole ===")
# Esegui query umidita
tables = query_api.query(QUERY_MOISTURE_24H)
for table in tables:
for record in table.records:
print(f" {record.get_time()} | "
f"sensor={record.values.get('sensor_id', 'N/A')} | "
f"moisture={record.get_value():.1f}%")
# Check alert stress idrico
drought_tables = query_api.query(QUERY_DROUGHT_ALERT)
for table in drought_tables:
for record in table.records:
print(f" ALERT STRESS IDRICO: sensor {record.values.get('sensor_id')} "
f"in field {record.values.get('field_id')}")
if __name__ == "__main__":
run_analytics()
Yüksek Ölçekli Akış İşleme için Apache Kafka
Sensör sayısı yüzleri aştığında veya saniyeden kısa gecikmeler gerektiğinde, MQTT bir dağıtım sistemi olarak tek başına yeterli değildir. Apache Kafka bir mesaj veriyolu olarak geliyor MQTT aracısı ile işleme katmanı arasındaki kuruluş. Standart model: MQTT Broker → Kafka Connect (MQTT Kaynak Bağlayıcısı) → Kafka Konuları → Tüketici Grupları.
Kafka bu bağlamda önemli faydalar sağlar: mesajın tekrar oynatılması (tüketicide bir hata varsa, her şey yeniden işlenir), alana/coğrafi alana göre bölümleme, birden fazla bağımsız tüketici (InfluxDB yazarları, uyarı motorları, ML çıkarımı, veri gölü yazarları aynı verileri müdahale) ve Kafka İşlemleri ile tam olarak bir kez garanti eder.
# kafka_consumer.py
# Consumer Kafka per dati sensori agricoli
# Dipendenze: pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import logging
log = logging.getLogger(__name__)
KAFKA_CONFIG = {
"bootstrap.servers": "kafka-broker-1:9092,kafka-broker-2:9092",
"group.id": "farm-analytics-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # Commit manuale per exactly-once
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "farm-consumer",
"sasl.password": "<KAFKA_PASSWORD>",
}
TOPICS = ["farm.soil.readings", "farm.weather.readings"]
def process_soil_message(data: dict) -> bool:
"""Elabora un messaggio sensore suolo"""
sensor_id = data.get("sensor_id", "unknown")
moisture = data.get("moisture_pct", 0)
# Routing logica: stress idrico → alert immediato
if moisture < 15.0:
log.warning(f"STRESS IDRICO critico: sensor {sensor_id} = {moisture}%")
# In produzione: pubblica su topic alert
return True
log.info(f"OK | sensor={sensor_id} | moisture={moisture}%")
return True
def run_kafka_consumer():
consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe(TOPICS)
log.info(f"Consumer Kafka avviato su topics: {TOPICS}")
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
log.debug(f"Raggiunto EOF: {msg.topic()}[{msg.partition()}]")
else:
raise KafkaException(msg.error())
continue
# Processa messaggio
try:
data = json.loads(msg.value().decode("utf-8"))
success = process_soil_message(data)
if success:
consumer.commit(message=msg) # Commit solo se processato correttamente
except json.JSONDecodeError as e:
log.error(f"JSON non valido: {e}")
consumer.commit(message=msg) # Commit anche errori per non bloccare
except KeyboardInterrupt:
log.info("Consumer fermato")
finally:
consumer.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_kafka_consumer()
Grafana Kontrol Paneli ve Uyarı Sistemi
Grafana, IoT verilerini gerçek zamanlı olarak görselleştirmek için standart bir araçtır. Yerel olarak entegre olur InfluxDB 2.x ile veri kaynağı aracılığıyla ve coğrafi haritalarla etkileşimli gösterge tabloları oluşturmanıza olanak tanır, ön uç kodu yazmaya gerek kalmadan zaman serisi grafikleri ve durum panelleri.
Tarımsal bir işletme için standart kontrol paneli şunları içerir:
- Saha haritası: Sensör katmanları ve su stresi için renkler içeren coğrafi harita eklentisi
- Nem zaman serisi: Renkli eşiklere sahip 24 saatlik grafik (kırmızı < %15, sarı < %25)
- Pil Göstergesi: Bakım planlaması için tüm sensörlerin pil durumu
- Isı haritası pH'ı: Vuruş kararları için saha başına pH ısı haritası
- Sulama Tahmini: Sonraki 48 saatte ML model çıkışı
# grafana_alert_rules.yaml
# Regole di alerting Grafana per agricoltura di precisione
apiVersion: 1
groups:
- name: farm-critical-alerts
interval: 1m
rules:
# Alert stress idrico critico
- uid: drought-critical
title: "Stress Idrico Critico"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "moisture_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15.0]
type: lt
query: { params: ["A"] }
noDataState: Alerting
execErrState: Alerting
for: 10m
annotations:
summary: "Sensore {{ $labels.sensor_id }} in stress idrico"
description: "Umidita {{ $values.A }}% sotto soglia critica 15%"
labels:
severity: critical
team: agronomist
# Alert batteria scarica
- uid: battery-low
title: "Batteria Sensore in Esaurimento"
condition: C
data:
- refId: A
datasourceUid: influxdb-farm
model:
query: |
from(bucket: "farm-sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "soil_reading")
|> filter(fn: (r) => r._field == "battery_pct")
|> last()
- refId: C
datasourceUid: __expr__
model:
type: threshold
conditions:
- evaluator:
params: [15]
type: lt
query: { params: ["A"] }
for: 5m
annotations:
summary: "Batteria bassa su {{ $labels.sensor_id }}"
labels:
severity: warning
team: maintenance
Anti-Desen: Evrensel Sabit Eşikler
İlk uygulamalarda yaygın olarak yapılan bir hata, tüm ürünler için aynı nem eşik değerlerinin kullanılmasıdır. mahsuller ve her türlü toprak. Bir toprak için %20'lik kritik nem eşiği doğrudur. domatesli killi toprak, ancak asmalı kumlu toprak veya bahçecilik fidanlığı. Eşikler sensöre, kültüre ve faza göre yapılandırılabilir olmalıdır fenolojik. Sabit kodlanmayan, dinamik bir eşik yapılandırma sistemi uygular uyarı kodu.
Dağıtım, Güvenlik ve Ölçeklenebilirlik
Yerel Kalkınma için Docker Compose
Tüm yığını bulut bağımlılıkları olmadan yerel olarak geliştirmek ve test etmek için
docker-compose.yml MQTT, InfluxDB ve Grafana aracılarını içerir:
# docker-compose.yml
# Stack IoT agricolo completo per sviluppo locale
version: "3.9"
services:
# Broker MQTT - Eclipse Mosquitto
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883" # MQTT plain (solo sviluppo)
- "8883:8883" # MQTT TLS
- "9001:9001" # WebSocket
volumes:
- ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./certs:/mosquitto/certs:ro
- mosquitto_data:/mosquitto/data
- mosquitto_logs:/mosquitto/log
restart: unless-stopped
# Time-series database
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: changeme123!
DOCKER_INFLUXDB_INIT_ORG: azienda-agricola
DOCKER_INFLUXDB_INIT_BUCKET: farm-sensors
DOCKER_INFLUXDB_INIT_RETENTION: 30d
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-token
volumes:
- influxdb_data:/var/lib/influxdb2
restart: unless-stopped
# Dashboard e alerting
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
GF_INSTALL_PLUGINS: grafana-worldmap-panel,grafana-clock-panel
volumes:
- grafana_data:/var/lib/grafana
- ./config/grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
# Pipeline consumer (il nostro codice Python)
pipeline-consumer:
build:
context: .
dockerfile: Dockerfile.consumer
environment:
MQTT_HOST: mosquitto
MQTT_PORT: "1883"
INFLUX_URL: http://influxdb:8086
INFLUX_TOKEN: my-super-secret-token
INFLUX_ORG: azienda-agricola
INFLUX_BUCKET: farm-sensors
depends_on:
- mosquitto
- influxdb
restart: unless-stopped
volumes:
mosquitto_data:
mosquitto_logs:
influxdb_data:
grafana_data:
Tarımsal IoT için En İyi Güvenlik Uygulamaları
Tarımsal IoT sistemlerinin güvenliği genellikle hafife alınmaktadır. Güvenliği ihlal edilmiş bir sensör değişebilir okumalara neden olur ve anormal sulamaya, israfa veya mahsullerin zarar görmesine neden olur. Çıplak minimum:
Tarımsal IoT Güvenlik Kontrol Listesi
- Karşılıklı TLS (mTLS): Her cihazın benzersiz bir istemci sertifikası vardır. Komisyoncu Cihazların kimliğini yalnızca kullanıcı adı/şifreyle değil, sertifika aracılığıyla doğrular.
-
Konu EKL'leri: Her sensör SADECE kendi konularında yayın yapabilir
(
farm/campo-nord/sensor-001/#) ve diğer sensörlerden gelen konuları okumayın. - Firmware imzalama: OTA (kablosuz) güncellemeleri imzalanmalıdır Kötü amaçlı ürün yazılımını önlemek için kriptografik olarak.
- Kimlik bilgisi rotasyonu: 1 yıllık geçerlilik süresine sahip jetonlar ve sertifikalar, rotasyon ACME veya AWS IoT Sertifika Rotasyonu aracılığıyla otomatik.
- Ağ segmentasyonu: Ağdan yalıtılmış, özel VLAN'daki IoT cihazları kurumsal. Aracıya yalnızca ağ geçidinden erişim sağlanır, hiçbir zaman doğrudan internetten erişilmez.
- Hız sınırlaması: Aracı, saniye başına ileti sayısını sınırlar. Kazara su baskını veya saldırıları önlemek için cihaz.
İtalya'da Hassas Tarım: Fırsatlar ve Teşvikler 2025
İtalya, Avrupa tarım teknolojisi panoramasında benzersiz bir konumdadır: gıda sektörüne sahiptir dünyanın en iyisi (DOP, IGP, DOC), parçalı bir üretim yapısı (orta ölçekli şirketler) AB ortalaması olan 33'e kıyasla 11 hektar) ancak çok güçlü bir tarımsal uzmanlığa sahiptir ve bu alanda ilktir. 2024'te 42,4 milyar Euro (yıllık bazda +%9) ile tarımsal katma değer açısından Avrupa.
Buradaki zorluk, Nesnelerin İnterneti ve Yapay Zeka teknolojilerini %95'i oluşturan tarımsal KOBİ'lere de getirmektir. üretken kumaştan. İşte 2025'te kullanılabilecek finansman araçları:
İtalya'da AgriTech ve IoT teşvikleri (2025)
| Ölçüm | Tutar/Fayda | Yararlanıcılar | Son kullanma tarihi |
|---|---|---|---|
| Geçiş Planı 5.0 | Dijital + enerji yatırımlarında %35-45 vergi kredisi | Tüm işletmeler (tarım dahil) | 31/12/2025 |
| PNRR Misyonu 2.3 - Mekanizasyon | 400 milyon avroluk geri ödenmeyen katkılar | 4.0 makinelere yönelik tarım işletmeleri | Tükendi (yeni aramalar bekleniyor) |
| Vergi Kredisi 4.0 (eski adıyla Endüstri 4.0) | Teknolojik sermaye mallarında %20 | Tarımsal ATECO'ya sahip şirketler | Uzatılmış 2025 |
| INAIL ISI Tarım Bildirimi | %65'e kadar geri ödemesiz | Tarımsal işletmeler ve kooperatifler | Yıllık (ilkbahar açılışı) |
| PSP (CAP Stratejik Planı 2023-2027) | Hassas tarıma yönelik primli eko-program önlemleri | Kayıtlı UAA'ya sahip şirketler | Yıllık (CAP uygulaması) |
| PNRR Agrivoltaik Çağrı | Tarıma entegre fotovoltaik sistemler için 1,5 MİLYAR EURO | Tarımsal işletmeler | 2025-2026 |
Eksiksiz bir IoT sistemine (sensörler, ağ geçitleri, yazılım, bağlantı) toplam maliyetle 50.000 Avro, Geçiş Planı 5.0 kadar kapsayabilir 22.500 Avro (maliyetin %45'i) vergi kredisi olarak, Su ve gübre tasarrufu sayesinde yatırımın 2-3 yılda geri kazanılması ve insan gücü.
Örnek Olay: Masseria Pugliese 500 Hektar - Zeytin Ağacı ve Buğday
Puglialı bir tarım şirketinin gerçek bir durumu (toplu ve anonimleştirilmiş veriler) 120 toprak nem sensörü, 8 meteoroloji istasyonu ve bir IoT hattını hayata geçirdi otomatik sulama sistemi:
- İlk yatırım: 85.000 EUR (donanım + yazılım + kurulum)
- PNRR ve Geçiş 4.0 katkıları: 38.000 Avro (%44,7)
- Net yatırım: 47.000 Avro
- 1. Yıl su tasarrufu: -%42 (sulama faturasında -18.000 EUR/yıl)
- Gübre tasarrufu: -%22 (-8.500 EUR/yıl)
- Tahıl veriminde artış: +%11 (+15.000 EUR/yıl ciro)
- Geri ödeme süresi: 2,3 yıl
- 5 yıllık yatırım getirisi: %342
Kaçınılması Gereken En İyi Uygulamalar ve Anti-Kalıplar
Birleştirilmiş En İyi Uygulamalar
-
Sürüm oluşturma ile evrim diyagramı: Her zaman bir alan ekle
schema_versionMQTT yüklerinde. Yeni alanlar eklediğinizde sürümü artırın. Tüketici yönetiyor Değişiklikleri bozmadan farklı sürümler. - Sunucudan değil, cihazdan zaman damgası: Zaman damgası aynı olmalıdır Sensör komisyoncuya ulaştığında değil, okuma sırasında. Ağ gecikmesi değişken olabilir ve zaman serilerini tahrif edebilir.
- Varsayılan olarak QoS 1, yalnızca kritik komutlar için QoS 2: QoS 2 dört kat ağ trafiği. Yalnızca çalıştırma komutları için kullanın (valflerin açılması, pompaların çalıştırılması) çoğaltmanın kabul edilemez olduğu yer.
- Zorunlu yerel arabellek: Her ağ geçidinin yerel bir arabelleği olmalıdır (SQLite, NDJSON dosyaları) ağ bağlantı kesintileri sırasında veri toplamaya devam etmek için. Tarımda bağlantılar güvenilmezdir.
- Periyodik sensör kalibrasyonu: Toprak nem sensörleri türetilmiştir zamanla (genellikle yılda %2-5). Altı aylık kalibrasyonları planlayın ve uygulayın Bitişik sensörleri karşılaştırarak otomatik sürüklenme tespiti.
- InfluxDB saklama politikası: Otomatik altörneklemeyi yapılandırma: ham veriler 30 gün, saatlik toplamalar 1 yıl, sınırsız günlük toplamalar. Kaydet Uzun vadeli analizler için gereken ayrıntı düzeyini kaybetmeden depolama.
Kaçınılması Gereken Kritik Anti-Desenler
-
MQTT düz konu (hiyerarşi olmadan): Gibi konular
sensor_001_moistureyerinefarm/campo-nord/sensor-001/soil/moisturekullanmayı imkansız hale getirmek toplu abonelikler ve yüzlerce sensöre ölçeklendirme için joker karakter. - Olay odaklı yerine yoklama: Her N'de MQTT komisyoncusunu sorgulamayın yeni verileri "istemek" için saniyeler. MQTT ve push tabanlı: yayıncı, mevcut olduğunda gönderir abone anında veri alır. Yoklama, protokolün tüm avantajlarını ortadan kaldırır.
- Şema doğrulaması yok: Aracıdan herhangi bir JSON'u kabul etmeden şema doğrulama. Sayılar yerine boş değerler, dizeler gönderen hatalı bir sensör veya aralık dışındaki değerler (sıcaklık -999) doğrulama olmadan veri gölünün tamamını kirletir.
- Bronzdan Altına doğrudan yazma: Gümüş katmanını atlayın ve yazın ham verileri doğrudan toplamalara aktarır. Mantığında bir hata keşfettiğinizde dönüşüm, ham veriler henüz işlenmediğinden her şeyi yeniden işlemeniz gerekir. korunmuş.
- Kimlik doğrulaması olmayan genel komisyoncu: Herkese açık MQTT aracılarını kullanın (test.mosquitto.org) veya TLS/kimlik doğrulaması olmayan özel aracılar. Tarımsal veriler hassas şirket varlıkları; bir rakip konularınıza abone olabilir ve okuyabilir Firmanızın üretim durumunu gerçek zamanlı olarak görüntüleyin.
- Brokerdaki tek başarısızlık noktası: Tek bir MQTT komisyoncusu kümeleme veya yük devretme. EMQX ve HiveMQ yerel kümelemeyi destekler; Sivrisinek gerektirir HA için harici çözümler. Bir aracının kapatılması tüm veri toplamayı engeller.
Ölçeklenebilirlik: 10'dan 10.000'e kadar Sensör
Tanımlanan mimari, değişiklik yapılmadan birkaç bin sensöre kadar doğrusal olarak ölçeklenir mimari. MQTT aracı donanım yapılandırmasına ilişkin tipik sayılar şunlardır:
Yapılandırma için MQTT Broker yeteneği
| Yapılandırma | Komisyoncu | Maksimum Sensörler | Mesaj/sn | Aylık maliyet |
|---|---|---|---|---|
| Geliştirme/Test | Raspberry Pi 4'te Sivrisinek | 100 | 500 | 0 (sahip olunan donanım) |
| KOBİ'ler (50-500 ha) | VPS 4 vCPU/8GB'ta EMQX | 1.000 | 5.000 | 40-80 Avro |
| Büyük Şirket (500+ ha) | EMQX Kümesi 3 düğümleri | 10.000 | 50.000 | 200-400 Avro |
| Kooperatif/İlçe | HiveMQ Enterprise / AWS IoT Core | 100.000+ | Sınırsız | Kullanım başına ödeme |
AWS IoT Core ve Azure IoT Hub, ulusal veya çoklu kuruluş ölçekleri için bulutta yerel seçeneklerdir: ölçeklenebilirliği otomatik olarak yönetiyorlar, %99,99 SLA sunuyorlar ve yerel olarak entegre oluyorlar ilgili ekosistemleriyle (AWS Timestream, Lambda, AWS için S3; Azure Veri Gezgini, Stream Analytics, Azure için Data Lake). Maliyet genellikle milyon mesaj başına 1-5 dolar arasındadır. ilk testler için cömert bir ücretsiz katmanla.
Sonuçlar ve Sonraki Adımlar
Hassas tarım için IoT hattının tamamını sahadaki sensörlerden inşa ettik uygun QoS, Pydantic doğrulaması ile MQTT aracılığıyla yapılandırılmış veri gölüne, Zaman serileri için InfluxDB depolama alanı ve veri gölü için Medallion mimarisi. Python kodu Gösterilen üretime hazırdır ve gerçek sistemlerde bulunan temel kalıpları kapsar.
Yanınıza almanız gereken önemli noktalar:
- MQTT tarımsal IoT için doğru protokoldür: hafif, asenkron ve kararsız bağlantı desteği
- Açık alan için LoRaWAN, seralar için WiFi/RS-485: tek bir evrensel kablosuz protokol yoktur
- Şema doğrulama (Pydantic) isteğe bağlı değildir: sensörler kötü verileri düşündüğünüzden daha sık gönderir
- Madalyon Mimarisi (Bronz/Gümüş/Altın) yeniden işlenebilirliği ve ilerici veri kalitesini garanti eder
- InfluxDB + Grafana, lisans maliyeti olmadan gerçek zamanlı izleme için minimum yığındır
- PNRR ve Geçiş 5.0 teşvikleri, İtalyan tarım işletmelerine yönelik yatırımın %45'ini kapsıyor
FoodTech serisinin bir sonraki makalesinde şablonların nasıl uygulanacağını inceleyeceğiz makine öğrenimi sınırı doğrudan ağ geçidinde mahsul izleme için, Gecikmeyi dakikalardan milisaniyelere düşürmek ve kritik olaylara gerçek zamanlı tepkiler vermek başlangıç aşamasında ani donlar veya mantar saldırıları gibi.
Kaynaklar ve Analizler
- Resmi paho-mqtt belgeleri: eclipse.dev/paho
- EMQX Broker (açık kaynak, kümeye hazır): emqx.io
- InfluxDB 2.x belgeleri: docs.influxdata.com
- Geçiş Planı 5.0 MIMIT: mimit.gov.it
- İSTAT Hassas Tarım 2024: istat.it
- Databricks Madalyon Mimarisi: databricks.com
Diğer Serilerdeki İlgili Makaleler
- MLOps: Getiri tahmini makine öğrenimi modelleri üretimde nasıl dağıtılır?
- Veri ve Yapay Zeka İşi - Üretimde Yapay Zeka: Kafka ve OPC-UA ile endüstriye uygulanan IoT modelleri
- Yapay Zeka Mühendisliği: Tarım uzmanlarına yönelik sanal asistanlar için tarımsal belgelere ilişkin RAG







