api / src /data /thingsboard_client.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
ThingsBoardClient: live telemetry client for the Seymour vineyard at
web.seymouragri.com.
Device layout
-------------
TREATMENT area (rows 501–502, under solar panels):
Air2, Air3, Air4 — microclimate sensors under the panels
Crop3, Crop5, Crop6, Crop7 — fruiting-zone crop sensors (per panel position)
Soil1, Soil3, Soil5, Soil6 — root-zone soil probes
Irrigation1 — irrigation flow/volume/quality logger
Thermocouples-1 — panel surface temperature (4 positions)
REFERENCE area (rows 503–504, open sky, no panels):
Crop1, Crop2, Crop4 — fruiting-zone crop sensors (no shading)
Soil2, Soil4, Soil7, Soil9 — root-zone soil probes
Thermocouples-2 — structural/ambient thermocouple reference
AMBIENT (site-level outdoor baseline):
Air1 — outdoor climate station (above canopy, no panel)
Credentials (env vars or .env):
THINGSBOARD_HOST — default https://web.seymouragri.com
THINGSBOARD_USERNAME — tenant login email (TB_USERNAME is accepted as a fallback)
THINGSBOARD_PASSWORD — tenant login password (TB_PASSWORD is accepted as a fallback)
THINGSBOARD_TOKEN — pre-generated JWT (takes priority over user/pass)
"""
from __future__ import annotations
import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import requests
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class VineArea(str, Enum):
    """Experimental zone a sensor is assigned to.

    The enum mixes in ``str``, so each member compares equal to its
    lowercase string value.
    """

    # Under the solar panels.
    TREATMENT = "treatment"
    # Open sky, no panels.
    REFERENCE = "reference"
    # Site-level outdoor baseline.
    AMBIENT = "ambient"
# ---------------------------------------------------------------------------
# Device registry
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DeviceInfo:
    """Immutable registry record for a single ThingsBoard device."""

    # Entity id placed in the REST URL path when the device is queried.
    uuid: str
    # Numeric id carried by the registry entries (its origin is not shown here).
    device_id: int
    # Experimental zone the device is assigned to.
    area: VineArea
    # Vineyard row number; None where the registry lists no row.
    row: Optional[int]
    # Human-readable description.
    label: str
#: Full device registry mapping short name → DeviceInfo.
#: UUIDs are from devices.csv in the Research/PV_Vine_Tradeoff repository.
#: The short names are the keys accepted by the client's device-level methods,
#: which look the entry up here and use its ``uuid`` in the request URL.
DEVICE_REGISTRY: Dict[str, DeviceInfo] = {
    # --- Air / microclimate sensors ---
    "Air1": DeviceInfo(
        uuid="373041f0-089a-11ef-9126-b746c27d34bd", device_id=4,
        area=VineArea.AMBIENT, row=None, label="Outdoor Climate (ambient baseline)",
    ),
    "Air2": DeviceInfo(
        uuid="37bf89a0-089a-11ef-9126-b746c27d34bd", device_id=5,
        area=VineArea.TREATMENT, row=501, label="Indoor Climate Row 501 (under panels)",
    ),
    "Air3": DeviceInfo(
        uuid="3860aba0-089a-11ef-9126-b746c27d34bd", device_id=6,
        area=VineArea.TREATMENT, row=502, label="Indoor Climate Row 502 (under panels)",
    ),
    "Air4": DeviceInfo(
        uuid="04452660-7114-11ef-9360-f1ed9d9dc643", device_id=7,
        area=VineArea.TREATMENT, row=502, label="Treatment Row 502 North (under panels)",
    ),
    # --- Crop (fruiting-zone) sensors ---
    "Crop1": DeviceInfo(
        uuid="39224df0-089a-11ef-9126-b746c27d34bd", device_id=8,
        area=VineArea.REFERENCE, row=503, label="Reference crop Row 503",
    ),
    "Crop2": DeviceInfo(
        uuid="aa0d9970-7113-11ef-9360-f1ed9d9dc643", device_id=9,
        area=VineArea.REFERENCE, row=503, label="Control crop Row 503",
    ),
    "Crop3": DeviceInfo(
        uuid="859b3ce0-29dd-11f0-96bc-55874793181d", device_id=10,
        area=VineArea.TREATMENT, row=502, label="Treatment 502 – West Bottom",
    ),
    # NOTE(review): Crop4 is tagged REFERENCE but sits in row 502, while the
    # module docstring places the reference area in rows 503–504 — confirm.
    "Crop4": DeviceInfo(
        uuid="889765e0-29dd-11f0-96bc-55874793181d", device_id=11,
        area=VineArea.REFERENCE, row=502, label="Control crop Row 502 (reference vine)",
    ),
    "Crop5": DeviceInfo(
        uuid="8b092930-29dd-11f0-96bc-55874793181d", device_id=12,
        area=VineArea.TREATMENT, row=502, label="Treatment 502 – East Upper",
    ),
    "Crop6": DeviceInfo(
        uuid="8cce31c0-29dd-11f0-96bc-55874793181d", device_id=13,
        area=VineArea.TREATMENT, row=502, label="Treatment 502 – East Bottom",
    ),
    "Crop7": DeviceInfo(
        uuid="8e7440a0-29dd-11f0-96bc-55874793181d", device_id=14,
        area=VineArea.TREATMENT, row=502, label="Treatment 502 – West Upper",
    ),
    # --- Soil probes (there is no "Soil8" entry) ---
    "Soil1": DeviceInfo(
        uuid="3586b0a0-089a-11ef-9126-b746c27d34bd", device_id=16,
        area=VineArea.TREATMENT, row=502, label="Soil Row 502 (treatment)",
    ),
    "Soil2": DeviceInfo(
        uuid="35cda4b0-089a-11ef-9126-b746c27d34bd", device_id=17,
        area=VineArea.REFERENCE, row=503, label="Soil Row 503 (reference)",
    ),
    "Soil3": DeviceInfo(
        uuid="3634caf0-089a-11ef-9126-b746c27d34bd", device_id=18,
        area=VineArea.TREATMENT, row=501, label="Soil Row 501 (treatment)",
    ),
    "Soil4": DeviceInfo(
        uuid="36a4cad0-089a-11ef-9126-b746c27d34bd", device_id=19,
        area=VineArea.REFERENCE, row=504, label="Soil Row 504 Control",
    ),
    "Soil5": DeviceInfo(
        uuid="77d55280-70e7-11ef-9360-f1ed9d9dc643", device_id=20,
        area=VineArea.TREATMENT, row=502, label="Treatment Row 502 South",
    ),
    "Soil6": DeviceInfo(
        uuid="7e4e4630-70e7-11ef-9360-f1ed9d9dc643", device_id=21,
        area=VineArea.TREATMENT, row=502, label="Treatment Row 502 North",
    ),
    "Soil7": DeviceInfo(
        uuid="842e5540-70e7-11ef-9360-f1ed9d9dc643", device_id=22,
        area=VineArea.REFERENCE, row=504, label="Control 504 South",
    ),
    "Soil9": DeviceInfo(
        uuid="91e44ff0-70e7-11ef-9360-f1ed9d9dc643", device_id=23,
        area=VineArea.REFERENCE, row=504, label="Control 504 South (2nd probe)",
    ),
    # --- Irrigation logger ---
    "Irrigation1": DeviceInfo(
        uuid="3a066c60-089a-11ef-9126-b746c27d34bd", device_id=15,
        area=VineArea.TREATMENT, row=502, label="Irrigation Row 502",
    ),
    # --- Thermocouple loggers ---
    "Thermocouples1": DeviceInfo(
        uuid="72ce88f0-c548-11ef-8bc2-fdab9f3349b7", device_id=2,
        area=VineArea.TREATMENT, row=502, label="Panel surface temps Treatment 502",
    ),
    "Thermocouples2": DeviceInfo(
        uuid="03e40ba0-cc0e-11ef-a2e9-55874793181d", device_id=3,
        area=VineArea.REFERENCE, row=None, label="Panel/structure surface temps Reference",
    ),
    # Tracker controllers (panel angle + mode)
    # NOTE(review): every tracker entry uses device_id=0 (presumably a
    # placeholder) and all are tagged TREATMENT, including row 503, which the
    # module docstring describes as a reference row — confirm.
    "Tracker501": DeviceInfo(
        uuid="aac06e50-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="Tracker row 501",
    ),
    "Tracker502": DeviceInfo(
        uuid="b99bd630-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="Tracker row 502",
    ),
    "Tracker503": DeviceInfo(
        uuid="caffe4c0-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=503, label="Tracker row 503",
    ),
    "Tracker509": DeviceInfo(
        uuid="bacf7c50-fcdc-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=509, label="Tracker row 509",
    ),
}
# ---------------------------------------------------------------------------
# Asset registry (non-device entities — e.g. the plant-level energy asset)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class AssetInfo:
    """Immutable registry record for a ThingsBoard asset (a non-device entity)."""

    # Entity id placed in the REST URL path when the asset is queried.
    uuid: str
    # Human-readable description.
    label: str
# Asset registry mapping short name → AssetInfo; the keys are the names
# accepted by the client's asset-level methods.
ASSET_REGISTRY: Dict[str, AssetInfo] = {
    "Plant": AssetInfo(
        uuid="dc94ddb0-dbe6-11f0-9352-a53ca0b6a212",
        label="Yeruham Vineyard — plant-level energy",
    ),
}
# Telemetry key names for energy data — presumably read from the "Plant"
# asset; no caller is visible in this part of the file.
ENERGY_KEYS: List[str] = ["power", "production"]
# Telemetry key names for the tracker controllers — presumably used with the
# Tracker* devices; no caller is visible in this part of the file.
TRACKER_KEYS: List[str] = ["angle", "manualMode", "setAngle", "setMode"]
# ---------------------------------------------------------------------------
# Telemetry key sets per device type
# ---------------------------------------------------------------------------
# Keys requested from the Air* devices (see the client's fetch plans).
AIR_KEYS: List[str] = [
    "airTemperature", "leafTemperature", "VPD", "CO2", "PAR", "DLI",
    "airHumidity", "windSpeed", "windAngle", "rain", "airPressure",
    "dewTemperature", "NDVI", "PRI", "airLeafDeltaT",
]
# Keys requested from the Crop* devices.
CROP_KEYS: List[str] = [
    "PAR", "leafTemperature", "NDVI", "PRI", "DLI", "PARAvg1H", "PARAvg24H",
]
# Keys requested from the Soil* devices; moisture and temperature each have
# two keys (the unsuffixed one and a "2" variant).
SOIL_KEYS: List[str] = [
    "soilMoisture", "soilMoisture2",
    "soilTemperature", "soilTemperature2",
    "soilBulkEC", "soilpH",
]
# Keys requested from Irrigation1.
IRRIGATION_KEYS: List[str] = [
    "irrigationVolume", "irrigationMinutes", "irrigationFlowRate",
    "irrigationEC", "irrigationPH", "waterTemperature",
    "irrigationCycleVolume", "irrigationCycleMinutes",
]
# Keys requested from the Thermocouples* devices (four numbered positions).
THERMOCOUPLE_KEYS: List[str] = [
    "thermocoupleTemperature_1", "thermocoupleTemperature_2",
    "thermocoupleTemperature_3", "thermocoupleTemperature_4",
]
# ---------------------------------------------------------------------------
# VineSnapshot dataclass
# ---------------------------------------------------------------------------
@dataclass
class VineSnapshot:
    """
    Aggregated real-time vine state from all ThingsBoard sensors.

    Fields are grouped by area:
      - ambient   : Air1 (outdoor climate, site-level baseline)
      - treatment : under solar panels (rows 501–502)
      - reference : open sky / no panels (rows 503–504)

    ``None`` means the sensor did not return a value.

    ``staleness_minutes`` is the age of the newest reading; the snapshot
    builder passes NaN when no device reported a timestamp.  Both formatters
    treat a non-finite age as "unknown" rather than emitting ``nan``.
    """

    snapshot_ts: datetime
    staleness_minutes: float
    # --- Ambient (Air1, outdoor baseline) ---
    ambient_temp_c: Optional[float] = None
    ambient_humidity_pct: Optional[float] = None
    ambient_wind_speed_ms: Optional[float] = None
    ambient_wind_angle_deg: Optional[float] = None
    ambient_rain_mm: Optional[float] = None
    # --- Treatment microclimate (avg of Air2 / Air3 / Air4) ---
    treatment_air_temp_c: Optional[float] = None
    treatment_leaf_temp_c: Optional[float] = None
    treatment_vpd_kpa: Optional[float] = None
    treatment_co2_ppm: Optional[float] = None
    treatment_par_umol: Optional[float] = None
    treatment_dli_mol_m2: Optional[float] = None
    treatment_ndvi: Optional[float] = None
    treatment_pri: Optional[float] = None
    treatment_air_leaf_delta_t: Optional[float] = None
    # --- Treatment crop (avg of Crop3 / Crop5 / Crop6 / Crop7) ---
    treatment_crop_par_umol: Optional[float] = None
    treatment_crop_leaf_temp_c: Optional[float] = None
    treatment_crop_ndvi: Optional[float] = None
    treatment_crop_dli_mol_m2: Optional[float] = None
    treatment_crop_par_avg1h: Optional[float] = None
    # Per-panel-position readings {position_label: {par, leaf_temp, ndvi}}
    treatment_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)
    # --- Reference crop (avg of Crop1 / Crop2 / Crop4) ---
    reference_crop_par_umol: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_crop_ndvi: Optional[float] = None
    reference_crop_dli_mol_m2: Optional[float] = None
    reference_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)
    # --- PAR shading ratio: treatment_crop_par / reference_crop_par ---
    par_shading_ratio: Optional[float] = None  # <1 = panels are shading
    # --- Treatment soil (avg of Soil1 / Soil3 / Soil5 / Soil6) ---
    treatment_soil_moisture_pct: Optional[float] = None
    treatment_soil_temp_c: Optional[float] = None
    treatment_soil_ec_ds_m: Optional[float] = None
    treatment_soil_ph: Optional[float] = None
    # --- Reference soil (avg of Soil2 / Soil4 / Soil7 / Soil9) ---
    reference_soil_moisture_pct: Optional[float] = None
    reference_soil_temp_c: Optional[float] = None
    # --- Irrigation (Irrigation1, row 502 treatment) ---
    irrigation_last_volume_l: Optional[float] = None
    irrigation_last_minutes: Optional[float] = None
    irrigation_ec: Optional[float] = None
    irrigation_ph: Optional[float] = None
    water_temp_c: Optional[float] = None
    # --- Panel surface temperatures ---
    treatment_panel_temp_c: Optional[float] = None  # avg Thermocouples1 positions 1-4
    reference_panel_temp_c: Optional[float] = None  # avg Thermocouples2 positions 1-4

    def to_advisor_text(self) -> str:
        """Format snapshot for inclusion in an AI advisory prompt.

        Returns a newline-joined block with one section per area.  A line is
        emitted only for fields that are not ``None``; the AMBIENT section
        appears only when ``ambient_temp_c`` is set, and the IRRIGATION
        section only when at least one of volume/minutes/EC/pH is set.
        """
        lines: List[str] = []

        def add(template: str, value: Optional[float]) -> None:
            # Append the formatted line only when the reading is present.
            if value is not None:
                lines.append(template.format(value))

        age = self.staleness_minutes
        if not math.isfinite(age):
            # No device reported a timestamp: say so instead of "~>nan min ago".
            lines.append("VINE STATE (ThingsBoard sensors, data age unknown):")
        elif age < 120:
            lines.append(f"VINE STATE (ThingsBoard sensors, ~{age:.0f} min ago):")
        else:
            lines.append(f"VINE STATE (ThingsBoard sensors, ~>{age:.0f} min ago):")

        # ---------------- Treatment ----------------
        lines.append(" TREATMENT area (rows 501-502, under solar panels):")
        add(" Air temperature: {:.1f} C", self.treatment_air_temp_c)
        add(" Leaf temperature: {:.1f} C", self.treatment_leaf_temp_c)
        add(" Air-leaf delta-T: {:+.1f} C (proxy for heat stress)", self.treatment_air_leaf_delta_t)
        add(" VPD: {:.2f} kPa", self.treatment_vpd_kpa)
        add(" CO2: {:.0f} ppm", self.treatment_co2_ppm)
        add(" Fruiting-zone PAR: {:.0f} umol/m2/s (avg of Crop3/5/6/7)", self.treatment_crop_par_umol)
        add(" DLI today so far: {:.1f} mol/m2/day", self.treatment_crop_dli_mol_m2)
        add(" Canopy NDVI: {:.3f}", self.treatment_crop_ndvi)
        add(" Soil moisture: {:.1f}% (avg Soil1/3/5/6)", self.treatment_soil_moisture_pct)
        add(" Soil temperature: {:.1f} C", self.treatment_soil_temp_c)
        add(" Panel surface temp: {:.1f} C", self.treatment_panel_temp_c)
        if self.treatment_crop_by_position:
            lines.append(" Per-position PAR (Crop sensors):")
            for pos, vals in self.treatment_crop_by_position.items():
                par = vals.get("par")
                lt = vals.get("leaf_temp")
                par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A"
                lt_str = f" | leaf {lt:.1f} C" if lt is not None else ""
                lines.append(f" {pos}: PAR {par_str}{lt_str}")

        # ---------------- Reference ----------------
        lines.append("")
        lines.append(" REFERENCE area (rows 503-504, open sky, no panels):")
        add(" Fruiting-zone PAR: {:.0f} umol/m2/s (avg of Crop1/2/4)", self.reference_crop_par_umol)
        add(" Leaf temperature: {:.1f} C", self.reference_crop_leaf_temp_c)
        add(" Canopy NDVI: {:.3f}", self.reference_crop_ndvi)
        add(" Soil moisture: {:.1f}% (avg Soil2/4/7/9)", self.reference_soil_moisture_pct)
        if self.reference_crop_by_position:
            lines.append(" Per-position PAR (Crop sensors):")
            for pos, vals in self.reference_crop_by_position.items():
                par = vals.get("par")
                par_str = f"{par:.0f} umol/m2/s" if par is not None else "N/A"
                lines.append(f" {pos}: PAR {par_str}")

        # ---------------- Shading ratio ----------------
        if self.par_shading_ratio is not None:
            reduction_pct = (1 - self.par_shading_ratio) * 100
            lines.append("")
            lines.append(
                f" PAR shading ratio (treatment/reference): {self.par_shading_ratio:.2f}"
                f" ({reduction_pct:.0f}% reduction by panels)"
            )

        # ---------------- Ambient ----------------
        if self.ambient_temp_c is not None:
            lines.append("")
            lines.append(" AMBIENT (outdoor baseline, Air1):")
            lines.append(f" Air temperature: {self.ambient_temp_c:.1f} C")
            add(" Wind speed: {:.1f} m/s", self.ambient_wind_speed_ms)
            # Rain is reported only when it is actually non-zero.
            if self.ambient_rain_mm is not None and self.ambient_rain_mm > 0:
                lines.append(f" Rain: {self.ambient_rain_mm:.1f} mm")

        # ---------------- Irrigation ----------------
        irrigation_core = (
            self.irrigation_last_volume_l, self.irrigation_last_minutes,
            self.irrigation_ec, self.irrigation_ph,
        )
        if any(v is not None for v in irrigation_core):
            lines.append("")
            lines.append(" IRRIGATION (Irrigation1, row 502):")
            add(" Last cycle volume: {:.0f} L", self.irrigation_last_volume_l)
            add(" Duration: {:.0f} min", self.irrigation_last_minutes)
            add(" EC: {:.2f} dS/m", self.irrigation_ec)
            add(" pH: {:.1f}", self.irrigation_ph)
            add(" Water temperature: {:.1f} C", self.water_temp_c)

        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """Return a flat dict suitable for JSON serialization (e.g., chatbot tool result).

        Scalar readings are rounded to 3 decimals (``None`` stays ``None``).
        ``staleness_minutes`` is rounded to 1 decimal, or ``None`` when it is
        not finite, because NaN/Infinity are not valid strict JSON.  The two
        per-position dicts are included as-is (same objects, not copies).
        """
        age = self.staleness_minutes
        out: Dict[str, Any] = {
            "snapshot_ts": self.snapshot_ts.isoformat(),
            "staleness_minutes": round(age, 1) if math.isfinite(age) else None,
        }
        scalar_attrs = (
            "ambient_temp_c", "ambient_humidity_pct", "ambient_wind_speed_ms",
            "ambient_wind_angle_deg", "ambient_rain_mm",
            "treatment_air_temp_c", "treatment_leaf_temp_c", "treatment_vpd_kpa",
            "treatment_co2_ppm", "treatment_par_umol", "treatment_dli_mol_m2",
            "treatment_ndvi", "treatment_pri", "treatment_air_leaf_delta_t",
            "treatment_crop_par_umol", "treatment_crop_leaf_temp_c",
            "treatment_crop_ndvi", "treatment_crop_dli_mol_m2", "treatment_crop_par_avg1h",
            "reference_crop_par_umol", "reference_crop_leaf_temp_c",
            "reference_crop_ndvi", "reference_crop_dli_mol_m2",
            "par_shading_ratio",
            "treatment_soil_moisture_pct", "treatment_soil_temp_c",
            "treatment_soil_ec_ds_m", "treatment_soil_ph",
            "reference_soil_moisture_pct", "reference_soil_temp_c",
            "irrigation_last_volume_l", "irrigation_last_minutes",
            "irrigation_ec", "irrigation_ph", "water_temp_c",
            "treatment_panel_temp_c", "reference_panel_temp_c",
        )
        for attr in scalar_attrs:
            val = getattr(self, attr)
            out[attr] = round(val, 3) if val is not None else None
        out["treatment_crop_by_position"] = self.treatment_crop_by_position
        out["reference_crop_by_position"] = self.reference_crop_by_position
        return out
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class ThingsBoardConfig:
    """ThingsBoard connection settings. Data retrieval always uses prod (Seymour).

    Every default is read from the environment when an instance is created,
    not when the module is imported, so variables set after import (for
    example by loading a ``.env`` file) are honoured.  ``password`` and
    ``token`` are left out of ``repr()`` so secrets do not end up in logs.
    """

    # Prod only — test (eu.thingsboard.cloud) is for deploying apps, not data
    host: str = field(
        default_factory=lambda: os.environ.get(
            "THINGSBOARD_HOST", "https://web.seymouragri.com/"
        )
    )
    # Login e-mail; TB_USERNAME is accepted as a fallback variable name.
    username: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_USERNAME") or os.environ.get("TB_USERNAME")
        )
    )
    # Login password; TB_PASSWORD is accepted as a fallback variable name.
    password: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_PASSWORD") or os.environ.get("TB_PASSWORD")
        ),
        repr=False,
    )
    # Pre-generated JWT; when set, the client uses it instead of logging in.
    token: Optional[str] = field(
        default_factory=lambda: os.environ.get("THINGSBOARD_TOKEN"),
        repr=False,
    )
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class ThingsBoardClient:
"""
Minimal ThingsBoard client for the Seymour vineyard.
Authentication
--------------
Provide THINGSBOARD_TOKEN for a pre-generated JWT, or
THINGSBOARD_USERNAME + THINGSBOARD_PASSWORD for login-based auth.
Tokens are cached and refreshed automatically before they expire.
Usage
-----
client = ThingsBoardClient()
snapshot = client.get_vine_snapshot()
print(snapshot.to_advisor_text())
"""
_TOKEN_TTL_SECONDS = 8_000 # ThingsBoard default is 9000 s; be conservative
def __init__(self, config: Optional[ThingsBoardConfig] = None) -> None:
    """Create a client with its own HTTP session and an empty token cache.

    Parameters
    ----------
    config : ThingsBoardConfig, optional
        Connection settings; a default ``ThingsBoardConfig()`` is built when
        omitted (or when a falsy value is passed).
    """
    if not config:
        config = ThingsBoardConfig()
    self.config = config

    # One session shared by every request so the headers set here (and the
    # auth header added later) are sent each time.
    session = requests.Session()
    session.headers.update({"Content-Type": "application/json"})
    self._session = session

    # No login JWT yet; an expiry of 0.0 makes the first auth check refresh.
    self._jwt: Optional[str] = None
    self._jwt_expires_at: float = 0.0
# ------------------------------------------------------------------
# Authentication
# ------------------------------------------------------------------
def _ensure_jwt(self) -> str:
"""Return a valid JWT, obtaining or refreshing as needed."""
if self.config.token:
if "X-Authorization" not in self._session.headers:
self._session.headers["X-Authorization"] = f"Bearer {self.config.token}"
return self.config.token
if self._jwt and time.monotonic() < self._jwt_expires_at:
return self._jwt
if not self.config.username or not self.config.password:
raise RuntimeError(
"ThingsBoard authentication requires THINGSBOARD_TOKEN "
"or both THINGSBOARD_USERNAME and THINGSBOARD_PASSWORD."
)
url = f"{self.config.host.rstrip('/')}/api/auth/login"
resp = self._session.post(
url,
json={"username": self.config.username, "password": self.config.password},
timeout=10,
)
resp.raise_for_status()
token = resp.json()["token"]
self._jwt = token
self._jwt_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS
self._session.headers["X-Authorization"] = f"Bearer {token}"
return token
# ------------------------------------------------------------------
# Low-level API calls
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Shared low-level helpers (DEVICE and ASSET use the same REST API,
# differing only in the entity-type path segment).
# ------------------------------------------------------------------
def _fetch_latest_raw(
self,
entity_type: str,
uuid: str,
keys: List[str],
) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
"""Fetch most-recent telemetry for any entity type (DEVICE or ASSET)."""
self._ensure_jwt()
url = (
f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}"
f"/{uuid}/values/timeseries"
)
resp = self._session.get(url, params={"keys": ",".join(keys)}, timeout=15)
resp.raise_for_status()
raw: Dict[str, List[Dict]] = resp.json()
values: Dict[str, Optional[float]] = {}
newest_ts_ms: Optional[int] = None
for key in keys:
entries = raw.get(key, [])
if entries:
values[key] = _safe_float(entries[0]["value"])
ts_ms = entries[0].get("ts")
if ts_ms and (newest_ts_ms is None or ts_ms > newest_ts_ms):
newest_ts_ms = ts_ms
else:
values[key] = None
newest_ts = (
datetime.fromtimestamp(newest_ts_ms / 1000, tz=timezone.utc)
if newest_ts_ms else None
)
return values, newest_ts
def _fetch_timeseries_raw(
self,
entity_type: str,
uuid: str,
keys: List[str],
start: datetime,
end: datetime,
limit: int = 1000,
interval_ms: int = 900_000,
agg: str = "NONE",
) -> pd.DataFrame:
"""Fetch time-series telemetry for any entity type (DEVICE or ASSET)."""
self._ensure_jwt()
start_ms = int(start.timestamp() * 1000)
end_ms = int(end.timestamp() * 1000)
url = (
f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}"
f"/{uuid}/values/timeseries"
)
params: Dict[str, Any] = {
"keys": ",".join(keys),
"startTs": start_ms,
"endTs": end_ms,
"limit": limit,
"agg": agg,
}
if agg != "NONE":
params["interval"] = interval_ms
resp = self._session.get(url, params=params, timeout=30)
resp.raise_for_status()
raw: Dict[str, List[Dict]] = resp.json()
frames: Dict[str, pd.Series] = {}
for key, entries in raw.items():
if key in keys and entries:
ts = pd.to_datetime([e["ts"] for e in entries], unit="ms", utc=True)
vals = [_safe_float(e["value"]) for e in entries]
frames[key] = pd.Series(vals, index=ts)
if not frames:
return pd.DataFrame()
return pd.DataFrame(frames).sort_index()
# ------------------------------------------------------------------
# Device API (public)
# ------------------------------------------------------------------
def _fetch_latest(
    self,
    device_name: str,
    keys: List[str],
) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
    """Fetch most-recent values for a named device.

    Looks the device up in ``DEVICE_REGISTRY`` (an unknown name raises the
    plain ``KeyError`` from that lookup) and forwards to
    ``_fetch_latest_raw`` with entity type ``"DEVICE"``.
    """
    device_uuid = DEVICE_REGISTRY[device_name].uuid
    return self._fetch_latest_raw("DEVICE", device_uuid, keys)
def get_latest_telemetry(
    self,
    device_name: str,
    keys: List[str],
) -> Dict[str, Optional[float]]:
    """Return the most recent value for each key. Missing keys return None.

    Raises
    ------
    KeyError
        If ``device_name`` is not in ``DEVICE_REGISTRY``; the message lists
        the valid names.
    """
    if device_name in DEVICE_REGISTRY:
        # The timestamp half of the result is not needed here.
        latest, _newest_ts = self._fetch_latest(device_name, keys)
        return latest
    raise KeyError(
        f"Unknown device: {device_name!r}. "
        f"Valid names: {sorted(DEVICE_REGISTRY)}"
    )
def get_timeseries(
    self,
    device_name: str,
    keys: List[str],
    start: datetime,
    end: datetime,
    limit: int = 1000,
    interval_ms: int = 900_000,  # 15 minutes
    agg: str = "NONE",
) -> pd.DataFrame:
    """Fetch time-series telemetry for a named device.

    Thin wrapper over ``_fetch_timeseries_raw`` with entity type ``"DEVICE"``;
    all remaining arguments are forwarded unchanged.

    Raises
    ------
    KeyError
        If ``device_name`` is not in ``DEVICE_REGISTRY``.
    """
    info = DEVICE_REGISTRY.get(device_name)
    if info is None:
        raise KeyError(f"Unknown device: {device_name!r}")
    return self._fetch_timeseries_raw(
        "DEVICE", info.uuid, keys, start, end, limit, interval_ms, agg,
    )
# ------------------------------------------------------------------
# Asset API (public)
# ------------------------------------------------------------------
def get_asset_timeseries(
    self,
    asset_name: str,
    keys: List[str],
    start: datetime,
    end: datetime,
    limit: int = 1000,
    interval_ms: int = 3_600_000,  # 1 hour
    agg: str = "SUM",
) -> pd.DataFrame:
    """Fetch time-series from a ThingsBoard ASSET (e.g. Plant energy).

    Thin wrapper over ``_fetch_timeseries_raw`` with entity type ``"ASSET"``.
    Unlike the device variant, the defaults request hourly ``SUM`` values.

    Raises
    ------
    KeyError
        If ``asset_name`` is not in ``ASSET_REGISTRY``; the message lists the
        valid names.
    """
    info = ASSET_REGISTRY.get(asset_name)
    if info is None:
        raise KeyError(f"Unknown asset: {asset_name!r}. Valid: {sorted(ASSET_REGISTRY)}")
    return self._fetch_timeseries_raw(
        "ASSET", info.uuid, keys, start, end, limit, interval_ms, agg,
    )
def get_asset_latest(
    self,
    asset_name: str,
    keys: List[str],
) -> Dict[str, Optional[float]]:
    """Fetch latest telemetry from a ThingsBoard ASSET.

    Returns one entry per requested key; the timestamp reported by the
    low-level fetch is discarded.

    Raises
    ------
    KeyError
        If ``asset_name`` is not in ``ASSET_REGISTRY``.
    """
    info = ASSET_REGISTRY.get(asset_name)
    if info is None:
        raise KeyError(f"Unknown asset: {asset_name!r}")
    latest, _newest_ts = self._fetch_latest_raw("ASSET", info.uuid, keys)
    return latest
# ------------------------------------------------------------------
# Device commands (RPC + attribute writes)
# ------------------------------------------------------------------
def send_rpc_command(
    self,
    device_name: str,
    method: str,
    params: Any = None,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Send a two-way RPC command to a device.

    Uses POST /api/plugins/rpc/twoway/{deviceId}.
    Falls back to one-way (/api/plugins/rpc/oneway/{deviceId}) if the
    two-way endpoint answers 404 or 405.

    Parameters
    ----------
    device_name : str
        Key in ``DEVICE_REGISTRY``.
    method : str
        RPC method name placed in the request body.
    params : Any, optional
        RPC parameters; ``None`` is sent as an empty object.
    timeout : float
        Per-request timeout in seconds, applied to each POST separately.

    Returns
    -------
    dict
        The decoded JSON response, or ``{"status": "ok", "status_code": ...}``
        when the response body is not valid JSON.

    Raises
    ------
    KeyError
        If ``device_name`` is not in ``DEVICE_REGISTRY``.
    requests.HTTPError
        If the final response has an error status.
    """
    if device_name not in DEVICE_REGISTRY:
        raise KeyError(f"Unknown device: {device_name!r}")
    info = DEVICE_REGISTRY[device_name]
    self._ensure_jwt()
    payload = {"method": method, "params": params if params is not None else {}}
    rpc_base = f"{self.config.host.rstrip('/')}/api/plugins/rpc"

    # Try two-way RPC first
    resp = self._session.post(f"{rpc_base}/twoway/{info.uuid}", json=payload, timeout=timeout)
    if resp.status_code in (404, 405):
        # Fallback to one-way RPC
        resp = self._session.post(f"{rpc_base}/oneway/{info.uuid}", json=payload, timeout=timeout)
    resp.raise_for_status()

    try:
        return resp.json()
    except ValueError:
        # Only JSON decode failures (ValueError subclasses) are treated as
        # "no body"; unrelated errors are no longer swallowed.
        return {"status": "ok", "status_code": resp.status_code}
def set_device_attributes(
    self,
    device_name: str,
    attributes: Dict[str, Any],
    scope: str = "SHARED_SCOPE",
) -> None:
    """Write attributes to a device in the given attribute scope.

    Uses POST /api/plugins/telemetry/DEVICE/{id}/attributes/{scope}, with
    ``attributes`` as the JSON body.  ``scope`` defaults to ``SHARED_SCOPE``.
    This is an alternative to RPC for setting tracker targets.

    Raises
    ------
    KeyError
        If ``device_name`` is not in ``DEVICE_REGISTRY``.
    requests.HTTPError
        If the server answers with an error status.
    """
    info = DEVICE_REGISTRY.get(device_name)
    if info is None:
        raise KeyError(f"Unknown device: {device_name!r}")
    self._ensure_jwt()
    base = self.config.host.rstrip("/")
    endpoint = f"{base}/api/plugins/telemetry/DEVICE/{info.uuid}/attributes/{scope}"
    response = self._session.post(endpoint, json=attributes, timeout=10)
    response.raise_for_status()
# ------------------------------------------------------------------
# High-level vine snapshot
# ------------------------------------------------------------------
# Each fetch plan maps a device short name (a DEVICE_REGISTRY key) to the
# list of telemetry keys requested from it when building a vine snapshot.
# Dashboard-only: 4 devices for farmer view (temp, soil, irrigation)
_DASHBOARD_FETCH_PLAN: Dict[str, List[str]] = {
    "Air1": AIR_KEYS,  # ambient weather
    "Air2": AIR_KEYS,  # treatment air
    "Soil1": SOIL_KEYS,  # treatment soil
    "Irrigation1": IRRIGATION_KEYS,
}
# Light mode: 6 devices (adds crop PAR for chatbot/detailed view)
_LIGHT_FETCH_PLAN: Dict[str, List[str]] = {
    "Air1": AIR_KEYS,  # ambient
    "Air2": AIR_KEYS,  # treatment air (one representative)
    "Crop1": CROP_KEYS,  # reference crop
    "Crop3": CROP_KEYS,  # treatment crop
    "Soil1": SOIL_KEYS,  # treatment soil
    "Irrigation1": IRRIGATION_KEYS,
}
# Full mode: all 22 air, crop, soil, irrigation and thermocouple devices.
# The Tracker* devices are not part of any fetch plan.
_FULL_FETCH_PLAN: Dict[str, List[str]] = {
    "Air1": AIR_KEYS,
    "Air2": AIR_KEYS,
    "Air3": AIR_KEYS,
    "Air4": AIR_KEYS,
    "Crop1": CROP_KEYS,
    "Crop2": CROP_KEYS,
    "Crop3": CROP_KEYS,
    "Crop4": CROP_KEYS,
    "Crop5": CROP_KEYS,
    "Crop6": CROP_KEYS,
    "Crop7": CROP_KEYS,
    "Soil1": SOIL_KEYS,
    "Soil2": SOIL_KEYS,
    "Soil3": SOIL_KEYS,
    "Soil4": SOIL_KEYS,
    "Soil5": SOIL_KEYS,
    "Soil6": SOIL_KEYS,
    "Soil7": SOIL_KEYS,
    "Soil9": SOIL_KEYS,
    "Irrigation1": IRRIGATION_KEYS,
    "Thermocouples1": THERMOCOUPLE_KEYS,
    "Thermocouples2": THERMOCOUPLE_KEYS,
}
def get_vine_snapshot(self, light: bool = False,
mode: Optional[str] = None) -> VineSnapshot:
"""
Fetch latest telemetry from all relevant devices and return an
aggregated VineSnapshot distinguishing treatment vs reference areas.
Uses a thread pool to parallelise HTTP requests.
Individual device failures are silently skipped (returns None fields).
Parameters
----------
light : bool
If True, fetch only ~6 key devices instead of all 21.
mode : str, optional
"dashboard" = 4 devices only (air + soil + irrigation).
Overrides `light` when set.
"""
if mode == "dashboard":
fetch_plan = self._DASHBOARD_FETCH_PLAN
elif light:
fetch_plan = self._LIGHT_FETCH_PLAN
else:
fetch_plan = self._FULL_FETCH_PLAN
# Ensure auth token before spawning threads (avoid race on login)
self._ensure_jwt()
raw_results: Dict[str, Dict[str, Optional[float]]] = {}
newest_ts_overall: Optional[datetime] = None
with ThreadPoolExecutor(max_workers=8) as pool:
future_map = {
pool.submit(self._fetch_latest, name, keys): name
for name, keys in fetch_plan.items()
}
for future in as_completed(future_map, timeout=25):
name = future_map[future]
try:
values, ts = future.result()
raw_results[name] = values
if ts and (newest_ts_overall is None or ts > newest_ts_overall):
newest_ts_overall = ts
except Exception:
raw_results[name] = {}
now = datetime.now(tz=timezone.utc)
staleness = (
(now - newest_ts_overall).total_seconds() / 60
if newest_ts_overall else float("nan")
)
# ---------- Ambient (Air1) ----------
air1 = raw_results.get("Air1", {})
# ---------- Treatment microclimate (Air2/3/4) ----------
treatment_air = [raw_results.get(d, {}) for d in ("Air2", "Air3", "Air4")]
# ---------- Treatment crop by position ----------
position_labels = {
"Crop3": "502-west-bottom",
"Crop5": "502-east-upper",
"Crop6": "502-east-bottom",
"Crop7": "502-west-upper",
}
treatment_crop_devs = {
label: raw_results.get(dev, {})
for dev, label in position_labels.items()
}
treatment_crop_by_pos: Dict[str, Dict[str, Optional[float]]] = {
label: {
"par": v.get("PAR"),
"leaf_temp": v.get("leafTemperature"),
"ndvi": v.get("NDVI"),
"dli": v.get("DLI"),
}
for label, v in treatment_crop_devs.items()
}
# ---------- Reference crop by position ----------
ref_position_labels = {
"Crop1": "503-ref",
"Crop2": "503-control",
"Crop4": "502-control",
}
reference_crop_devs = {
label: raw_results.get(dev, {})
for dev, label in ref_position_labels.items()
}
reference_crop_by_pos: Dict[str, Dict[str, Optional[float]]] = {
label: {
"par": v.get("PAR"),
"leaf_temp": v.get("leafTemperature"),
"ndvi": v.get("NDVI"),
"dli": v.get("DLI"),
}
for label, v in reference_crop_devs.items()
}
# ---------- Soil averages ----------
treatment_soil_devs = [raw_results.get(d, {}) for d in ("Soil1", "Soil3", "Soil5", "Soil6")]
reference_soil_devs = [raw_results.get(d, {}) for d in ("Soil2", "Soil4", "Soil7", "Soil9")]
def _avg_soil_moisture(devs: List[Dict]) -> Optional[float]:
all_vals = []
for d in devs:
for k in ("soilMoisture", "soilMoisture2"):
if d.get(k) is not None:
all_vals.append(d[k])
lo, hi = _BOUNDS["soil_moisture"]
return _bounded_avg(lo, hi, *all_vals) if all_vals else None
def _avg_soil_temp(devs: List[Dict]) -> Optional[float]:
    """Average the soil-temperature readings found in *devs*.

    Collects every non-None value stored under the ``soilTemperature`` and
    ``soilTemperature2`` keys of each device dict, then averages those that
    fall inside the ``soil_temp`` plausibility range.

    Returns:
        The bounded mean, or ``None`` when no reading is present (or none
        is inside the range).
    """
    temp_keys = ("soilTemperature", "soilTemperature2")
    readings = []
    for dev in devs:
        # Keep only the keys that actually carry a value for this device.
        readings.extend(dev[key] for key in temp_keys if dev.get(key) is not None)
    lo, hi = _BOUNDS["soil_temp"]
    if not readings:
        return None
    return _bounded_avg(lo, hi, *readings)
# ---------- Panel temps and irrigation ----------
tc1 = raw_results.get("Thermocouples1", {})
tc2 = raw_results.get("Thermocouples2", {})
irr = raw_results.get("Irrigation1", {})
# ---------- PAR shading ratio (bounded to reject sensor faults) ----------
t_par = _bounded_avg(*_BOUNDS["par"], *[v.get("PAR") for v in treatment_crop_devs.values()])
r_par = _bounded_avg(*_BOUNDS["par"], *[v.get("PAR") for v in reference_crop_devs.values()])
par_ratio: Optional[float] = None
if t_par is not None and r_par is not None and r_par > 0:
par_ratio = t_par / r_par
snapshot = VineSnapshot(
snapshot_ts=now,
staleness_minutes=staleness,
# Ambient — apply bounds to catch single-device faults too
ambient_temp_c=_bounded_avg(*_BOUNDS["air_temp"], air1.get("airTemperature")),
ambient_humidity_pct=_bounded_avg(0, 100, air1.get("airHumidity")),
ambient_wind_speed_ms=_bounded_avg(0, 60, air1.get("windSpeed")),
ambient_wind_angle_deg=_bounded_avg(0, 360, air1.get("windAngle")),
ambient_rain_mm=_bounded_avg(0, 500, air1.get("rain")),
# Treatment climate — bounded to reject sensor faults
treatment_air_temp_c=_bounded_avg(*_BOUNDS["air_temp"], *[d.get("airTemperature") for d in treatment_air]),
treatment_leaf_temp_c=_bounded_avg(*_BOUNDS["leaf_temp"], *[d.get("leafTemperature") for d in treatment_air]),
treatment_vpd_kpa=_bounded_avg(*_BOUNDS["vpd"], *[d.get("VPD") for d in treatment_air]),
treatment_co2_ppm=_bounded_avg(*_BOUNDS["co2"], *[d.get("CO2") for d in treatment_air]),
treatment_par_umol=_bounded_avg(*_BOUNDS["par"], *[d.get("PAR") for d in treatment_air]),
treatment_dli_mol_m2=_bounded_avg(*_BOUNDS["dli"], *[d.get("DLI") for d in treatment_air]),
treatment_ndvi=_bounded_avg(*_BOUNDS["ndvi"], *[d.get("NDVI") for d in treatment_air]),
treatment_pri=_bounded_avg(*_BOUNDS["pri"], *[d.get("PRI") for d in treatment_air]),
treatment_air_leaf_delta_t=_bounded_avg(-20, 20, *[d.get("airLeafDeltaT") for d in treatment_air]),
# Treatment crop
treatment_crop_par_umol=t_par,
treatment_crop_leaf_temp_c=_bounded_avg(
*_BOUNDS["leaf_temp"], *[v.get("leafTemperature") for v in treatment_crop_devs.values()]
),
treatment_crop_ndvi=_bounded_avg(
*_BOUNDS["ndvi"], *[v.get("NDVI") for v in treatment_crop_devs.values()]
),
treatment_crop_dli_mol_m2=_bounded_avg(
*_BOUNDS["dli"], *[v.get("DLI") for v in treatment_crop_devs.values()]
),
treatment_crop_par_avg1h=_bounded_avg(
*_BOUNDS["par"], *[v.get("PARAvg1H") for v in treatment_crop_devs.values()]
),
treatment_crop_by_position=treatment_crop_by_pos,
# Reference crop
reference_crop_par_umol=r_par,
reference_crop_leaf_temp_c=_bounded_avg(
*_BOUNDS["leaf_temp"], *[v.get("leafTemperature") for v in reference_crop_devs.values()]
),
reference_crop_ndvi=_bounded_avg(
*_BOUNDS["ndvi"], *[v.get("NDVI") for v in reference_crop_devs.values()]
),
reference_crop_dli_mol_m2=_bounded_avg(
*_BOUNDS["dli"], *[v.get("DLI") for v in reference_crop_devs.values()]
),
reference_crop_by_position=reference_crop_by_pos,
par_shading_ratio=par_ratio,
# Treatment soil
treatment_soil_moisture_pct=_avg_soil_moisture(treatment_soil_devs),
treatment_soil_temp_c=_avg_soil_temp(treatment_soil_devs),
treatment_soil_ec_ds_m=_safe_avg(*[d.get("soilBulkEC") for d in treatment_soil_devs]),
treatment_soil_ph=_safe_avg(*[d.get("soilpH") for d in treatment_soil_devs]),
# Reference soil
reference_soil_moisture_pct=_avg_soil_moisture(reference_soil_devs),
reference_soil_temp_c=_avg_soil_temp(reference_soil_devs),
# Irrigation
irrigation_last_volume_l=irr.get("irrigationCycleVolume") or irr.get("irrigationVolume"),
irrigation_last_minutes=irr.get("irrigationCycleMinutes") or irr.get("irrigationMinutes"),
irrigation_ec=irr.get("irrigationEC"),
irrigation_ph=irr.get("irrigationPH"),
water_temp_c=irr.get("waterTemperature"),
# Panel temps
treatment_panel_temp_c=_bounded_avg(
*_BOUNDS["panel_temp"], *[tc1.get(k) for k in THERMOCOUPLE_KEYS]
),
reference_panel_temp_c=_bounded_avg(
*_BOUNDS["panel_temp"], *[tc2.get(k) for k in THERMOCOUPLE_KEYS]
),
)
return snapshot
# ---------------------------------------------------------------------------
# Helpers (module-level so threads can share without self)
# ---------------------------------------------------------------------------
def _safe_float(val: Any) -> Optional[float]:
"""Convert a TB telemetry value string/number to float, or None on failure."""
if val is None:
return None
try:
f = float(val)
return None if math.isnan(f) or math.isinf(f) else f
except (TypeError, ValueError):
return None
def _safe_avg(*vals: Any) -> Optional[float]:
"""Return the mean of non-None, finite values, or None if none available."""
valid = [v for v in vals if v is not None and isinstance(v, (int, float))
and not math.isnan(v) and not math.isinf(v)]
return sum(valid) / len(valid) if valid else None
def _bounded_avg(lo: float, hi: float, *vals: Any) -> Optional[float]:
"""Return the mean of values within [lo, hi], rejecting sensor faults outside that range."""
valid = [v for v in vals if v is not None and isinstance(v, (int, float))
and not math.isnan(v) and not math.isinf(v) and lo <= v <= hi]
return sum(valid) / len(valid) if valid else None
# Physical plausibility bounds for the Negev site.
# Each entry maps a quantity name to an inclusive (low, high) range; readings
# outside the range are treated as sensor faults and dropped by _bounded_avg.
_BOUNDS: Dict[str, Tuple[float, float]] = {
    "air_temp": (-5.0, 55.0), # °C — extreme Negev range
    "leaf_temp": (-5.0, 60.0), # °C — leaves can exceed air under direct sun
    "soil_temp": (-2.0, 45.0), # °C — soil in Negev
    "soil_moisture": (0.0, 100.0), # %
    "par": (0.0, 3000.0), # µmol m⁻² s⁻¹
    "vpd": (0.0, 10.0), # kPa
    "co2": (300.0, 2000.0), # ppm
    "ndvi": (-1.0, 1.0), # dimensionless index
    "pri": (-1.0, 1.0), # dimensionless index
    "dli": (0.0, 80.0), # mol m⁻² day⁻¹
    "panel_temp": (-10.0, 100.0), # °C — panel surface
}
# ---------------------------------------------------------------------------
# CLI smoke test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Manual smoke test: fetch one snapshot and print its advisor text
    # together with the age of the newest reading.
    client = ThingsBoardClient()
    print("Fetching vine snapshot from ThingsBoard...")
    try:
        snap = client.get_vine_snapshot()
        print(snap.to_advisor_text())
        print(f"\nSnapshot age: {snap.staleness_minutes:.1f} min")
    except Exception as exc:
        print(f"Error: {exc}")
        print("Make sure THINGSBOARD_USERNAME/PASSWORD or THINGSBOARD_TOKEN are set in your .env")
        # Exit non-zero so a shell or CI job can tell the smoke test failed;
        # previously the process ended with status 0 even on error.
        raise SystemExit(1)