""" ThingsBoardClient: live telemetry client for the Seymour vineyard at web.seymouragri.com. Device layout ------------- TREATMENT area (rows 501–502, under solar panels): Air2, Air3, Air4 — microclimate sensors under the panels Crop3, Crop5, Crop6, Crop7 — fruiting-zone crop sensors (per panel position) Soil1, Soil3, Soil5, Soil6 — root-zone soil probes Irrigation1 — irrigation flow/volume/quality logger Thermocouples-1 — panel surface temperature (4 positions) REFERENCE area (rows 503–504, open sky, no panels): Crop1, Crop2, Crop4 — fruiting-zone crop sensors (no shading) Soil2, Soil4, Soil7, Soil9 — root-zone soil probes Thermocouples-2 — structural/ambient thermocouple reference AMBIENT (site-level outdoor baseline): Air1 — outdoor climate station (above canopy, no panel) Credentials (env vars or .env): THINGSBOARD_HOST — default https://web.seymouragri.com THINGSBOARD_USERNAME — tenant login email THINGSBOARD_PASSWORD — tenant login password THINGSBOARD_TOKEN — pre-generated JWT (takes priority over user/pass) """ from __future__ import annotations import math import os import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from datetime import datetime, timezone from enum import Enum from typing import Any, Dict, List, Optional, Tuple import pandas as pd import requests # --------------------------------------------------------------------------- # Enumerations # --------------------------------------------------------------------------- class VineArea(str, Enum): TREATMENT = "treatment" # under solar panels REFERENCE = "reference" # open sky, no panels AMBIENT = "ambient" # site-level outdoor baseline # --------------------------------------------------------------------------- # Device registry # --------------------------------------------------------------------------- @dataclass(frozen=True) class DeviceInfo: uuid: str device_id: int area: VineArea row: Optional[int] label: str #: Full device registry mapping short name → DeviceInfo. #: UUIDs are from devices.csv in the Research/PV_Vine_Tradeoff repository. DEVICE_REGISTRY: Dict[str, DeviceInfo] = { "Air1": DeviceInfo( uuid="373041f0-089a-11ef-9126-b746c27d34bd", device_id=4, area=VineArea.AMBIENT, row=None, label="Outdoor Climate (ambient baseline)", ), "Air2": DeviceInfo( uuid="37bf89a0-089a-11ef-9126-b746c27d34bd", device_id=5, area=VineArea.TREATMENT, row=501, label="Indoor Climate Row 501 (under panels)", ), "Air3": DeviceInfo( uuid="3860aba0-089a-11ef-9126-b746c27d34bd", device_id=6, area=VineArea.TREATMENT, row=502, label="Indoor Climate Row 502 (under panels)", ), "Air4": DeviceInfo( uuid="04452660-7114-11ef-9360-f1ed9d9dc643", device_id=7, area=VineArea.TREATMENT, row=502, label="Treatment Row 502 North (under panels)", ), "Crop1": DeviceInfo( uuid="39224df0-089a-11ef-9126-b746c27d34bd", device_id=8, area=VineArea.REFERENCE, row=503, label="Reference crop Row 503", ), "Crop2": DeviceInfo( uuid="aa0d9970-7113-11ef-9360-f1ed9d9dc643", device_id=9, area=VineArea.REFERENCE, row=503, label="Control crop Row 503", ), "Crop3": DeviceInfo( uuid="859b3ce0-29dd-11f0-96bc-55874793181d", device_id=10, area=VineArea.TREATMENT, row=502, label="Treatment 502 – West Bottom", ), "Crop4": DeviceInfo( uuid="889765e0-29dd-11f0-96bc-55874793181d", device_id=11, area=VineArea.REFERENCE, row=502, label="Control crop Row 502 (reference vine)", ), "Crop5": DeviceInfo( uuid="8b092930-29dd-11f0-96bc-55874793181d", device_id=12, area=VineArea.TREATMENT, row=502, label="Treatment 502 – East Upper", ), "Crop6": DeviceInfo( uuid="8cce31c0-29dd-11f0-96bc-55874793181d", device_id=13, area=VineArea.TREATMENT, row=502, label="Treatment 502 – East Bottom", ), "Crop7": DeviceInfo( uuid="8e7440a0-29dd-11f0-96bc-55874793181d", device_id=14, area=VineArea.TREATMENT, row=502, label="Treatment 502 – West Upper", ), "Soil1": DeviceInfo( uuid="3586b0a0-089a-11ef-9126-b746c27d34bd", device_id=16, area=VineArea.TREATMENT, row=502, label="Soil Row 502 (treatment)", ), "Soil2": DeviceInfo( uuid="35cda4b0-089a-11ef-9126-b746c27d34bd", device_id=17, area=VineArea.REFERENCE, row=503, label="Soil Row 503 (reference)", ), "Soil3": DeviceInfo( uuid="3634caf0-089a-11ef-9126-b746c27d34bd", device_id=18, area=VineArea.TREATMENT, row=501, label="Soil Row 501 (treatment)", ), "Soil4": DeviceInfo( uuid="36a4cad0-089a-11ef-9126-b746c27d34bd", device_id=19, area=VineArea.REFERENCE, row=504, label="Soil Row 504 Control", ), "Soil5": DeviceInfo( uuid="77d55280-70e7-11ef-9360-f1ed9d9dc643", device_id=20, area=VineArea.TREATMENT, row=502, label="Treatment Row 502 South", ), "Soil6": DeviceInfo( uuid="7e4e4630-70e7-11ef-9360-f1ed9d9dc643", device_id=21, area=VineArea.TREATMENT, row=502, label="Treatment Row 502 North", ), "Soil7": DeviceInfo( uuid="842e5540-70e7-11ef-9360-f1ed9d9dc643", device_id=22, area=VineArea.REFERENCE, row=504, label="Control 504 South", ), "Soil9": DeviceInfo( uuid="91e44ff0-70e7-11ef-9360-f1ed9d9dc643", device_id=23, area=VineArea.REFERENCE, row=504, label="Control 504 South (2nd probe)", ), "Irrigation1": DeviceInfo( uuid="3a066c60-089a-11ef-9126-b746c27d34bd", device_id=15, area=VineArea.TREATMENT, row=502, label="Irrigation Row 502", ), "Thermocouples1": DeviceInfo( uuid="72ce88f0-c548-11ef-8bc2-fdab9f3349b7", device_id=2, area=VineArea.TREATMENT, row=502, label="Panel surface temps Treatment 502", ), "Thermocouples2": DeviceInfo( uuid="03e40ba0-cc0e-11ef-a2e9-55874793181d", device_id=3, area=VineArea.REFERENCE, row=None, label="Panel/structure surface temps Reference", ), # Tracker controllers (panel angle + mode) "Tracker501": DeviceInfo( uuid="aac06e50-f769-11f0-b902-5ff1ea8c4cf9", device_id=0, area=VineArea.TREATMENT, row=501, label="Tracker row 501", ), "Tracker502": DeviceInfo( uuid="b99bd630-f769-11f0-b902-5ff1ea8c4cf9", device_id=0, area=VineArea.TREATMENT, row=502, label="Tracker row 502", ), "Tracker503": DeviceInfo( uuid="caffe4c0-f769-11f0-b902-5ff1ea8c4cf9", device_id=0, area=VineArea.TREATMENT, row=503, label="Tracker row 503", ), "Tracker509": DeviceInfo( uuid="bacf7c50-fcdc-11f0-b902-5ff1ea8c4cf9", device_id=0, area=VineArea.TREATMENT, row=509, label="Tracker row 509", ), } # --------------------------------------------------------------------------- # Asset registry (non-device entities — e.g. the plant-level energy asset) # --------------------------------------------------------------------------- @dataclass(frozen=True) class AssetInfo: uuid: str label: str ASSET_REGISTRY: Dict[str, AssetInfo] = { "Plant": AssetInfo( uuid="dc94ddb0-dbe6-11f0-9352-a53ca0b6a212", label="Yeruham Vineyard — plant-level energy", ), } ENERGY_KEYS: List[str] = ["power", "production"] TRACKER_KEYS: List[str] = ["angle", "manualMode", "setAngle", "setMode"] # --------------------------------------------------------------------------- # Telemetry key sets per device type # --------------------------------------------------------------------------- AIR_KEYS: List[str] = [ "airTemperature", "leafTemperature", "VPD", "CO2", "PAR", "DLI", "airHumidity", "windSpeed", "windAngle", "rain", "airPressure", "dewTemperature", "NDVI", "PRI", "airLeafDeltaT", ] CROP_KEYS: List[str] = [ "PAR", "leafTemperature", "NDVI", "PRI", "DLI", "PARAvg1H", "PARAvg24H", ] SOIL_KEYS: List[str] = [ "soilMoisture", "soilMoisture2", "soilTemperature", "soilTemperature2", "soilBulkEC", "soilpH", ] IRRIGATION_KEYS: List[str] = [ "irrigationVolume", "irrigationMinutes", "irrigationFlowRate", "irrigationEC", "irrigationPH", "waterTemperature", "irrigationCycleVolume", "irrigationCycleMinutes", ] THERMOCOUPLE_KEYS: List[str] = [ "thermocoupleTemperature_1", "thermocoupleTemperature_2", "thermocoupleTemperature_3", "thermocoupleTemperature_4", ] # --------------------------------------------------------------------------- # VineSnapshot dataclass # --------------------------------------------------------------------------- @dataclass class VineSnapshot: """ Aggregated real-time vine state from all ThingsBoard sensors. Fields are grouped by area: - ambient : Air1 (outdoor climate, site-level baseline) - treatment : under solar panels (rows 501–502) - reference : open sky / no panels (rows 503–504) None means the sensor did not return a value. """ snapshot_ts: datetime staleness_minutes: float # --- Ambient (Air1, outdoor baseline) --- ambient_temp_c: Optional[float] = None ambient_humidity_pct: Optional[float] = None ambient_wind_speed_ms: Optional[float] = None ambient_wind_angle_deg: Optional[float] = None ambient_rain_mm: Optional[float] = None # --- Treatment microclimate (avg of Air2 / Air3 / Air4) --- treatment_air_temp_c: Optional[float] = None treatment_leaf_temp_c: Optional[float] = None treatment_vpd_kpa: Optional[float] = None treatment_co2_ppm: Optional[float] = None treatment_par_umol: Optional[float] = None treatment_dli_mol_m2: Optional[float] = None treatment_ndvi: Optional[float] = None treatment_pri: Optional[float] = None treatment_air_leaf_delta_t: Optional[float] = None # --- Treatment crop (avg of Crop3 / Crop5 / Crop6 / Crop7) --- treatment_crop_par_umol: Optional[float] = None treatment_crop_leaf_temp_c: Optional[float] = None treatment_crop_ndvi: Optional[float] = None treatment_crop_dli_mol_m2: Optional[float] = None treatment_crop_par_avg1h: Optional[float] = None # Per-panel-position readings {position_label: {par, leaf_temp, ndvi}} treatment_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict) # --- Reference crop (avg of Crop1 / Crop2 / Crop4) --- reference_crop_par_umol: Optional[float] = None reference_crop_leaf_temp_c: Optional[float] = None reference_crop_ndvi: Optional[float] = None reference_crop_dli_mol_m2: Optional[float] = None reference_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict) # --- PAR shading ratio: treatment_crop_par / reference_crop_par --- par_shading_ratio: Optional[float] = None # <1 = panels are shading # --- Treatment soil (avg of Soil1 / Soil3 / Soil5 / Soil6) --- treatment_soil_moisture_pct: Optional[float] = None treatment_soil_temp_c: Optional[float] = None treatment_soil_ec_ds_m: Optional[float] = None treatment_soil_ph: Optional[float] = None # --- Reference soil (avg of Soil2 / Soil4 / Soil7 / Soil9) --- reference_soil_moisture_pct: Optional[float] = None reference_soil_temp_c: Optional[float] = None # --- Irrigation (Irrigation1, row 502 treatment) --- irrigation_last_volume_l: Optional[float] = None irrigation_last_minutes: Optional[float] = None irrigation_ec: Optional[float] = None irrigation_ph: Optional[float] = None water_temp_c: Optional[float] = None # --- Panel surface temperatures --- treatment_panel_temp_c: Optional[float] = None # avg Thermocouples1 positions 1-4 reference_panel_temp_c: Optional[float] = None # avg Thermocouples2 positions 1-4 def to_advisor_text(self) -> str: """Format snapshot for inclusion in an AI advisory prompt.""" age = f"{self.staleness_minutes:.0f}" if self.staleness_minutes < 120 else ">{:.0f}".format(self.staleness_minutes) lines = [f"VINE STATE (ThingsBoard sensors, ~{age} min ago):"] lines.append(" TREATMENT area (rows 501-502, under solar panels):") if self.treatment_air_temp_c is not None: lines.append(f" Air temperature: {self.treatment_air_temp_c:.1f} C") if self.treatment_leaf_temp_c is not None: lines.append(f" Leaf temperature: {self.treatment_leaf_temp_c:.1f} C") if self.treatment_air_leaf_delta_t is not None: lines.append(f" Air-leaf delta-T: {self.treatment_air_leaf_delta_t:+.1f} C (proxy for heat stress)") if self.treatment_vpd_kpa is not None: lines.append(f" VPD: {self.treatment_vpd_kpa:.2f} kPa") if self.treatment_co2_ppm is not None: lines.append(f" CO2: {self.treatment_co2_ppm:.0f} ppm") if self.treatment_crop_par_umol is not None: lines.append(f" Fruiting-zone PAR: {self.treatment_crop_par_umol:.0f} umol/m2/s (avg of Crop3/5/6/7)") if self.treatment_crop_dli_mol_m2 is not None: lines.append(f" DLI today so far: {self.treatment_crop_dli_mol_m2:.1f} mol/m2/day") if self.treatment_crop_ndvi is not None: lines.append(f" Canopy NDVI: {self.treatment_crop_ndvi:.3f}") if self.treatment_soil_moisture_pct is not None: lines.append(f" Soil moisture: {self.treatment_soil_moisture_pct:.1f}% (avg Soil1/3/5/6)") if self.treatment_soil_temp_c is not None: lines.append(f" Soil temperature: {self.treatment_soil_temp_c:.1f} C") if self.treatment_panel_temp_c is not None: lines.append(f" Panel surface temp: {self.treatment_panel_temp_c:.1f} C") if self.treatment_crop_by_position: lines.append(" Per-position PAR (Crop sensors):") for pos, vals in self.treatment_crop_by_position.items(): par = vals.get("par") lt = vals.get("leaf_temp") par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A" lt_str = f" | leaf {lt:.1f} C" if lt is not None else "" lines.append(f" {pos}: PAR {par_str}{lt_str}") lines.append("") lines.append(" REFERENCE area (rows 503-504, open sky, no panels):") if self.reference_crop_par_umol is not None: lines.append(f" Fruiting-zone PAR: {self.reference_crop_par_umol:.0f} umol/m2/s (avg of Crop1/2/4)") if self.reference_crop_leaf_temp_c is not None: lines.append(f" Leaf temperature: {self.reference_crop_leaf_temp_c:.1f} C") if self.reference_crop_ndvi is not None: lines.append(f" Canopy NDVI: {self.reference_crop_ndvi:.3f}") if self.reference_soil_moisture_pct is not None: lines.append(f" Soil moisture: {self.reference_soil_moisture_pct:.1f}% (avg Soil2/4/7/9)") if self.reference_crop_by_position: lines.append(" Per-position PAR (Crop sensors):") for pos, vals in self.reference_crop_by_position.items(): par = vals.get("par") par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A" lines.append(f" {pos}: PAR {par_str}") if self.par_shading_ratio is not None: reduction_pct = (1 - self.par_shading_ratio) * 100 lines.append("") lines.append(f" PAR shading ratio (treatment/reference): {self.par_shading_ratio:.2f}" f" ({reduction_pct:.0f}% reduction by panels)") if self.ambient_temp_c is not None: lines.append("") lines.append(" AMBIENT (outdoor baseline, Air1):") lines.append(f" Air temperature: {self.ambient_temp_c:.1f} C") if self.ambient_wind_speed_ms is not None: lines.append(f" Wind speed: {self.ambient_wind_speed_ms:.1f} m/s") if self.ambient_rain_mm is not None and self.ambient_rain_mm > 0: lines.append(f" Rain: {self.ambient_rain_mm:.1f} mm") any_irrigation = any(v is not None for v in [ self.irrigation_last_volume_l, self.irrigation_last_minutes, self.irrigation_ec, self.irrigation_ph, ]) if any_irrigation: lines.append("") lines.append(" IRRIGATION (Irrigation1, row 502):") if self.irrigation_last_volume_l is not None: lines.append(f" Last cycle volume: {self.irrigation_last_volume_l:.0f} L") if self.irrigation_last_minutes is not None: lines.append(f" Duration: {self.irrigation_last_minutes:.0f} min") if self.irrigation_ec is not None: lines.append(f" EC: {self.irrigation_ec:.2f} dS/m") if self.irrigation_ph is not None: lines.append(f" pH: {self.irrigation_ph:.1f}") if self.water_temp_c is not None: lines.append(f" Water temperature: {self.water_temp_c:.1f} C") return "\n".join(lines) def to_dict(self) -> Dict[str, Any]: """Return a flat dict suitable for JSON serialization (e.g., chatbot tool result).""" out: Dict[str, Any] = { "snapshot_ts": self.snapshot_ts.isoformat(), "staleness_minutes": round(self.staleness_minutes, 1), } for attr in ( "ambient_temp_c", "ambient_humidity_pct", "ambient_wind_speed_ms", "ambient_wind_angle_deg", "ambient_rain_mm", "treatment_air_temp_c", "treatment_leaf_temp_c", "treatment_vpd_kpa", "treatment_co2_ppm", "treatment_par_umol", "treatment_dli_mol_m2", "treatment_ndvi", "treatment_pri", "treatment_air_leaf_delta_t", "treatment_crop_par_umol", "treatment_crop_leaf_temp_c", "treatment_crop_ndvi", "treatment_crop_dli_mol_m2", "treatment_crop_par_avg1h", "reference_crop_par_umol", "reference_crop_leaf_temp_c", "reference_crop_ndvi", "reference_crop_dli_mol_m2", "par_shading_ratio", "treatment_soil_moisture_pct", "treatment_soil_temp_c", "treatment_soil_ec_ds_m", "treatment_soil_ph", "reference_soil_moisture_pct", "reference_soil_temp_c", "irrigation_last_volume_l", "irrigation_last_minutes", "irrigation_ec", "irrigation_ph", "water_temp_c", "treatment_panel_temp_c", "reference_panel_temp_c", ): val = getattr(self, attr) out[attr] = round(val, 3) if val is not None else None out["treatment_crop_by_position"] = self.treatment_crop_by_position out["reference_crop_by_position"] = self.reference_crop_by_position return out # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- @dataclass class ThingsBoardConfig: """ThingsBoard connection settings. Data retrieval always uses prod (Seymour).""" # Prod only — test (eu.thingsboard.cloud) is for deploying apps, not data host: str = os.environ.get("THINGSBOARD_HOST", "https://web.seymouragri.com/") username: Optional[str] = ( os.environ.get("THINGSBOARD_USERNAME") or os.environ.get("TB_USERNAME") ) password: Optional[str] = ( os.environ.get("THINGSBOARD_PASSWORD") or os.environ.get("TB_PASSWORD") ) token: Optional[str] = os.environ.get("THINGSBOARD_TOKEN") # --------------------------------------------------------------------------- # Client # --------------------------------------------------------------------------- class ThingsBoardClient: """ Minimal ThingsBoard client for the Seymour vineyard. Authentication -------------- Provide THINGSBOARD_TOKEN for a pre-generated JWT, or THINGSBOARD_USERNAME + THINGSBOARD_PASSWORD for login-based auth. Tokens are cached and refreshed automatically before they expire. Usage ----- client = ThingsBoardClient() snapshot = client.get_vine_snapshot() print(snapshot.to_advisor_text()) """ _TOKEN_TTL_SECONDS = 8_000 # ThingsBoard default is 9000 s; be conservative def __init__(self, config: Optional[ThingsBoardConfig] = None) -> None: self.config = config or ThingsBoardConfig() self._session = requests.Session() self._session.headers.update({"Content-Type": "application/json"}) self._jwt: Optional[str] = None self._jwt_expires_at: float = 0.0 # ------------------------------------------------------------------ # Authentication # ------------------------------------------------------------------ def _ensure_jwt(self) -> str: """Return a valid JWT, obtaining or refreshing as needed.""" if self.config.token: if "X-Authorization" not in self._session.headers: self._session.headers["X-Authorization"] = f"Bearer {self.config.token}" return self.config.token if self._jwt and time.monotonic() < self._jwt_expires_at: return self._jwt if not self.config.username or not self.config.password: raise RuntimeError( "ThingsBoard authentication requires THINGSBOARD_TOKEN " "or both THINGSBOARD_USERNAME and THINGSBOARD_PASSWORD." ) url = f"{self.config.host.rstrip('/')}/api/auth/login" resp = self._session.post( url, json={"username": self.config.username, "password": self.config.password}, timeout=10, ) resp.raise_for_status() token = resp.json()["token"] self._jwt = token self._jwt_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS self._session.headers["X-Authorization"] = f"Bearer {token}" return token # ------------------------------------------------------------------ # Low-level API calls # ------------------------------------------------------------------ # ------------------------------------------------------------------ # Shared low-level helpers (DEVICE and ASSET use the same REST API, # differing only in the entity-type path segment). # ------------------------------------------------------------------ def _fetch_latest_raw( self, entity_type: str, uuid: str, keys: List[str], ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]: """Fetch most-recent telemetry for any entity type (DEVICE or ASSET).""" self._ensure_jwt() url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}" f"/{uuid}/values/timeseries" ) resp = self._session.get(url, params={"keys": ",".join(keys)}, timeout=15) resp.raise_for_status() raw: Dict[str, List[Dict]] = resp.json() values: Dict[str, Optional[float]] = {} newest_ts_ms: Optional[int] = None for key in keys: entries = raw.get(key, []) if entries: values[key] = _safe_float(entries[0]["value"]) ts_ms = entries[0].get("ts") if ts_ms and (newest_ts_ms is None or ts_ms > newest_ts_ms): newest_ts_ms = ts_ms else: values[key] = None newest_ts = ( datetime.fromtimestamp(newest_ts_ms / 1000, tz=timezone.utc) if newest_ts_ms else None ) return values, newest_ts def _fetch_timeseries_raw( self, entity_type: str, uuid: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 900_000, agg: str = "NONE", ) -> pd.DataFrame: """Fetch time-series telemetry for any entity type (DEVICE or ASSET).""" self._ensure_jwt() start_ms = int(start.timestamp() * 1000) end_ms = int(end.timestamp() * 1000) url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}" f"/{uuid}/values/timeseries" ) params: Dict[str, Any] = { "keys": ",".join(keys), "startTs": start_ms, "endTs": end_ms, "limit": limit, "agg": agg, } if agg != "NONE": params["interval"] = interval_ms resp = self._session.get(url, params=params, timeout=30) resp.raise_for_status() raw: Dict[str, List[Dict]] = resp.json() frames: Dict[str, pd.Series] = {} for key, entries in raw.items(): if key in keys and entries: ts = pd.to_datetime([e["ts"] for e in entries], unit="ms", utc=True) vals = [_safe_float(e["value"]) for e in entries] frames[key] = pd.Series(vals, index=ts) if not frames: return pd.DataFrame() return pd.DataFrame(frames).sort_index() # ------------------------------------------------------------------ # Device API (public) # ------------------------------------------------------------------ def _fetch_latest( self, device_name: str, keys: List[str], ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]: """Fetch most-recent values for a named device.""" info = DEVICE_REGISTRY[device_name] return self._fetch_latest_raw("DEVICE", info.uuid, keys) def get_latest_telemetry( self, device_name: str, keys: List[str], ) -> Dict[str, Optional[float]]: """Return the most recent value for each key. Missing keys return None.""" if device_name not in DEVICE_REGISTRY: raise KeyError( f"Unknown device: {device_name!r}. " f"Valid names: {sorted(DEVICE_REGISTRY)}" ) values, _ = self._fetch_latest(device_name, keys) return values def get_timeseries( self, device_name: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 900_000, # 15 minutes agg: str = "NONE", ) -> pd.DataFrame: """Fetch time-series telemetry for a named device.""" if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] return self._fetch_timeseries_raw( "DEVICE", info.uuid, keys, start, end, limit, interval_ms, agg, ) # ------------------------------------------------------------------ # Asset API (public) # ------------------------------------------------------------------ def get_asset_timeseries( self, asset_name: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 3_600_000, # 1 hour agg: str = "SUM", ) -> pd.DataFrame: """Fetch time-series from a ThingsBoard ASSET (e.g. Plant energy).""" if asset_name not in ASSET_REGISTRY: raise KeyError(f"Unknown asset: {asset_name!r}. Valid: {sorted(ASSET_REGISTRY)}") info = ASSET_REGISTRY[asset_name] return self._fetch_timeseries_raw( "ASSET", info.uuid, keys, start, end, limit, interval_ms, agg, ) def get_asset_latest( self, asset_name: str, keys: List[str], ) -> Dict[str, Optional[float]]: """Fetch latest telemetry from a ThingsBoard ASSET.""" if asset_name not in ASSET_REGISTRY: raise KeyError(f"Unknown asset: {asset_name!r}") info = ASSET_REGISTRY[asset_name] values, _ = self._fetch_latest_raw("ASSET", info.uuid, keys) return values # ------------------------------------------------------------------ # Device commands (RPC + attribute writes) # ------------------------------------------------------------------ def send_rpc_command( self, device_name: str, method: str, params: Any = None, timeout: float = 10.0, ) -> Dict[str, Any]: """Send a two-way RPC command to a device. Uses POST /api/plugins/rpc/twoway/{deviceId}. Falls back to one-way if two-way returns 404. """ if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] self._ensure_jwt() payload = {"method": method, "params": params if params is not None else {}} # Try two-way RPC first url = ( f"{self.config.host.rstrip('/')}/api/plugins/rpc/twoway" f"/{info.uuid}" ) resp = self._session.post(url, json=payload, timeout=timeout) if resp.status_code in (404, 405): # Fallback to one-way RPC url = ( f"{self.config.host.rstrip('/')}/api/plugins/rpc/oneway" f"/{info.uuid}" ) resp = self._session.post(url, json=payload, timeout=timeout) resp.raise_for_status() try: return resp.json() except Exception: return {"status": "ok", "status_code": resp.status_code} def set_device_attributes( self, device_name: str, attributes: Dict[str, Any], scope: str = "SHARED_SCOPE", ) -> None: """Write server-side attributes to a device. Uses POST /api/plugins/telemetry/DEVICE/{id}/attributes/{scope}. This is an alternative to RPC for setting tracker targets. """ if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] self._ensure_jwt() url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/DEVICE" f"/{info.uuid}/attributes/{scope}" ) resp = self._session.post(url, json=attributes, timeout=10) resp.raise_for_status() # ------------------------------------------------------------------ # High-level vine snapshot # ------------------------------------------------------------------ # Dashboard-only: 4 devices for farmer view (temp, soil, irrigation) _DASHBOARD_FETCH_PLAN: Dict[str, List[str]] = { "Air1": AIR_KEYS, # ambient weather "Air2": AIR_KEYS, # treatment air "Soil1": SOIL_KEYS, # treatment soil "Irrigation1": IRRIGATION_KEYS, } # Light mode: 6 devices (adds crop PAR for chatbot/detailed view) _LIGHT_FETCH_PLAN: Dict[str, List[str]] = { "Air1": AIR_KEYS, # ambient "Air2": AIR_KEYS, # treatment air (one representative) "Crop1": CROP_KEYS, # reference crop "Crop3": CROP_KEYS, # treatment crop "Soil1": SOIL_KEYS, # treatment soil "Irrigation1": IRRIGATION_KEYS, } _FULL_FETCH_PLAN: Dict[str, List[str]] = { "Air1": AIR_KEYS, "Air2": AIR_KEYS, "Air3": AIR_KEYS, "Air4": AIR_KEYS, "Crop1": CROP_KEYS, "Crop2": CROP_KEYS, "Crop3": CROP_KEYS, "Crop4": CROP_KEYS, "Crop5": CROP_KEYS, "Crop6": CROP_KEYS, "Crop7": CROP_KEYS, "Soil1": SOIL_KEYS, "Soil2": SOIL_KEYS, "Soil3": SOIL_KEYS, "Soil4": SOIL_KEYS, "Soil5": SOIL_KEYS, "Soil6": SOIL_KEYS, "Soil7": SOIL_KEYS, "Soil9": SOIL_KEYS, "Irrigation1": IRRIGATION_KEYS, "Thermocouples1": THERMOCOUPLE_KEYS, "Thermocouples2": THERMOCOUPLE_KEYS, } def get_vine_snapshot(self, light: bool = False, mode: Optional[str] = None) -> VineSnapshot: """ Fetch latest telemetry from all relevant devices and return an aggregated VineSnapshot distinguishing treatment vs reference areas. Uses a thread pool to parallelise HTTP requests. Individual device failures are silently skipped (returns None fields). Parameters ---------- light : bool If True, fetch only ~6 key devices instead of all 21. mode : str, optional "dashboard" = 4 devices only (air + soil + irrigation). Overrides `light` when set. """ if mode == "dashboard": fetch_plan = self._DASHBOARD_FETCH_PLAN elif light: fetch_plan = self._LIGHT_FETCH_PLAN else: fetch_plan = self._FULL_FETCH_PLAN # Ensure auth token before spawning threads (avoid race on login) self._ensure_jwt() raw_results: Dict[str, Dict[str, Optional[float]]] = {} newest_ts_overall: Optional[datetime] = None with ThreadPoolExecutor(max_workers=8) as pool: future_map = { pool.submit(self._fetch_latest, name, keys): name for name, keys in fetch_plan.items() } for future in as_completed(future_map, timeout=25): name = future_map[future] try: values, ts = future.result() raw_results[name] = values if ts and (newest_ts_overall is None or ts > newest_ts_overall): newest_ts_overall = ts except Exception: raw_results[name] = {} now = datetime.now(tz=timezone.utc) staleness = ( (now - newest_ts_overall).total_seconds() / 60 if newest_ts_overall else float("nan") ) # ---------- Ambient (Air1) ---------- air1 = raw_results.get("Air1", {}) # ---------- Treatment microclimate (Air2/3/4) ---------- treatment_air = [raw_results.get(d, {}) for d in ("Air2", "Air3", "Air4")] # ---------- Treatment crop by position ---------- position_labels = { "Crop3": "502-west-bottom", "Crop5": "502-east-upper", "Crop6": "502-east-bottom", "Crop7": "502-west-upper", } treatment_crop_devs = { label: raw_results.get(dev, {}) for dev, label in position_labels.items() } treatment_crop_by_pos: Dict[str, Dict[str, Optional[float]]] = { label: { "par": v.get("PAR"), "leaf_temp": v.get("leafTemperature"), "ndvi": v.get("NDVI"), "dli": v.get("DLI"), } for label, v in treatment_crop_devs.items() } # ---------- Reference crop by position ---------- ref_position_labels = { "Crop1": "503-ref", "Crop2": "503-control", "Crop4": "502-control", } reference_crop_devs = { label: raw_results.get(dev, {}) for dev, label in ref_position_labels.items() } reference_crop_by_pos: Dict[str, Dict[str, Optional[float]]] = { label: { "par": v.get("PAR"), "leaf_temp": v.get("leafTemperature"), "ndvi": v.get("NDVI"), "dli": v.get("DLI"), } for label, v in reference_crop_devs.items() } # ---------- Soil averages ---------- treatment_soil_devs = [raw_results.get(d, {}) for d in ("Soil1", "Soil3", "Soil5", "Soil6")] reference_soil_devs = [raw_results.get(d, {}) for d in ("Soil2", "Soil4", "Soil7", "Soil9")] def _avg_soil_moisture(devs: List[Dict]) -> Optional[float]: all_vals = [] for d in devs: for k in ("soilMoisture", "soilMoisture2"): if d.get(k) is not None: all_vals.append(d[k]) lo, hi = _BOUNDS["soil_moisture"] return _bounded_avg(lo, hi, *all_vals) if all_vals else None def _avg_soil_temp(devs: List[Dict]) -> Optional[float]: all_vals = [] for d in devs: for k in ("soilTemperature", "soilTemperature2"): if d.get(k) is not None: all_vals.append(d[k]) lo, hi = _BOUNDS["soil_temp"] return _bounded_avg(lo, hi, *all_vals) if all_vals else None # ---------- Panel temps ---------- tc1 = raw_results.get("Thermocouples1", {}) tc2 = raw_results.get("Thermocouples2", {}) irr = raw_results.get("Irrigation1", {}) # ---------- PAR shading ratio (bounded to reject sensor faults) ---------- t_par = _bounded_avg(*_BOUNDS["par"], *[v.get("PAR") for v in treatment_crop_devs.values()]) r_par = _bounded_avg(*_BOUNDS["par"], *[v.get("PAR") for v in reference_crop_devs.values()]) par_ratio: Optional[float] = None if t_par is not None and r_par is not None and r_par > 0: par_ratio = t_par / r_par snapshot = VineSnapshot( snapshot_ts=now, staleness_minutes=staleness, # Ambient — apply bounds to catch single-device faults too ambient_temp_c=_bounded_avg(*_BOUNDS["air_temp"], air1.get("airTemperature")), ambient_humidity_pct=_bounded_avg(0, 100, air1.get("airHumidity")), ambient_wind_speed_ms=_bounded_avg(0, 60, air1.get("windSpeed")), ambient_wind_angle_deg=_bounded_avg(0, 360, air1.get("windAngle")), ambient_rain_mm=_bounded_avg(0, 500, air1.get("rain")), # Treatment climate — bounded to reject sensor faults treatment_air_temp_c=_bounded_avg(*_BOUNDS["air_temp"], *[d.get("airTemperature") for d in treatment_air]), treatment_leaf_temp_c=_bounded_avg(*_BOUNDS["leaf_temp"], *[d.get("leafTemperature") for d in treatment_air]), treatment_vpd_kpa=_bounded_avg(*_BOUNDS["vpd"], *[d.get("VPD") for d in treatment_air]), treatment_co2_ppm=_bounded_avg(*_BOUNDS["co2"], *[d.get("CO2") for d in treatment_air]), treatment_par_umol=_bounded_avg(*_BOUNDS["par"], *[d.get("PAR") for d in treatment_air]), treatment_dli_mol_m2=_bounded_avg(*_BOUNDS["dli"], *[d.get("DLI") for d in treatment_air]), treatment_ndvi=_bounded_avg(*_BOUNDS["ndvi"], *[d.get("NDVI") for d in treatment_air]), treatment_pri=_bounded_avg(*_BOUNDS["pri"], *[d.get("PRI") for d in treatment_air]), treatment_air_leaf_delta_t=_bounded_avg(-20, 20, *[d.get("airLeafDeltaT") for d in treatment_air]), # Treatment crop treatment_crop_par_umol=t_par, treatment_crop_leaf_temp_c=_bounded_avg( *_BOUNDS["leaf_temp"], *[v.get("leafTemperature") for v in treatment_crop_devs.values()] ), treatment_crop_ndvi=_bounded_avg( *_BOUNDS["ndvi"], *[v.get("NDVI") for v in treatment_crop_devs.values()] ), treatment_crop_dli_mol_m2=_bounded_avg( *_BOUNDS["dli"], *[v.get("DLI") for v in treatment_crop_devs.values()] ), treatment_crop_par_avg1h=_bounded_avg( *_BOUNDS["par"], *[v.get("PARAvg1H") for v in treatment_crop_devs.values()] ), treatment_crop_by_position=treatment_crop_by_pos, # Reference crop reference_crop_par_umol=r_par, reference_crop_leaf_temp_c=_bounded_avg( *_BOUNDS["leaf_temp"], *[v.get("leafTemperature") for v in reference_crop_devs.values()] ), reference_crop_ndvi=_bounded_avg( *_BOUNDS["ndvi"], *[v.get("NDVI") for v in reference_crop_devs.values()] ), reference_crop_dli_mol_m2=_bounded_avg( *_BOUNDS["dli"], *[v.get("DLI") for v in reference_crop_devs.values()] ), reference_crop_by_position=reference_crop_by_pos, par_shading_ratio=par_ratio, # Treatment soil treatment_soil_moisture_pct=_avg_soil_moisture(treatment_soil_devs), treatment_soil_temp_c=_avg_soil_temp(treatment_soil_devs), treatment_soil_ec_ds_m=_safe_avg(*[d.get("soilBulkEC") for d in treatment_soil_devs]), treatment_soil_ph=_safe_avg(*[d.get("soilpH") for d in treatment_soil_devs]), # Reference soil reference_soil_moisture_pct=_avg_soil_moisture(reference_soil_devs), reference_soil_temp_c=_avg_soil_temp(reference_soil_devs), # Irrigation irrigation_last_volume_l=irr.get("irrigationCycleVolume") or irr.get("irrigationVolume"), irrigation_last_minutes=irr.get("irrigationCycleMinutes") or irr.get("irrigationMinutes"), irrigation_ec=irr.get("irrigationEC"), irrigation_ph=irr.get("irrigationPH"), water_temp_c=irr.get("waterTemperature"), # Panel temps treatment_panel_temp_c=_bounded_avg( *_BOUNDS["panel_temp"], *[tc1.get(k) for k in THERMOCOUPLE_KEYS] ), reference_panel_temp_c=_bounded_avg( *_BOUNDS["panel_temp"], *[tc2.get(k) for k in THERMOCOUPLE_KEYS] ), ) return snapshot # --------------------------------------------------------------------------- # Helpers (module-level so threads can share without self) # --------------------------------------------------------------------------- def _safe_float(val: Any) -> Optional[float]: """Convert a TB telemetry value string/number to float, or None on failure.""" if val is None: return None try: f = float(val) return None if math.isnan(f) or math.isinf(f) else f except (TypeError, ValueError): return None def _safe_avg(*vals: Any) -> Optional[float]: """Return the mean of non-None, finite values, or None if none available.""" valid = [v for v in vals if v is not None and isinstance(v, (int, float)) and not math.isnan(v) and not math.isinf(v)] return sum(valid) / len(valid) if valid else None def _bounded_avg(lo: float, hi: float, *vals: Any) -> Optional[float]: """Return the mean of values within [lo, hi], rejecting sensor faults outside that range.""" valid = [v for v in vals if v is not None and isinstance(v, (int, float)) and not math.isnan(v) and not math.isinf(v) and lo <= v <= hi] return sum(valid) / len(valid) if valid else None # Physical plausibility bounds for Negev site _BOUNDS = { "air_temp": (-5.0, 55.0), # °C — extreme Negev range "leaf_temp": (-5.0, 60.0), # °C — leaves can exceed air under direct sun "soil_temp": (-2.0, 45.0), # °C — soil in Negev "soil_moisture": (0.0, 100.0), # % "par": (0.0, 3000.0), # µmol m⁻² s⁻¹ "vpd": (0.0, 10.0), # kPa "co2": (300.0, 2000.0), # ppm "ndvi": (-1.0, 1.0), "pri": (-1.0, 1.0), "dli": (0.0, 80.0), # mol m⁻² day⁻¹ "panel_temp": (-10.0, 100.0), # °C — panel surface } # --------------------------------------------------------------------------- # CLI smoke test # --------------------------------------------------------------------------- if __name__ == "__main__": client = ThingsBoardClient() print("Fetching vine snapshot from ThingsBoard...") try: snap = client.get_vine_snapshot() print(snap.to_advisor_text()) print(f"\nSnapshot age: {snap.staleness_minutes:.1f} min") except Exception as exc: print(f"Error: {exc}") print("Make sure THINGSBOARD_USERNAME/PASSWORD or THINGSBOARD_TOKEN are set in your .env")