# api/src/data/data_schema.py
# Eli Safra
# Deploy SolarWine API (FastAPI + Docker, port 7860)
# 938949f
"""
SolarWine 2.0 — Data Schema
============================
Canonical dataclasses for the four telemetry tables that flow through
the 15-minute control loop.
SensorRaw — one-slot snapshot of all on-site + IMS inputs
BiologicalState — photosynthesis model outputs + phenological state
TrackerKinematics — tracker position, commands, operational mode
SimulationLog — complete audit record for one 15-min slot
Storage
-------
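CSV/Parquet backend via the to_dict() / from_dict() helpers (SimulationLog is
written as a single flat row via to_flat_row()). The schema is forward-compatible
with a future TimescaleDB migration: all timestamps are UTC and numeric fields use
the fixed units listed below (not strictly SI — e.g. °C, kPa, kWh, ppm).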
Unit conventions
----------------
Temperatures : °C
PAR : µmol m⁻² s⁻¹
DLI : mol m⁻² day⁻¹
Irradiance (GHI) : W m⁻²
VPD : kPa
CO₂ : ppm
Angles : degrees (tilt: + = east-facing, 0 = horizontal, - = west-facing)
Energy : kWh
Soil moisture : %
Wind speed : m s⁻¹
"""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from src.utils import cwsi_from_delta_t
# ---------------------------------------------------------------------------
# SensorRaw — single 15-min slot of all sensor inputs
# ---------------------------------------------------------------------------
@dataclass
class SensorRaw:
    """
    Canonical sensor snapshot for one 15-min control slot.

    Populated from ThingsBoard (TB) via VineSnapshot for real-time control,
    or from CSV/Parquet for historical replay. IMS fields are always from
    the IMS station 43 (Sde Boker) cache.

    Every field except ``ts`` defaults to ``None`` (or an empty list for
    ``quality_flags``), so a snapshot with missing inputs is still representable.
    """

    ts: datetime  # UTC timestamp of the slot start

    # --- TB microclimate (treatment area Air2/3/4 average) ---
    air_temp_c: Optional[float] = None
    leaf_temp_c: Optional[float] = None
    vpd_kpa: Optional[float] = None
    co2_ppm: Optional[float] = None
    air_leaf_delta_t: Optional[float] = None  # proxy for CWSI
    humidity_pct: Optional[float] = None
    dew_temp_c: Optional[float] = None

    # --- PAR / irradiance ---
    par_umol: Optional[float] = None  # above-canopy ambient PAR (Air devices)
    fruiting_zone_par_umol: Optional[float] = None  # mid-canopy PAR (Crop3/5/6/7 avg)
    ghi_w_m2: Optional[float] = None  # IMS global horizontal irradiance

    # --- Daily light / spectral indices ---
    dli_mol_m2: Optional[float] = None  # daily light integral so far
    ndvi: Optional[float] = None
    pri: Optional[float] = None

    # --- Wind & rain ---
    wind_speed_ms: Optional[float] = None
    wind_angle_deg: Optional[float] = None
    rain_mm: Optional[float] = None
    air_pressure_hpa: Optional[float] = None

    # --- TB soil (treatment area Soil1/3/5/6 average) ---
    soil_moisture_pct: Optional[float] = None
    soil_temp_c: Optional[float] = None
    soil_ec_ds_m: Optional[float] = None
    soil_ph: Optional[float] = None

    # --- TB reference area (Crop1/2/4 avg, open sky) ---
    reference_crop_par_umol: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_soil_moisture_pct: Optional[float] = None

    # --- Shading effectiveness ---
    par_shading_ratio: Optional[float] = None  # treatment / reference PAR (<1 = shaded)

    # --- Derived stress index ---
    # Explicit CWSI if available from TB; from_vine_snapshot() fills it with
    # the air-leaf delta-T proxy.
    cwsi: Optional[float] = None

    # --- Data provenance ---
    source: str = "unknown"  # "thingsboard" | "ims" | "csv" | "mixed"
    quality_flags: List[str] = field(default_factory=list)
    # e.g. ["soil5_temp_outlier_excluded", "air3_stale"]

    # ------------------------------------------------------------------
    # Factory: build from a VineSnapshot
    # ------------------------------------------------------------------
    @classmethod
    def from_vine_snapshot(cls, snapshot: Any) -> "SensorRaw":
        """
        Construct SensorRaw from a ThingsBoardClient.VineSnapshot.

        The snapshot already contains treatment-vs-reference aggregations
        and bounded averages; this method re-maps them to the canonical
        SensorRaw field names. Attributes are read with ``getattr(..., None)``,
        so a snapshot lacking an attribute yields ``None`` for that field
        instead of raising.

        Args:
            snapshot: VineSnapshot-like object (duck-typed).

        Returns:
            A SensorRaw with ``source="thingsboard"``. Notes:

            - ``ts`` is ``snapshot.snapshot_ts``, or the current UTC time when
              that attribute is missing or ``None``.
            - A ``stale_<N>min`` quality flag is added when
              ``staleness_minutes`` exceeds 20.
            - ``cwsi`` is derived from ``treatment_air_leaf_delta_t`` via
              ``cwsi_from_delta_t`` when that delta is available.

        NOTE: dew_temp_c, air_pressure_hpa and ghi_w_m2 are not mapped here
        and stay ``None``.
        """

        def attr(name: str) -> Any:
            # Missing attributes map to None rather than AttributeError.
            return getattr(snapshot, name, None)

        flags: List[str] = []
        staleness = attr("staleness_minutes")
        # staleness_minutes may be absent *or* None; both mean "unknown".
        if staleness is not None and staleness > 20:
            flags.append(f"stale_{staleness:.0f}min")

        # CWSI proxy from air-leaf temperature delta (see src.utils.cwsi_from_delta_t)
        delta_t = attr("treatment_air_leaf_delta_t")
        cwsi_proxy: Optional[float] = None
        if delta_t is not None:
            cwsi_proxy = cwsi_from_delta_t(delta_t=delta_t)

        # Prefer treatment_leaf_temp_c and fall back to the crop-device value
        # only when it is missing. An explicit None test is required here:
        # `a or b` would discard a legitimate 0.0 °C reading.
        leaf_temp = attr("treatment_leaf_temp_c")
        if leaf_temp is None:
            leaf_temp = attr("treatment_crop_leaf_temp_c")

        ts = attr("snapshot_ts")
        if ts is None:
            ts = datetime.now(tz=timezone.utc)

        return cls(
            ts=ts,
            # Microclimate
            air_temp_c=attr("treatment_air_temp_c"),
            leaf_temp_c=leaf_temp,
            vpd_kpa=attr("treatment_vpd_kpa"),
            co2_ppm=attr("treatment_co2_ppm"),
            air_leaf_delta_t=delta_t,
            humidity_pct=attr("ambient_humidity_pct"),
            # PAR
            par_umol=attr("treatment_par_umol"),
            fruiting_zone_par_umol=attr("treatment_crop_par_umol"),
            dli_mol_m2=attr("treatment_crop_dli_mol_m2"),
            ndvi=attr("treatment_crop_ndvi"),
            pri=attr("treatment_pri"),
            # Wind / weather
            wind_speed_ms=attr("ambient_wind_speed_ms"),
            wind_angle_deg=attr("ambient_wind_angle_deg"),
            rain_mm=attr("ambient_rain_mm"),
            # Soil
            soil_moisture_pct=attr("treatment_soil_moisture_pct"),
            soil_temp_c=attr("treatment_soil_temp_c"),
            soil_ec_ds_m=attr("treatment_soil_ec_ds_m"),
            soil_ph=attr("treatment_soil_ph"),
            # Reference
            reference_crop_par_umol=attr("reference_crop_par_umol"),
            reference_crop_leaf_temp_c=attr("reference_crop_leaf_temp_c"),
            reference_soil_moisture_pct=attr("reference_soil_moisture_pct"),
            # Shading ratio
            par_shading_ratio=attr("par_shading_ratio"),
            cwsi=cwsi_proxy,
            source="thingsboard",
            quality_flags=flags,
        )

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------
    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict of all fields with ``ts`` as an ISO-8601 string (or None)."""
        d = asdict(self)
        d["ts"] = self.ts.isoformat() if self.ts else None
        return d

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SensorRaw":
        """
        Rebuild a SensorRaw from a dict such as the output of ``to_dict()``.

        A string ``ts`` is parsed with ``datetime.fromisoformat``; keys that
        are not dataclass fields are ignored. The input dict is not mutated.
        """
        d = d.copy()
        if isinstance(d.get("ts"), str):
            d["ts"] = datetime.fromisoformat(d["ts"])
        return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
# ---------------------------------------------------------------------------
# BiologicalState — photosynthesis model outputs + phenology
# ---------------------------------------------------------------------------
@dataclass
class BiologicalState:
    """
    Vine physiological state computed for a single control slot.

    Holds the output of the FarquharModel (or ML ensemble selected via the
    RoutingAgent) together with the phenology tracker's view of the season.
    The InterventionGate and TradeoffEngine consume this record.
    """

    ts: datetime

    # --- Photosynthesis model outputs ---
    a_net_umol: Optional[float] = None  # net carbon assimilation (µmol CO₂ m⁻² s⁻¹)
    limiting_state: Optional[str] = None  # "rubp" | "rubisco" | "tpu" | "transition"
    shading_helps: Optional[bool] = None  # True only when Rubisco-limited AND heat is bottleneck

    # --- Model provenance ---
    model_used: str = "unknown"  # "fvcb" | "fvcb_semillon" | "ml" | "ml_ensemble"
    model_confidence: Optional[float] = None  # 0–1 (1 = high confidence in routing choice)

    # --- Raw inputs echoed back for auditing ---
    par_input: Optional[float] = None
    tleaf_input: Optional[float] = None
    vpd_input: Optional[float] = None
    co2_input: Optional[float] = None

    # --- Phenological state ---
    phenological_stage: str = "vegetative"  # vegetative | flowering | veraison | harvest
    gdd_cumulative: Optional[float] = None  # growing degree days since budburst
    crop_value_weight: float = 1.0  # seasonal multiplier (1.5× at veraison, 0.5× post-harvest)

    # --- Stress levels ---
    heat_stress_level: str = "none"  # none | low | moderate | high | extreme
    water_stress_level: str = "none"
    sunburn_risk: bool = False  # True when Tleaf > BERRY_SUNBURN_TEMP_C

    # --- Fruiting-zone specific ---
    fruiting_zone_a_net: Optional[float] = None  # A at mid-canopy zone (zone index 1)
    fruiting_zone_par: Optional[float] = None  # PAR at mid-canopy
    top_canopy_a_net: Optional[float] = None  # A at top-canopy zone (zone index 2)

    # ------------------------------------------------------------------
    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field to a dict; ``ts`` becomes an ISO-8601 string (None if unset)."""
        payload = asdict(self)
        payload["ts"] = None if not self.ts else self.ts.isoformat()
        return payload

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "BiologicalState":
        """
        Inverse of ``to_dict``: keep only known field names and parse a
        string ``ts`` back into a datetime. The caller's dict is left untouched.
        """
        known = cls.__dataclass_fields__
        kwargs = {name: value for name, value in d.items() if name in known}
        raw_ts = kwargs.get("ts")
        if isinstance(raw_ts, str):
            kwargs["ts"] = datetime.fromisoformat(raw_ts)
        return cls(**kwargs)
# ---------------------------------------------------------------------------
# TrackerKinematics — tracker position and operational mode
# ---------------------------------------------------------------------------
@dataclass
class TrackerKinematics:
    """
    State of the single-axis tracker during one control slot.

    ``astronomical_tilt_deg`` is the sun-following (full-energy) position and
    ``shade_offset_deg`` is the deliberate deviation applied to protect the
    vines, so that effective_tilt_deg = astronomical_tilt_deg + shade_offset_deg.

    Angle convention: 0° = horizontal, positive = tilted toward east,
    negative = tilted toward west (consistent with pvlib single-axis sign convention).
    """

    ts: datetime

    # --- Astronomical tracking (default / full-energy position) ---
    astronomical_tilt_deg: float = 0.0
    solar_azimuth_deg: Optional[float] = None
    solar_elevation_deg: Optional[float] = None

    # --- Shading offset (deliberate protection deviation) ---
    shade_offset_deg: float = 0.0  # 0 = no protection, positive values = shade intervention
    effective_tilt_deg: float = 0.0  # astronomical + shade_offset

    # --- Previous slot (for hysteresis) ---
    previous_tilt_deg: Optional[float] = None
    tilt_change_deg: float = 0.0  # effective_tilt - previous_tilt
    motion_triggered: bool = False  # True if |change| > ANGLE_TOLERANCE_DEG

    # --- Operational mode ---
    operational_mode: str = "tracking"  # tracking | wind_stow | heat_shield | harvest_park
    mode_override_reason: Optional[str] = None

    # --- Panel surface temperatures ---
    panel_temp_treatment_c: Optional[float] = None  # Thermocouples1 avg
    panel_temp_reference_c: Optional[float] = None  # Thermocouples2 avg

    # ------------------------------------------------------------------
    def to_dict(self) -> Dict[str, Any]:
        """Dump all fields into a dict, rendering ``ts`` as ISO-8601 text (or None)."""
        out = asdict(self)
        if self.ts:
            out["ts"] = self.ts.isoformat()
        else:
            out["ts"] = None
        return out

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TrackerKinematics":
        """
        Build an instance from a ``to_dict``-style mapping.

        Unknown keys are skipped; a string ``ts`` is converted with
        ``datetime.fromisoformat``. The input mapping is not modified.
        """
        accepted: Dict[str, Any] = {}
        for key, value in d.items():
            if key not in cls.__dataclass_fields__:
                continue
            if key == "ts" and isinstance(value, str):
                value = datetime.fromisoformat(value)
            accepted[key] = value
        return cls(**accepted)
# ---------------------------------------------------------------------------
# SimulationLog — complete audit record for one 15-min slot
# ---------------------------------------------------------------------------
@dataclass
class SimulationLog:
    """
    Full audit record for one 15-minute control loop execution.

    Written to `data/simulation_log.parquet` (or CSV) after every slot.
    Used for replay, validation, ROI reporting, and Phase 7 integration tests.
    """

    ts: datetime
    slot_index: int  # 0–95 for a 24-hour day (96 × 15-min slots)
    date_str: str = ""  # YYYY-MM-DD local date for partitioning

    # --- Nested state objects ---
    sensor: Optional[SensorRaw] = None
    bio: Optional[BiologicalState] = None
    kinematics: Optional[TrackerKinematics] = None

    # --- InterventionGate outcome ---
    intervention_gate_passed: bool = False
    gate_rejection_reason: Optional[str] = None
    # Rejection categories: "no_shade_window:morning" | "no_shade_window:may" |
    # "overcast" | "below_temp_threshold" | "below_cwsi_threshold" | "budget_exhausted"

    # --- TradeoffEngine outcome ---
    candidate_offsets_tested: List[float] = field(default_factory=list)
    chosen_offset_deg: float = 0.0
    minimum_dose_rationale: Optional[str] = None
    # e.g. "offset 5° sufficient: fruiting PAR reduced below 400 µmol/m²/s"

    # --- Safety rails ---
    fvcb_a: Optional[float] = None
    ml_a: Optional[float] = None
    model_divergence_pct: Optional[float] = None  # |fvcb_a - ml_a| / max * 100
    safety_fallback_triggered: bool = False
    routing_decision: Optional[str] = None  # "fvcb" | "ml" — which model was used

    # --- Energy budget accounting ---
    energy_fraction_this_slot: float = 0.0  # fraction of max generation sacrificed
    budget_remaining_daily_kwh: Optional[float] = None
    budget_remaining_weekly_kwh: Optional[float] = None
    budget_remaining_monthly_kwh: Optional[float] = None

    # --- Feedback (filled in the following slot) ---
    a_net_actual: Optional[float] = None  # measured A in next slot (for validation)
    a_net_improvement_pct: Optional[float] = None  # vs unshaded counterfactual

    # --- Explainability tags ---
    decision_tags: List[str] = field(default_factory=list)
    # e.g. ["rubisco_limited", "dose:5deg", "veraison_1.5x", "budget_ok:32%_remaining"]

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------
    def to_dict(self) -> Dict[str, Any]:
        """
        Deep-serialize to a plain dict (JSON-serializable).

        Nested objects are serialized through their own ``to_dict()`` (or None
        when unset). List fields are copied so that mutating the returned dict
        cannot alter this record.
        """
        return {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "sensor": self.sensor.to_dict() if self.sensor else None,
            "bio": self.bio.to_dict() if self.bio else None,
            "kinematics": self.kinematics.to_dict() if self.kinematics else None,
            "intervention_gate_passed": self.intervention_gate_passed,
            "gate_rejection_reason": self.gate_rejection_reason,
            "candidate_offsets_tested": list(self.candidate_offsets_tested),
            "chosen_offset_deg": self.chosen_offset_deg,
            "minimum_dose_rationale": self.minimum_dose_rationale,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "model_divergence_pct": self.model_divergence_pct,
            "safety_fallback_triggered": self.safety_fallback_triggered,
            "routing_decision": self.routing_decision,
            "energy_fraction_this_slot": self.energy_fraction_this_slot,
            "budget_remaining_daily_kwh": self.budget_remaining_daily_kwh,
            "budget_remaining_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_remaining_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "decision_tags": list(self.decision_tags),
        }

    def to_flat_row(self) -> Dict[str, Any]:
        """
        Flatten all nested objects into a single dict row suitable for
        appending to a Parquet or CSV log file.

        Top-level fields use short column names (e.g. ``gate_passed``,
        ``budget_daily_kwh``). ``decision_tags`` is joined with ``"|"``.
        ``candidate_offsets_tested`` and ``minimum_dose_rationale`` are not
        included in the row.

        Nested field names are prefixed: ``sensor__*``, ``bio__*`` and
        ``kin__*`` (for ``kinematics``). Each nested ``ts`` is omitted, since it
        duplicates the row timestamp. The sensor's ``quality_flags`` and
        ``source`` are also omitted.
        """
        row: Dict[str, Any] = {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "gate_passed": self.intervention_gate_passed,
            "gate_reason": self.gate_rejection_reason,
            "chosen_offset_deg": self.chosen_offset_deg,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "divergence_pct": self.model_divergence_pct,
            "fallback": self.safety_fallback_triggered,
            "routing": self.routing_decision,
            "energy_fraction": self.energy_fraction_this_slot,
            "budget_daily_kwh": self.budget_remaining_daily_kwh,
            # Weekly budget was previously dropped from the flat log even though
            # daily and monthly were written.
            "budget_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "tags": "|".join(self.decision_tags),
        }
        if self.sensor:
            for k, v in self.sensor.to_dict().items():
                if k not in ("ts", "quality_flags", "source"):
                    row[f"sensor__{k}"] = v
        if self.bio:
            for k, v in self.bio.to_dict().items():
                if k != "ts":
                    row[f"bio__{k}"] = v
        if self.kinematics:
            for k, v in self.kinematics.to_dict().items():
                if k != "ts":
                    row[f"kin__{k}"] = v
        return row
# ---------------------------------------------------------------------------
# Public convenience re-exports from VineSnapshot
# ---------------------------------------------------------------------------
def sensor_raw_from_vine_snapshot(snapshot: Any) -> SensorRaw:
    """
    Convert a ThingsBoard VineSnapshot into a SensorRaw.

    Thin module-level convenience wrapper: all field mapping is done by
    ``SensorRaw.from_vine_snapshot``.
    """
    converted = SensorRaw.from_vine_snapshot(snapshot)
    return converted
# ---------------------------------------------------------------------------
# Quick self-test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Quick self-test: build one record of each schema type, print the
    # serialized forms, and check that serialization round-trips losslessly.
    # (`timezone` is already imported at module level.)
    import json

    now = datetime.now(tz=timezone.utc)
    sensor = SensorRaw(
        ts=now,
        air_temp_c=33.5,
        leaf_temp_c=35.1,
        vpd_kpa=2.9,
        co2_ppm=410.0,
        fruiting_zone_par_umol=820.0,
        soil_moisture_pct=31.2,
        reference_crop_par_umol=1150.0,
        par_shading_ratio=0.71,
        source="thingsboard",
    )
    bio = BiologicalState(
        ts=now,
        a_net_umol=14.3,
        limiting_state="rubisco",
        shading_helps=True,
        model_used="fvcb_semillon",
        phenological_stage="veraison",
        crop_value_weight=1.5,
        heat_stress_level="moderate",
        sunburn_risk=True,
    )
    kin = TrackerKinematics(
        ts=now,
        astronomical_tilt_deg=42.0,
        shade_offset_deg=5.0,
        effective_tilt_deg=47.0,
        previous_tilt_deg=42.0,
        tilt_change_deg=5.0,
        motion_triggered=True,
        operational_mode="tracking",
        panel_temp_treatment_c=58.3,
    )
    log = SimulationLog(
        ts=now,
        slot_index=52,
        date_str="2025-07-15",
        sensor=sensor,
        bio=bio,
        kinematics=kin,
        intervention_gate_passed=True,
        candidate_offsets_tested=[3.0, 5.0],
        chosen_offset_deg=5.0,
        minimum_dose_rationale="5° sufficient to reduce fruiting-zone PAR below 400",
        fvcb_a=14.3,
        ml_a=14.8,
        model_divergence_pct=3.4,
        routing_decision="fvcb_semillon",
        energy_fraction_this_slot=0.042,
        budget_remaining_daily_kwh=8.1,
        decision_tags=["rubisco_limited", "dose:5deg", "veraison_1.5x", "budget_ok"],
    )

    print("SensorRaw:")
    print(json.dumps(sensor.to_dict(), indent=2, default=str))
    print("\nBiologicalState:")
    print(json.dumps(bio.to_dict(), indent=2, default=str))
    print("\nTrackerKinematics:")
    print(json.dumps(kin.to_dict(), indent=2, default=str))

    print("\nSimulationLog flat row keys:")
    row = log.to_flat_row()
    print(f" {len(row)} columns")
    print(" First 10:", list(row.keys())[:10])

    print("\nSensorRaw round-trip:")
    s2 = SensorRaw.from_dict(sensor.to_dict())
    assert s2.air_temp_c == sensor.air_temp_c
    assert isinstance(s2.ts, datetime)
    assert s2 == sensor
    print(" OK")

    print("\nBiologicalState / TrackerKinematics round-trip:")
    assert BiologicalState.from_dict(bio.to_dict()) == bio
    assert TrackerKinematics.from_dict(kin.to_dict()) == kin
    print(" OK")

    print("\nSimulationLog.to_dict() JSON check:")
    # No `default=` here: to_dict() promises a JSON-serializable result.
    json.dumps(log.to_dict())
    print(" OK")