| """ |
| ThingsBoardClient: live telemetry client for the Seymour vineyard at |
| web.seymouragri.com. |
| |
| Device layout |
| ------------- |
| TREATMENT area (rows 501–502, under solar panels): |
| Air2, Air3, Air4 — microclimate sensors under the panels |
| Crop3, Crop5, Crop6, Crop7 — fruiting-zone crop sensors (per panel position) |
| Soil1, Soil3, Soil5, Soil6 — root-zone soil probes |
| Irrigation1 — irrigation flow/volume/quality logger |
| Thermocouples-1 — panel surface temperature (4 positions) |
| |
| REFERENCE area (rows 503–504, open sky, no panels): |
| Crop1, Crop2, Crop4 — fruiting-zone crop sensors (no shading) |
| Soil2, Soil4, Soil7, Soil9 — root-zone soil probes |
| Thermocouples-2 — structural/ambient thermocouple reference |
| |
| AMBIENT (site-level outdoor baseline): |
| Air1 — outdoor climate station (above canopy, no panel) |
| |
| Credentials (env vars or .env): |
| THINGSBOARD_HOST — default https://web.seymouragri.com |
| THINGSBOARD_USERNAME — tenant login email (TB_USERNAME is accepted as a fallback) |
| THINGSBOARD_PASSWORD — tenant login password (TB_PASSWORD is accepted as a fallback) |
| THINGSBOARD_TOKEN — pre-generated JWT (takes priority over user/pass) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import os |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from enum import Enum |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import pandas as pd |
| import requests |
|
|
|
|
| |
| |
| |
|
|
class VineArea(str, Enum):
    """Zone of the vineyard a sensor belongs to.

    The ``str`` mixin makes every member compare equal to its lowercase
    value, so members can be used wherever the plain string is expected.
    """

    # Rows under the solar panels.
    TREATMENT = "treatment"
    # Open-sky rows without panels.
    REFERENCE = "reference"
    # Site-level outdoor baseline.
    AMBIENT = "ambient"
|
|
|
|
| |
| |
| |
|
|
@dataclass(frozen=True)
class DeviceInfo:
    """Immutable registry record describing one ThingsBoard device."""

    # ThingsBoard entity id; interpolated into the REST URLs built by the client.
    uuid: str
    # Numeric identifier carried in the registry; not read elsewhere in this module.
    device_id: int
    # Vineyard zone the device is assigned to.
    area: VineArea
    # Vine row number, or None when the registry gives no row for the device.
    row: Optional[int]
    # Human-readable description.
    label: str
|
|
|
|
| |
| |
# One row per device: (name, uuid, device_id, area, row, label).
# Row order is preserved in DEVICE_REGISTRY.
_DEVICE_TABLE = (
    # Air / microclimate stations
    ("Air1", "373041f0-089a-11ef-9126-b746c27d34bd", 4,
     VineArea.AMBIENT, None, "Outdoor Climate (ambient baseline)"),
    ("Air2", "37bf89a0-089a-11ef-9126-b746c27d34bd", 5,
     VineArea.TREATMENT, 501, "Indoor Climate Row 501 (under panels)"),
    ("Air3", "3860aba0-089a-11ef-9126-b746c27d34bd", 6,
     VineArea.TREATMENT, 502, "Indoor Climate Row 502 (under panels)"),
    ("Air4", "04452660-7114-11ef-9360-f1ed9d9dc643", 7,
     VineArea.TREATMENT, 502, "Treatment Row 502 North (under panels)"),
    # Crop sensors
    ("Crop1", "39224df0-089a-11ef-9126-b746c27d34bd", 8,
     VineArea.REFERENCE, 503, "Reference crop Row 503"),
    ("Crop2", "aa0d9970-7113-11ef-9360-f1ed9d9dc643", 9,
     VineArea.REFERENCE, 503, "Control crop Row 503"),
    ("Crop3", "859b3ce0-29dd-11f0-96bc-55874793181d", 10,
     VineArea.TREATMENT, 502, "Treatment 502 – West Bottom"),
    ("Crop4", "889765e0-29dd-11f0-96bc-55874793181d", 11,
     VineArea.REFERENCE, 502, "Control crop Row 502 (reference vine)"),
    ("Crop5", "8b092930-29dd-11f0-96bc-55874793181d", 12,
     VineArea.TREATMENT, 502, "Treatment 502 – East Upper"),
    ("Crop6", "8cce31c0-29dd-11f0-96bc-55874793181d", 13,
     VineArea.TREATMENT, 502, "Treatment 502 – East Bottom"),
    ("Crop7", "8e7440a0-29dd-11f0-96bc-55874793181d", 14,
     VineArea.TREATMENT, 502, "Treatment 502 – West Upper"),
    # Soil probes
    ("Soil1", "3586b0a0-089a-11ef-9126-b746c27d34bd", 16,
     VineArea.TREATMENT, 502, "Soil Row 502 (treatment)"),
    ("Soil2", "35cda4b0-089a-11ef-9126-b746c27d34bd", 17,
     VineArea.REFERENCE, 503, "Soil Row 503 (reference)"),
    ("Soil3", "3634caf0-089a-11ef-9126-b746c27d34bd", 18,
     VineArea.TREATMENT, 501, "Soil Row 501 (treatment)"),
    ("Soil4", "36a4cad0-089a-11ef-9126-b746c27d34bd", 19,
     VineArea.REFERENCE, 504, "Soil Row 504 Control"),
    ("Soil5", "77d55280-70e7-11ef-9360-f1ed9d9dc643", 20,
     VineArea.TREATMENT, 502, "Treatment Row 502 South"),
    ("Soil6", "7e4e4630-70e7-11ef-9360-f1ed9d9dc643", 21,
     VineArea.TREATMENT, 502, "Treatment Row 502 North"),
    ("Soil7", "842e5540-70e7-11ef-9360-f1ed9d9dc643", 22,
     VineArea.REFERENCE, 504, "Control 504 South"),
    ("Soil9", "91e44ff0-70e7-11ef-9360-f1ed9d9dc643", 23,
     VineArea.REFERENCE, 504, "Control 504 South (2nd probe)"),
    # Irrigation logger and thermocouple loggers
    ("Irrigation1", "3a066c60-089a-11ef-9126-b746c27d34bd", 15,
     VineArea.TREATMENT, 502, "Irrigation Row 502"),
    ("Thermocouples1", "72ce88f0-c548-11ef-8bc2-fdab9f3349b7", 2,
     VineArea.TREATMENT, 502, "Panel surface temps Treatment 502"),
    ("Thermocouples2", "03e40ba0-cc0e-11ef-a2e9-55874793181d", 3,
     VineArea.REFERENCE, None, "Panel/structure surface temps Reference"),
    # Trackers
    # NOTE(review): all four trackers carry device_id 0 and Tracker503 is filed
    # under TREATMENT although row 503 is otherwise a reference row — confirm.
    ("Tracker501", "aac06e50-f769-11f0-b902-5ff1ea8c4cf9", 0,
     VineArea.TREATMENT, 501, "Tracker row 501"),
    ("Tracker502", "b99bd630-f769-11f0-b902-5ff1ea8c4cf9", 0,
     VineArea.TREATMENT, 502, "Tracker row 502"),
    ("Tracker503", "caffe4c0-f769-11f0-b902-5ff1ea8c4cf9", 0,
     VineArea.TREATMENT, 503, "Tracker row 503"),
    ("Tracker509", "bacf7c50-fcdc-11f0-b902-5ff1ea8c4cf9", 0,
     VineArea.TREATMENT, 509, "Tracker row 509"),
)

# Lookup of every known device by the short name used throughout this module.
DEVICE_REGISTRY: Dict[str, DeviceInfo] = {
    name: DeviceInfo(uuid=uuid, device_id=device_id, area=area, row=row, label=label)
    for name, uuid, device_id, area, row, label in _DEVICE_TABLE
}
|
|
| |
| |
| |
|
|
@dataclass(frozen=True)
class AssetInfo:
    """Immutable registry record describing one ThingsBoard asset."""

    # ThingsBoard entity id; interpolated into the REST URLs built by the client.
    uuid: str
    # Human-readable description.
    label: str
|
|
# Known ThingsBoard assets, keyed by the short name accepted by the
# get_asset_* client methods.
# NOTE(review): the label names "Yeruham Vineyard" while the module header
# refers to the Seymour vineyard — confirm which is intended.
ASSET_REGISTRY: Dict[str, AssetInfo] = dict(
    Plant=AssetInfo(
        "dc94ddb0-dbe6-11f0-9352-a53ca0b6a212",
        "Yeruham Vineyard — plant-level energy",
    ),
)
|
|
# Telemetry key names requested from ThingsBoard, grouped by entity kind.
# Order matters only in that it is the order keys are sent and iterated.

ENERGY_KEYS: List[str] = [
    "power",
    "production",
]

TRACKER_KEYS: List[str] = [
    "angle",
    "manualMode",
    "setAngle",
    "setMode",
]

AIR_KEYS: List[str] = [
    "airTemperature",
    "leafTemperature",
    "VPD",
    "CO2",
    "PAR",
    "DLI",
    "airHumidity",
    "windSpeed",
    "windAngle",
    "rain",
    "airPressure",
    "dewTemperature",
    "NDVI",
    "PRI",
    "airLeafDeltaT",
]

CROP_KEYS: List[str] = [
    "PAR",
    "leafTemperature",
    "NDVI",
    "PRI",
    "DLI",
    "PARAvg1H",
    "PARAvg24H",
]

SOIL_KEYS: List[str] = [
    "soilMoisture",
    "soilMoisture2",
    "soilTemperature",
    "soilTemperature2",
    "soilBulkEC",
    "soilpH",
]

IRRIGATION_KEYS: List[str] = [
    "irrigationVolume",
    "irrigationMinutes",
    "irrigationFlowRate",
    "irrigationEC",
    "irrigationPH",
    "waterTemperature",
    "irrigationCycleVolume",
    "irrigationCycleMinutes",
]

THERMOCOUPLE_KEYS: List[str] = [
    "thermocoupleTemperature_1",
    "thermocoupleTemperature_2",
    "thermocoupleTemperature_3",
    "thermocoupleTemperature_4",
]
|
|
|
|
| |
| |
| |
|
|
@dataclass
class VineSnapshot:
    """
    Aggregated real-time vine state from all ThingsBoard sensors.

    Fields are grouped by area:
    - ambient   : Air1 (outdoor climate, site-level baseline)
    - treatment : under solar panels (rows 501–502)
    - reference : open sky / no panels (rows 503–504)

    None means the sensor did not return a value.

    ``staleness_minutes`` is NaN when no reading carried a timestamp; both
    ``to_advisor_text`` and ``to_dict`` report that case as "age unknown"
    instead of leaking ``nan`` into prompts or JSON.
    """

    # When the snapshot was assembled, and the age of the newest reading in it.
    snapshot_ts: datetime
    staleness_minutes: float

    # Ambient baseline (Air1).
    ambient_temp_c: Optional[float] = None
    ambient_humidity_pct: Optional[float] = None
    ambient_wind_speed_ms: Optional[float] = None
    ambient_wind_angle_deg: Optional[float] = None
    ambient_rain_mm: Optional[float] = None

    # Treatment-area air sensors.
    treatment_air_temp_c: Optional[float] = None
    treatment_leaf_temp_c: Optional[float] = None
    treatment_vpd_kpa: Optional[float] = None
    treatment_co2_ppm: Optional[float] = None
    treatment_par_umol: Optional[float] = None
    treatment_dli_mol_m2: Optional[float] = None
    treatment_ndvi: Optional[float] = None
    treatment_pri: Optional[float] = None
    treatment_air_leaf_delta_t: Optional[float] = None

    # Treatment-area crop sensors: area averages plus a per-position breakdown.
    treatment_crop_par_umol: Optional[float] = None
    treatment_crop_leaf_temp_c: Optional[float] = None
    treatment_crop_ndvi: Optional[float] = None
    treatment_crop_dli_mol_m2: Optional[float] = None
    treatment_crop_par_avg1h: Optional[float] = None
    treatment_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)

    # Reference-area crop sensors.
    reference_crop_par_umol: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_crop_ndvi: Optional[float] = None
    reference_crop_dli_mol_m2: Optional[float] = None
    reference_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)

    # Treatment PAR divided by reference PAR.
    par_shading_ratio: Optional[float] = None

    # Treatment-area soil probes.
    treatment_soil_moisture_pct: Optional[float] = None
    treatment_soil_temp_c: Optional[float] = None
    treatment_soil_ec_ds_m: Optional[float] = None
    treatment_soil_ph: Optional[float] = None

    # Reference-area soil probes.
    reference_soil_moisture_pct: Optional[float] = None
    reference_soil_temp_c: Optional[float] = None

    # Irrigation logger.
    irrigation_last_volume_l: Optional[float] = None
    irrigation_last_minutes: Optional[float] = None
    irrigation_ec: Optional[float] = None
    irrigation_ph: Optional[float] = None
    water_temp_c: Optional[float] = None

    # Thermocouple loggers.
    treatment_panel_temp_c: Optional[float] = None
    reference_panel_temp_c: Optional[float] = None

    def to_advisor_text(self) -> str:
        """Format snapshot for inclusion in an AI advisory prompt.

        Returns a newline-joined block of text. Lines for values that are
        None are omitted; the TREATMENT and REFERENCE headings are always
        present. A missing or non-finite ``staleness_minutes`` is rendered
        as "data age unknown".
        """
        staleness = self.staleness_minutes
        if staleness is None or not math.isfinite(staleness):
            # NaN/inf would otherwise be rendered as "~>nan min ago".
            header = "VINE STATE (ThingsBoard sensors, data age unknown):"
        else:
            age = f"{staleness:.0f}" if staleness < 120 else f">{staleness:.0f}"
            header = f"VINE STATE (ThingsBoard sensors, ~{age} min ago):"
        lines = [header]

        def add(value: Optional[float], template: str) -> None:
            # Append one formatted line, skipping values that are None.
            if value is not None:
                lines.append(template.format(value))

        lines.append(" TREATMENT area (rows 501-502, under solar panels):")
        add(self.treatment_air_temp_c, " Air temperature: {:.1f} C")
        add(self.treatment_leaf_temp_c, " Leaf temperature: {:.1f} C")
        add(self.treatment_air_leaf_delta_t, " Air-leaf delta-T: {:+.1f} C (proxy for heat stress)")
        add(self.treatment_vpd_kpa, " VPD: {:.2f} kPa")
        add(self.treatment_co2_ppm, " CO2: {:.0f} ppm")
        add(self.treatment_crop_par_umol, " Fruiting-zone PAR: {:.0f} umol/m2/s (avg of Crop3/5/6/7)")
        add(self.treatment_crop_dli_mol_m2, " DLI today so far: {:.1f} mol/m2/day")
        add(self.treatment_crop_ndvi, " Canopy NDVI: {:.3f}")
        add(self.treatment_soil_moisture_pct, " Soil moisture: {:.1f}% (avg Soil1/3/5/6)")
        add(self.treatment_soil_temp_c, " Soil temperature: {:.1f} C")
        add(self.treatment_panel_temp_c, " Panel surface temp: {:.1f} C")

        if self.treatment_crop_by_position:
            lines.append(" Per-position PAR (Crop sensors):")
            for pos, vals in self.treatment_crop_by_position.items():
                par = vals.get("par")
                lt = vals.get("leaf_temp")
                par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A"
                lt_str = f" | leaf {lt:.1f} C" if lt is not None else ""
                lines.append(f" {pos}: PAR {par_str}{lt_str}")

        lines.append("")
        lines.append(" REFERENCE area (rows 503-504, open sky, no panels):")
        add(self.reference_crop_par_umol, " Fruiting-zone PAR: {:.0f} umol/m2/s (avg of Crop1/2/4)")
        add(self.reference_crop_leaf_temp_c, " Leaf temperature: {:.1f} C")
        add(self.reference_crop_ndvi, " Canopy NDVI: {:.3f}")
        add(self.reference_soil_moisture_pct, " Soil moisture: {:.1f}% (avg Soil2/4/7/9)")
        if self.reference_crop_by_position:
            lines.append(" Per-position PAR (Crop sensors):")
            for pos, vals in self.reference_crop_by_position.items():
                par = vals.get("par")
                par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A"
                lines.append(f" {pos}: PAR {par_str}")

        if self.par_shading_ratio is not None:
            reduction_pct = (1 - self.par_shading_ratio) * 100
            lines.append("")
            lines.append(
                f" PAR shading ratio (treatment/reference): {self.par_shading_ratio:.2f}"
                f" ({reduction_pct:.0f}% reduction by panels)"
            )

        # The ambient section is keyed on the temperature reading being present.
        if self.ambient_temp_c is not None:
            lines.append("")
            lines.append(" AMBIENT (outdoor baseline, Air1):")
            lines.append(f" Air temperature: {self.ambient_temp_c:.1f} C")
            add(self.ambient_wind_speed_ms, " Wind speed: {:.1f} m/s")
            # Rain is only mentioned when some was actually recorded.
            if self.ambient_rain_mm is not None and self.ambient_rain_mm > 0:
                lines.append(f" Rain: {self.ambient_rain_mm:.1f} mm")

        irrigation_core = (
            self.irrigation_last_volume_l,
            self.irrigation_last_minutes,
            self.irrigation_ec,
            self.irrigation_ph,
        )
        if any(v is not None for v in irrigation_core):
            lines.append("")
            lines.append(" IRRIGATION (Irrigation1, row 502):")
            add(self.irrigation_last_volume_l, " Last cycle volume: {:.0f} L")
            add(self.irrigation_last_minutes, " Duration: {:.0f} min")
            add(self.irrigation_ec, " EC: {:.2f} dS/m")
            add(self.irrigation_ph, " pH: {:.1f}")
            add(self.water_temp_c, " Water temperature: {:.1f} C")

        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """Return a flat dict suitable for JSON serialization (e.g., chatbot tool result).

        Scalar readings are rounded to 3 decimals and ``staleness_minutes``
        to 1. Missing or non-finite numbers (NaN/inf) become None so the
        result can be encoded as strict JSON. The per-position mappings are
        copied, so mutating the returned dict does not alter the snapshot.
        """

        def clean(value: Optional[float], digits: int) -> Optional[float]:
            # NaN and infinity have no JSON representation; report them as missing.
            if value is None or not math.isfinite(value):
                return None
            return round(value, digits)

        out: Dict[str, Any] = {
            "snapshot_ts": self.snapshot_ts.isoformat(),
            "staleness_minutes": clean(self.staleness_minutes, 1),
        }
        for attr in (
            "ambient_temp_c", "ambient_humidity_pct", "ambient_wind_speed_ms",
            "ambient_wind_angle_deg", "ambient_rain_mm",
            "treatment_air_temp_c", "treatment_leaf_temp_c", "treatment_vpd_kpa",
            "treatment_co2_ppm", "treatment_par_umol", "treatment_dli_mol_m2",
            "treatment_ndvi", "treatment_pri", "treatment_air_leaf_delta_t",
            "treatment_crop_par_umol", "treatment_crop_leaf_temp_c",
            "treatment_crop_ndvi", "treatment_crop_dli_mol_m2", "treatment_crop_par_avg1h",
            "reference_crop_par_umol", "reference_crop_leaf_temp_c",
            "reference_crop_ndvi", "reference_crop_dli_mol_m2",
            "par_shading_ratio",
            "treatment_soil_moisture_pct", "treatment_soil_temp_c",
            "treatment_soil_ec_ds_m", "treatment_soil_ph",
            "reference_soil_moisture_pct", "reference_soil_temp_c",
            "irrigation_last_volume_l", "irrigation_last_minutes",
            "irrigation_ec", "irrigation_ph", "water_temp_c",
            "treatment_panel_temp_c", "reference_panel_temp_c",
        ):
            out[attr] = clean(getattr(self, attr), 3)
        out["treatment_crop_by_position"] = {
            pos: dict(vals) for pos, vals in self.treatment_crop_by_position.items()
        }
        out["reference_crop_by_position"] = {
            pos: dict(vals) for pos, vals in self.reference_crop_by_position.items()
        }
        return out
|
|
|
|
| |
| |
| |
|
|
@dataclass
class ThingsBoardConfig:
    """ThingsBoard connection settings, defaulting to environment variables.

    Every default is resolved when an instance is created (not when the
    module is imported), so variables exported or loaded from a ``.env``
    file after import are still picked up. Explicit constructor arguments
    always win over the environment.

    Attributes:
        host: Base URL of the ThingsBoard server. Falls back to the Seymour
            production host when ``THINGSBOARD_HOST`` is unset or empty.
        username: Login name from ``THINGSBOARD_USERNAME`` or ``TB_USERNAME``.
        password: Login password from ``THINGSBOARD_PASSWORD`` or ``TB_PASSWORD``.
        token: Pre-generated JWT from ``THINGSBOARD_TOKEN``.
    """

    host: str = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_HOST") or "https://web.seymouragri.com/"
        )
    )
    username: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_USERNAME") or os.environ.get("TB_USERNAME")
        )
    )
    # Secrets are kept out of the generated repr so they do not end up in logs.
    password: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_PASSWORD") or os.environ.get("TB_PASSWORD")
        ),
        repr=False,
    )
    token: Optional[str] = field(
        default_factory=lambda: os.environ.get("THINGSBOARD_TOKEN"),
        repr=False,
    )
|
|
|
|
| |
| |
| |
|
|
class ThingsBoardClient:
    """
    Minimal ThingsBoard client for the Seymour vineyard.

    Authentication
    --------------
    Provide THINGSBOARD_TOKEN for a pre-generated JWT, or
    THINGSBOARD_USERNAME + THINGSBOARD_PASSWORD for login-based auth.
    A login-issued token is cached and replaced by a fresh login once
    ``_TOKEN_TTL_SECONDS`` have elapsed.

    Usage
    -----
        client = ThingsBoardClient()
        snapshot = client.get_vine_snapshot()
        print(snapshot.to_advisor_text())
    """

    # Seconds a login-issued JWT is reused before logging in again.
    # NOTE(review): presumably chosen to sit below the server-side token lifetime — confirm.
    _TOKEN_TTL_SECONDS = 8_000

    # Snapshot fan-out: worker threads and overall wait for device responses.
    _SNAPSHOT_MAX_WORKERS = 8
    _SNAPSHOT_TIMEOUT_SECONDS = 25

    # Crop device -> position label used in the per-position breakdowns.
    _TREATMENT_CROP_POSITIONS: Dict[str, str] = {
        "Crop3": "502-west-bottom",
        "Crop5": "502-east-upper",
        "Crop6": "502-east-bottom",
        "Crop7": "502-west-upper",
    }
    _REFERENCE_CROP_POSITIONS: Dict[str, str] = {
        "Crop1": "503-ref",
        "Crop2": "503-control",
        "Crop4": "502-control",
    }

    def __init__(self, config: Optional[ThingsBoardConfig] = None) -> None:
        """Create a client with its own HTTP session; no request is sent here.

        Args:
            config: Connection settings; a default ``ThingsBoardConfig()`` is
                built when omitted.
        """
        self.config = config or ThingsBoardConfig()
        self._session = requests.Session()
        self._session.headers.update({"Content-Type": "application/json"})
        self._jwt: Optional[str] = None
        # time.monotonic() value after which self._jwt is considered expired.
        self._jwt_expires_at: float = 0.0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _api_url(self, path: str) -> str:
        """Join *path* (which starts with ``/``) onto the configured host."""
        return f"{self.config.host.rstrip('/')}{path}"

    @staticmethod
    def _lookup_device(device_name: str) -> DeviceInfo:
        """Return the registry entry for *device_name* or raise KeyError listing valid names."""
        try:
            return DEVICE_REGISTRY[device_name]
        except KeyError:
            raise KeyError(
                f"Unknown device: {device_name!r}. "
                f"Valid names: {sorted(DEVICE_REGISTRY)}"
            ) from None

    @staticmethod
    def _lookup_asset(asset_name: str) -> AssetInfo:
        """Return the registry entry for *asset_name* or raise KeyError listing valid names."""
        try:
            return ASSET_REGISTRY[asset_name]
        except KeyError:
            raise KeyError(
                f"Unknown asset: {asset_name!r}. Valid: {sorted(ASSET_REGISTRY)}"
            ) from None

    @staticmethod
    def _first_present(*candidates: Optional[float]) -> Optional[float]:
        """Return the first candidate that is not None (0.0 counts as present)."""
        return next((c for c in candidates if c is not None), None)

    def _ensure_jwt(self) -> str:
        """Return a valid JWT, obtaining or refreshing as needed.

        A configured static token is used as-is. Otherwise the cached login
        token is returned while it is within its TTL, and a new login is
        performed once it is not. The session's ``X-Authorization`` header is
        kept in sync with the returned token.

        Raises:
            RuntimeError: if neither a token nor username+password are configured.
            requests.HTTPError: if the login request is rejected.
        """
        static_token = self.config.token
        if static_token:
            if "X-Authorization" not in self._session.headers:
                self._session.headers["X-Authorization"] = f"Bearer {static_token}"
            return static_token

        if self._jwt and time.monotonic() < self._jwt_expires_at:
            return self._jwt

        if not self.config.username or not self.config.password:
            raise RuntimeError(
                "ThingsBoard authentication requires THINGSBOARD_TOKEN "
                "or both THINGSBOARD_USERNAME and THINGSBOARD_PASSWORD."
            )

        resp = self._session.post(
            self._api_url("/api/auth/login"),
            json={"username": self.config.username, "password": self.config.password},
            timeout=10,
        )
        resp.raise_for_status()
        token = resp.json()["token"]
        self._jwt = token
        self._jwt_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS
        self._session.headers["X-Authorization"] = f"Bearer {token}"
        return token

    def _fetch_latest_raw(
        self,
        entity_type: str,
        uuid: str,
        keys: List[str],
    ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
        """Fetch most-recent telemetry for any entity type (DEVICE or ASSET).

        Returns:
            ``(values, newest_ts)`` where *values* has one entry per requested
            key (None when the server returned nothing usable for it) and
            *newest_ts* is the most recent reading timestamp in UTC, or None
            when no reading carried one.
        """
        self._ensure_jwt()
        url = self._api_url(
            f"/api/plugins/telemetry/{entity_type}/{uuid}/values/timeseries"
        )
        resp = self._session.get(url, params={"keys": ",".join(keys)}, timeout=15)
        resp.raise_for_status()
        raw: Dict[str, List[Dict]] = resp.json()

        values: Dict[str, Optional[float]] = {}
        newest_ts_ms: Optional[int] = None
        for key in keys:
            entries = raw.get(key, [])
            if not entries:
                values[key] = None
                continue
            # Only the first entry returned for a key is used.
            latest = entries[0]
            values[key] = _safe_float(latest.get("value"))
            ts_ms = latest.get("ts")
            if ts_ms and (newest_ts_ms is None or ts_ms > newest_ts_ms):
                newest_ts_ms = ts_ms

        newest_ts = (
            datetime.fromtimestamp(newest_ts_ms / 1000, tz=timezone.utc)
            if newest_ts_ms else None
        )
        return values, newest_ts

    def _fetch_timeseries_raw(
        self,
        entity_type: str,
        uuid: str,
        keys: List[str],
        start: datetime,
        end: datetime,
        limit: int = 1000,
        interval_ms: int = 900_000,
        agg: str = "NONE",
    ) -> pd.DataFrame:
        """Fetch time-series telemetry for any entity type (DEVICE or ASSET).

        ``interval_ms`` is only sent when *agg* is not ``"NONE"``.

        Returns:
            A DataFrame indexed by UTC timestamp (sorted ascending) with one
            column per requested key that had data, or an empty DataFrame
            when nothing came back.
        """
        self._ensure_jwt()
        url = self._api_url(
            f"/api/plugins/telemetry/{entity_type}/{uuid}/values/timeseries"
        )
        params: Dict[str, Any] = {
            "keys": ",".join(keys),
            "startTs": int(start.timestamp() * 1000),
            "endTs": int(end.timestamp() * 1000),
            "limit": limit,
            "agg": agg,
        }
        if agg != "NONE":
            params["interval"] = interval_ms

        resp = self._session.get(url, params=params, timeout=30)
        resp.raise_for_status()
        raw: Dict[str, List[Dict]] = resp.json()

        frames: Dict[str, pd.Series] = {}
        for key, entries in raw.items():
            # Ignore keys that were not asked for and keys without readings.
            if key not in keys or not entries:
                continue
            index = pd.to_datetime([e["ts"] for e in entries], unit="ms", utc=True)
            frames[key] = pd.Series([_safe_float(e["value"]) for e in entries], index=index)

        if not frames:
            return pd.DataFrame()
        return pd.DataFrame(frames).sort_index()

    # ------------------------------------------------------------------
    # Device telemetry
    # ------------------------------------------------------------------

    def _fetch_latest(
        self,
        device_name: str,
        keys: List[str],
    ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
        """Fetch most-recent values for a named device.

        Raises:
            KeyError: if *device_name* is not in DEVICE_REGISTRY.
        """
        info = self._lookup_device(device_name)
        return self._fetch_latest_raw("DEVICE", info.uuid, keys)

    def get_latest_telemetry(
        self,
        device_name: str,
        keys: List[str],
    ) -> Dict[str, Optional[float]]:
        """Return the most recent value for each key. Missing keys return None.

        Raises:
            KeyError: if *device_name* is not in DEVICE_REGISTRY.
        """
        values, _ = self._fetch_latest(device_name, keys)
        return values

    def get_timeseries(
        self,
        device_name: str,
        keys: List[str],
        start: datetime,
        end: datetime,
        limit: int = 1000,
        interval_ms: int = 900_000,
        agg: str = "NONE",
    ) -> pd.DataFrame:
        """Fetch time-series telemetry for a named device.

        Raises:
            KeyError: if *device_name* is not in DEVICE_REGISTRY.
        """
        info = self._lookup_device(device_name)
        return self._fetch_timeseries_raw(
            "DEVICE", info.uuid, keys, start, end, limit, interval_ms, agg,
        )

    # ------------------------------------------------------------------
    # Asset telemetry
    # ------------------------------------------------------------------

    def get_asset_timeseries(
        self,
        asset_name: str,
        keys: List[str],
        start: datetime,
        end: datetime,
        limit: int = 1000,
        interval_ms: int = 3_600_000,
        agg: str = "SUM",
    ) -> pd.DataFrame:
        """Fetch time-series from a ThingsBoard ASSET (e.g. Plant energy).

        Raises:
            KeyError: if *asset_name* is not in ASSET_REGISTRY.
        """
        info = self._lookup_asset(asset_name)
        return self._fetch_timeseries_raw(
            "ASSET", info.uuid, keys, start, end, limit, interval_ms, agg,
        )

    def get_asset_latest(
        self,
        asset_name: str,
        keys: List[str],
    ) -> Dict[str, Optional[float]]:
        """Fetch latest telemetry from a ThingsBoard ASSET.

        Raises:
            KeyError: if *asset_name* is not in ASSET_REGISTRY.
        """
        info = self._lookup_asset(asset_name)
        values, _ = self._fetch_latest_raw("ASSET", info.uuid, keys)
        return values

    # ------------------------------------------------------------------
    # Device control
    # ------------------------------------------------------------------

    def send_rpc_command(
        self,
        device_name: str,
        method: str,
        params: Any = None,
        timeout: float = 10.0,
    ) -> Dict[str, Any]:
        """Send a two-way RPC command to a device.

        Uses POST /api/plugins/rpc/twoway/{deviceId}.
        Falls back to one-way if two-way returns 404 or 405.

        Args:
            device_name: Key in DEVICE_REGISTRY.
            method: RPC method name placed in the request payload.
            params: RPC parameters; an empty object is sent when None.
            timeout: HTTP timeout in seconds for each request.

        Returns:
            The decoded JSON response, or ``{"status": "ok", "status_code": ...}``
            when the response body is not JSON.

        Raises:
            KeyError: if *device_name* is not in DEVICE_REGISTRY.
            requests.HTTPError: if the final request returns an error status.
        """
        info = self._lookup_device(device_name)
        self._ensure_jwt()

        payload = {"method": method, "params": params if params is not None else {}}

        resp = self._session.post(
            self._api_url(f"/api/plugins/rpc/twoway/{info.uuid}"),
            json=payload,
            timeout=timeout,
        )
        if resp.status_code in (404, 405):
            # Two-way endpoint unavailable: retry the same payload one-way.
            resp = self._session.post(
                self._api_url(f"/api/plugins/rpc/oneway/{info.uuid}"),
                json=payload,
                timeout=timeout,
            )
        resp.raise_for_status()
        try:
            return resp.json()
        except ValueError:
            # Body was empty or not JSON; JSON decode errors subclass ValueError.
            return {"status": "ok", "status_code": resp.status_code}

    def set_device_attributes(
        self,
        device_name: str,
        attributes: Dict[str, Any],
        scope: str = "SHARED_SCOPE",
    ) -> None:
        """Write attributes to a device in the given scope (default ``SHARED_SCOPE``).

        Uses POST /api/plugins/telemetry/DEVICE/{id}/attributes/{scope}.
        This is an alternative to RPC for setting tracker targets.

        Raises:
            KeyError: if *device_name* is not in DEVICE_REGISTRY.
            requests.HTTPError: if the server returns an error status.
        """
        info = self._lookup_device(device_name)
        self._ensure_jwt()

        url = self._api_url(
            f"/api/plugins/telemetry/DEVICE/{info.uuid}/attributes/{scope}"
        )
        resp = self._session.post(url, json=attributes, timeout=10)
        resp.raise_for_status()

    # ------------------------------------------------------------------
    # Vine snapshot
    # ------------------------------------------------------------------

    # Device -> telemetry keys requested for each snapshot mode.
    _DASHBOARD_FETCH_PLAN: Dict[str, List[str]] = {
        "Air1": AIR_KEYS,
        "Air2": AIR_KEYS,
        "Soil1": SOIL_KEYS,
        "Irrigation1": IRRIGATION_KEYS,
    }

    _LIGHT_FETCH_PLAN: Dict[str, List[str]] = {
        "Air1": AIR_KEYS,
        "Air2": AIR_KEYS,
        "Crop1": CROP_KEYS,
        "Crop3": CROP_KEYS,
        "Soil1": SOIL_KEYS,
        "Irrigation1": IRRIGATION_KEYS,
    }

    _FULL_FETCH_PLAN: Dict[str, List[str]] = {
        "Air1": AIR_KEYS,
        "Air2": AIR_KEYS,
        "Air3": AIR_KEYS,
        "Air4": AIR_KEYS,
        "Crop1": CROP_KEYS,
        "Crop2": CROP_KEYS,
        "Crop3": CROP_KEYS,
        "Crop4": CROP_KEYS,
        "Crop5": CROP_KEYS,
        "Crop6": CROP_KEYS,
        "Crop7": CROP_KEYS,
        "Soil1": SOIL_KEYS,
        "Soil2": SOIL_KEYS,
        "Soil3": SOIL_KEYS,
        "Soil4": SOIL_KEYS,
        "Soil5": SOIL_KEYS,
        "Soil6": SOIL_KEYS,
        "Soil7": SOIL_KEYS,
        "Soil9": SOIL_KEYS,
        "Irrigation1": IRRIGATION_KEYS,
        "Thermocouples1": THERMOCOUPLE_KEYS,
        "Thermocouples2": THERMOCOUPLE_KEYS,
    }

    def _collect_latest(
        self,
        fetch_plan: Dict[str, List[str]],
    ) -> Tuple[Dict[str, Dict[str, Optional[float]]], Optional[datetime]]:
        """Fetch every device in *fetch_plan* concurrently, best-effort.

        A device whose request fails, or that has not answered when the
        overall timeout expires, is recorded as an empty dict rather than
        aborting the whole collection.

        Returns:
            ``(results_by_device, newest_timestamp_or_None)``.
        """
        # Imported locally: before Python 3.11 this is not the builtin TimeoutError.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        raw_results: Dict[str, Dict[str, Optional[float]]] = {}
        newest_ts: Optional[datetime] = None

        with ThreadPoolExecutor(max_workers=self._SNAPSHOT_MAX_WORKERS) as pool:
            future_map = {
                pool.submit(self._fetch_latest, name, keys): name
                for name, keys in fetch_plan.items()
            }
            try:
                for future in as_completed(future_map, timeout=self._SNAPSHOT_TIMEOUT_SECONDS):
                    name = future_map[future]
                    try:
                        values, ts = future.result()
                    except Exception:
                        # Deliberate best-effort: one failing device must not sink the snapshot.
                        raw_results[name] = {}
                        continue
                    raw_results[name] = values
                    if ts and (newest_ts is None or ts > newest_ts):
                        newest_ts = ts
            except FuturesTimeoutError:
                # Treat stragglers like failed devices; cancel() stops those not yet
                # started so leaving the pool does not wait on queued requests.
                for future, name in future_map.items():
                    if name not in raw_results:
                        future.cancel()
                        raw_results[name] = {}

        return raw_results, newest_ts

    @staticmethod
    def _position_summary(
        raw_results: Dict[str, Dict[str, Optional[float]]],
        positions: Dict[str, str],
    ) -> Dict[str, Dict[str, Optional[float]]]:
        """Build the per-position crop breakdown (par / leaf_temp / ndvi / dli)."""
        summary: Dict[str, Dict[str, Optional[float]]] = {}
        for device, label in positions.items():
            vals = raw_results.get(device, {})
            summary[label] = {
                "par": vals.get("PAR"),
                "leaf_temp": vals.get("leafTemperature"),
                "ndvi": vals.get("NDVI"),
                "dli": vals.get("DLI"),
            }
        return summary

    def get_vine_snapshot(self, light: bool = False,
                          mode: Optional[str] = None) -> VineSnapshot:
        """
        Fetch latest telemetry from all relevant devices and return an
        aggregated VineSnapshot distinguishing treatment vs reference areas.

        Uses a thread pool to parallelise HTTP requests.
        Individual device failures and devices that do not answer within the
        overall timeout are silently skipped (their fields come back None).

        Parameters
        ----------
        light : bool
            If True, fetch only the 6 devices of the light plan instead of
            the 22 devices of the full plan.
        mode : str, optional
            "dashboard" = 4 devices only (air + soil + irrigation).
            Overrides `light` when set. Any other value is ignored.

        Returns
        -------
        VineSnapshot
            ``staleness_minutes`` is NaN when no device returned a timestamp.

        Raises
        ------
        RuntimeError, requests.HTTPError
            If authentication itself fails before any device is queried.
        """
        if mode == "dashboard":
            fetch_plan = self._DASHBOARD_FETCH_PLAN
        elif light:
            fetch_plan = self._LIGHT_FETCH_PLAN
        else:
            fetch_plan = self._FULL_FETCH_PLAN

        # Authenticate once up front so the worker threads find a cached token.
        self._ensure_jwt()

        raw_results, newest_ts_overall = self._collect_latest(fetch_plan)

        now = datetime.now(tz=timezone.utc)
        staleness = (
            (now - newest_ts_overall).total_seconds() / 60
            if newest_ts_overall else float("nan")
        )

        def readings(*device_names: str) -> List[Dict[str, Optional[float]]]:
            # Devices that were not fetched (or failed) contribute an empty dict.
            return [raw_results.get(name, {}) for name in device_names]

        def mean(bound: str, devices: List[Dict[str, Optional[float]]],
                 *keys: str) -> Optional[float]:
            # Average *keys* across *devices*, dropping values outside _BOUNDS[bound].
            lo, hi = _BOUNDS[bound]
            return _bounded_avg(lo, hi, *[d.get(k) for d in devices for k in keys])

        (air1,) = readings("Air1")
        treatment_air = readings("Air2", "Air3", "Air4")
        treatment_crops = readings(*self._TREATMENT_CROP_POSITIONS)
        reference_crops = readings(*self._REFERENCE_CROP_POSITIONS)
        treatment_soil = readings("Soil1", "Soil3", "Soil5", "Soil6")
        reference_soil = readings("Soil2", "Soil4", "Soil7", "Soil9")
        tc1, tc2, irr = readings("Thermocouples1", "Thermocouples2", "Irrigation1")

        t_par = mean("par", treatment_crops, "PAR")
        r_par = mean("par", reference_crops, "PAR")
        par_ratio: Optional[float] = None
        if t_par is not None and r_par is not None and r_par > 0:
            par_ratio = t_par / r_par

        return VineSnapshot(
            snapshot_ts=now,
            staleness_minutes=staleness,

            # Ambient baseline (Air1).
            ambient_temp_c=mean("air_temp", [air1], "airTemperature"),
            ambient_humidity_pct=_bounded_avg(0, 100, air1.get("airHumidity")),
            ambient_wind_speed_ms=_bounded_avg(0, 60, air1.get("windSpeed")),
            ambient_wind_angle_deg=_bounded_avg(0, 360, air1.get("windAngle")),
            ambient_rain_mm=_bounded_avg(0, 500, air1.get("rain")),

            # Treatment air sensors (Air2/Air3/Air4).
            treatment_air_temp_c=mean("air_temp", treatment_air, "airTemperature"),
            treatment_leaf_temp_c=mean("leaf_temp", treatment_air, "leafTemperature"),
            treatment_vpd_kpa=mean("vpd", treatment_air, "VPD"),
            treatment_co2_ppm=mean("co2", treatment_air, "CO2"),
            treatment_par_umol=mean("par", treatment_air, "PAR"),
            treatment_dli_mol_m2=mean("dli", treatment_air, "DLI"),
            treatment_ndvi=mean("ndvi", treatment_air, "NDVI"),
            treatment_pri=mean("pri", treatment_air, "PRI"),
            treatment_air_leaf_delta_t=_bounded_avg(
                -20, 20, *[d.get("airLeafDeltaT") for d in treatment_air]
            ),

            # Treatment crop sensors.
            treatment_crop_par_umol=t_par,
            treatment_crop_leaf_temp_c=mean("leaf_temp", treatment_crops, "leafTemperature"),
            treatment_crop_ndvi=mean("ndvi", treatment_crops, "NDVI"),
            treatment_crop_dli_mol_m2=mean("dli", treatment_crops, "DLI"),
            treatment_crop_par_avg1h=mean("par", treatment_crops, "PARAvg1H"),
            treatment_crop_by_position=self._position_summary(
                raw_results, self._TREATMENT_CROP_POSITIONS
            ),

            # Reference crop sensors.
            reference_crop_par_umol=r_par,
            reference_crop_leaf_temp_c=mean("leaf_temp", reference_crops, "leafTemperature"),
            reference_crop_ndvi=mean("ndvi", reference_crops, "NDVI"),
            reference_crop_dli_mol_m2=mean("dli", reference_crops, "DLI"),
            reference_crop_by_position=self._position_summary(
                raw_results, self._REFERENCE_CROP_POSITIONS
            ),

            par_shading_ratio=par_ratio,

            # Soil: each probe contributes both of its moisture/temperature channels.
            treatment_soil_moisture_pct=mean(
                "soil_moisture", treatment_soil, "soilMoisture", "soilMoisture2"
            ),
            treatment_soil_temp_c=mean(
                "soil_temp", treatment_soil, "soilTemperature", "soilTemperature2"
            ),
            treatment_soil_ec_ds_m=_safe_avg(*[d.get("soilBulkEC") for d in treatment_soil]),
            treatment_soil_ph=_safe_avg(*[d.get("soilpH") for d in treatment_soil]),
            reference_soil_moisture_pct=mean(
                "soil_moisture", reference_soil, "soilMoisture", "soilMoisture2"
            ),
            reference_soil_temp_c=mean(
                "soil_temp", reference_soil, "soilTemperature", "soilTemperature2"
            ),

            # Irrigation: prefer the per-cycle keys and fall back only when they are
            # absent; a genuine 0.0 cycle reading must not trigger the fallback.
            irrigation_last_volume_l=self._first_present(
                irr.get("irrigationCycleVolume"), irr.get("irrigationVolume")
            ),
            irrigation_last_minutes=self._first_present(
                irr.get("irrigationCycleMinutes"), irr.get("irrigationMinutes")
            ),
            irrigation_ec=irr.get("irrigationEC"),
            irrigation_ph=irr.get("irrigationPH"),
            water_temp_c=irr.get("waterTemperature"),

            # Thermocouple loggers: mean over the four channels of each logger.
            treatment_panel_temp_c=mean("panel_temp", [tc1], *THERMOCOUPLE_KEYS),
            reference_panel_temp_c=mean("panel_temp", [tc2], *THERMOCOUPLE_KEYS),
        )
|
|
|
|
| |
| |
| |
|
|
def _safe_float(val: Any) -> Optional[float]:
    """Convert a TB telemetry value string/number to float, or None on failure.

    Returns None for ``None``, for anything ``float()`` cannot convert
    (including integers too large to be represented as a float), and for
    NaN or infinite results.
    """
    if val is None:
        return None
    try:
        number = float(val)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() on an int beyond the float range.
        return None
    return number if math.isfinite(number) else None
|
|
|
|
def _safe_avg(*vals: Any) -> Optional[float]:
    """Average the finite numeric arguments; None when there are none.

    Arguments that are None, not ``int``/``float``, NaN, or infinite are
    ignored.
    """
    total = 0
    count = 0
    for candidate in vals:
        if not isinstance(candidate, (int, float)):
            continue
        if math.isnan(candidate) or math.isinf(candidate):
            continue
        total += candidate
        count += 1
    if count == 0:
        return None
    return total / count
|
|
|
|
def _bounded_avg(lo: float, hi: float, *vals: Any) -> Optional[float]:
    """Average the finite numeric arguments lying inside ``[lo, hi]``.

    Values outside the inclusive range are treated as sensor faults and
    dropped, as are None, non-numeric, NaN, and infinite arguments. Returns
    None when nothing remains.
    """
    total = 0
    count = 0
    for candidate in vals:
        if not isinstance(candidate, (int, float)):
            continue
        if math.isnan(candidate) or math.isinf(candidate):
            continue
        if candidate < lo or candidate > hi:
            continue
        total += candidate
        count += 1
    if count == 0:
        return None
    return total / count
|
|
|
|
| |
# Inclusive (low, high) plausibility windows per measurement kind. Readings
# outside a window are discarded by _bounded_avg before averaging.
_BOUNDS: Dict[str, Tuple[float, float]] = dict(
    air_temp=(-5.0, 55.0),
    leaf_temp=(-5.0, 60.0),
    soil_temp=(-2.0, 45.0),
    soil_moisture=(0.0, 100.0),
    par=(0.0, 3000.0),
    vpd=(0.0, 10.0),
    co2=(300.0, 2000.0),
    ndvi=(-1.0, 1.0),
    pri=(-1.0, 1.0),
    dli=(0.0, 80.0),
    panel_temp=(-10.0, 100.0),
)
|
|
|
|
| |
| |
| |
|
|
def _run_smoke_test() -> None:
    """Fetch one full snapshot and print it; print a hint instead of raising on failure."""
    tb_client = ThingsBoardClient()
    print("Fetching vine snapshot from ThingsBoard...")
    try:
        snapshot = tb_client.get_vine_snapshot()
        print(snapshot.to_advisor_text())
        print(f"\nSnapshot age: {snapshot.staleness_minutes:.1f} min")
    except Exception as exc:
        # Manual smoke test: report the problem and the likely cause, then exit normally.
        print(f"Error: {exc}")
        print("Make sure THINGSBOARD_USERNAME/PASSWORD or THINGSBOARD_TOKEN are set in your .env")


if __name__ == "__main__":
    _run_smoke_test()
|
|