| """ |
| SolarWine 2.0 — Data Schema |
| ============================ |
| Canonical dataclasses for the four telemetry tables that flow through |
| the 15-minute control loop. |
| |
| SensorRaw — one-slot snapshot of all on-site + IMS inputs |
| BiologicalState — photosynthesis model outputs + phenological state |
| TrackerKinematics — tracker position, commands, operational mode |
| SimulationLog — complete audit record for one 15-min slot |
| |
| Storage |
| ------- |
| CSV/Parquet backend via to_dict() / from_dict() helpers. Schema is forward- |
| compatible with a future TimescaleDB migration (all timestamps are UTC, |
| numeric fields use the fixed units listed under "Unit conventions" below). |
| |
| Unit conventions |
| ---------------- |
| Temperatures : °C |
| PAR : µmol m⁻² s⁻¹ |
| DLI : mol m⁻² day⁻¹ |
| Irradiance (GHI) : W m⁻² |
| VPD : kPa |
| CO₂ : ppm |
| Angles : degrees (tilt: + = east-facing, 0 = horizontal, - = west-facing) |
| Energy : kWh |
| Soil moisture : % |
| Wind speed : m s⁻¹ |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import asdict, dataclass, field |
| from datetime import datetime, timezone |
| from typing import Any, Dict, List, Optional |
|
|
| from src.utils import cwsi_from_delta_t |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class SensorRaw:
    """
    Canonical sensor snapshot for one 15-min control slot.

    Populated from ThingsBoard (TB) via VineSnapshot for real-time control,
    or from CSV/Parquet for historical replay.  IMS fields are always from
    the IMS station 43 (Sde Boker) cache.

    Only ``ts`` is required.  Every measurement defaults to ``None`` so a
    missing reading stays distinguishable from a genuine zero.
    """

    ts: datetime

    # --- Air / leaf microclimate -----------------------------------------
    air_temp_c: Optional[float] = None
    leaf_temp_c: Optional[float] = None
    vpd_kpa: Optional[float] = None
    co2_ppm: Optional[float] = None
    air_leaf_delta_t: Optional[float] = None
    humidity_pct: Optional[float] = None
    dew_temp_c: Optional[float] = None

    # --- Light -------------------------------------------------------------
    par_umol: Optional[float] = None
    fruiting_zone_par_umol: Optional[float] = None
    ghi_w_m2: Optional[float] = None

    # --- Daily light integral and spectral indices -------------------------
    dli_mol_m2: Optional[float] = None
    ndvi: Optional[float] = None
    pri: Optional[float] = None

    # --- Weather -------------------------------------------------------------
    wind_speed_ms: Optional[float] = None
    wind_angle_deg: Optional[float] = None
    rain_mm: Optional[float] = None
    air_pressure_hpa: Optional[float] = None

    # --- Soil ----------------------------------------------------------------
    soil_moisture_pct: Optional[float] = None
    soil_temp_c: Optional[float] = None
    soil_ec_ds_m: Optional[float] = None
    soil_ph: Optional[float] = None

    # --- Reference-side readings (counterpart of the treatment values) -----
    reference_crop_par_umol: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_soil_moisture_pct: Optional[float] = None

    # --- Shading ratio as reported by the snapshot --------------------------
    par_shading_ratio: Optional[float] = None

    # --- Water-stress proxy; from_vine_snapshot() derives it from delta-T ---
    cwsi: Optional[float] = None

    # --- Provenance ----------------------------------------------------------
    source: str = "unknown"
    quality_flags: List[str] = field(default_factory=list)

    @classmethod
    def from_vine_snapshot(cls, snapshot: Any) -> "SensorRaw":
        """
        Construct SensorRaw from a ThingsBoardClient.VineSnapshot.

        The snapshot already contains treatment-vs-reference aggregations
        and bounded averages; this method simply re-maps them to the
        canonical SensorRaw field names.

        Attributes are read with ``getattr(..., None)``, so any attribute the
        snapshot lacks becomes ``None``.  ``dew_temp_c``, ``ghi_w_m2`` and
        ``air_pressure_hpa`` are not mapped here and keep their defaults.

        Args:
            snapshot: Object exposing the VineSnapshot attributes (duck-typed).

        Returns:
            A SensorRaw with ``source="thingsboard"``.  ``quality_flags``
            contains ``"stale_<N>min"`` when the snapshot reports a staleness
            above 20 minutes.
        """

        def read(name: str) -> Any:
            return getattr(snapshot, name, None)

        # A snapshot may carry staleness_minutes=None; comparing None with an
        # int would raise TypeError, so treat it the same as "not reported".
        flags: List[str] = []
        staleness = read("staleness_minutes")
        if staleness is not None and staleness > 20:
            flags.append(f"stale_{staleness:.0f}min")

        # CWSI proxy is only computable when the air-leaf delta-T is known.
        delta_t = read("treatment_air_leaf_delta_t")
        cwsi_proxy: Optional[float] = None
        if delta_t is not None:
            cwsi_proxy = cwsi_from_delta_t(delta_t=delta_t)

        # Fall back to "now" (UTC) both when the attribute is missing and when
        # it is present but None, so ts is always a datetime.
        ts = read("snapshot_ts")
        if ts is None:
            ts = datetime.now(tz=timezone.utc)

        # Explicit None check instead of `or`: a 0.0 degC leaf temperature is a
        # valid reading and must not be replaced by the fallback value.
        leaf_temp = read("treatment_leaf_temp_c")
        if leaf_temp is None:
            leaf_temp = read("treatment_crop_leaf_temp_c")

        return cls(
            ts=ts,
            # Air / leaf microclimate
            air_temp_c=read("treatment_air_temp_c"),
            leaf_temp_c=leaf_temp,
            vpd_kpa=read("treatment_vpd_kpa"),
            co2_ppm=read("treatment_co2_ppm"),
            air_leaf_delta_t=delta_t,
            humidity_pct=read("ambient_humidity_pct"),
            # Light and spectral indices
            par_umol=read("treatment_par_umol"),
            fruiting_zone_par_umol=read("treatment_crop_par_umol"),
            dli_mol_m2=read("treatment_crop_dli_mol_m2"),
            ndvi=read("treatment_crop_ndvi"),
            pri=read("treatment_pri"),
            # Weather
            wind_speed_ms=read("ambient_wind_speed_ms"),
            wind_angle_deg=read("ambient_wind_angle_deg"),
            rain_mm=read("ambient_rain_mm"),
            # Soil
            soil_moisture_pct=read("treatment_soil_moisture_pct"),
            soil_temp_c=read("treatment_soil_temp_c"),
            soil_ec_ds_m=read("treatment_soil_ec_ds_m"),
            soil_ph=read("treatment_soil_ph"),
            # Reference side
            reference_crop_par_umol=read("reference_crop_par_umol"),
            reference_crop_leaf_temp_c=read("reference_crop_leaf_temp_c"),
            reference_soil_moisture_pct=read("reference_soil_moisture_pct"),
            # Derived / provenance
            par_shading_ratio=read("par_shading_ratio"),
            cwsi=cwsi_proxy,
            source="thingsboard",
            quality_flags=flags,
        )

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the snapshot as a plain dict keyed by field name.

        ``ts`` is rendered as an ISO-8601 string (``None`` when unset); all
        other values are copied as produced by ``dataclasses.asdict``.
        """
        d = asdict(self)
        d["ts"] = self.ts.isoformat() if self.ts else None
        return d

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SensorRaw":
        """
        Rebuild a SensorRaw from a dict such as the one ``to_dict()`` emits.

        A string ``ts`` is parsed with ``datetime.fromisoformat``; keys that
        are not SensorRaw fields are ignored.  The input dict is not mutated.

        Raises:
            TypeError: if ``d`` has no ``ts`` key (the field has no default).
            ValueError: if ``ts`` is a string that is not valid ISO format.
        """
        d = d.copy()
        if isinstance(d.get("ts"), str):
            d["ts"] = datetime.fromisoformat(d["ts"])
        return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
|
|
|
|
| |
| |
| |
|
|
@dataclass
class BiologicalState:
    """
    Computed vine physiological state for one control slot.

    Produced by the FarquharModel (or ML ensemble via RoutingAgent) and
    the phenology tracker.  Drives the InterventionGate and TradeoffEngine.

    Only ``ts`` is required; every other field has a default.
    """

    ts: datetime

    # --- Photosynthesis model output -----------------------------------------
    a_net_umol: Optional[float] = None
    limiting_state: Optional[str] = None
    shading_helps: Optional[bool] = None

    # --- Which model produced the output --------------------------------------
    model_used: str = "unknown"
    model_confidence: Optional[float] = None

    # --- Inputs recorded alongside the output (presumably the values fed to
    #     the model -- confirm against the producer) ---------------------------
    par_input: Optional[float] = None
    tleaf_input: Optional[float] = None
    vpd_input: Optional[float] = None
    co2_input: Optional[float] = None

    # --- Phenology -------------------------------------------------------------
    phenological_stage: str = "vegetative"
    gdd_cumulative: Optional[float] = None
    crop_value_weight: float = 1.0

    # --- Stress indicators -------------------------------------------------------
    heat_stress_level: str = "none"
    water_stress_level: str = "none"
    sunburn_risk: bool = False

    # --- Per-canopy-zone values ---------------------------------------------------
    fruiting_zone_a_net: Optional[float] = None
    fruiting_zone_par: Optional[float] = None
    top_canopy_a_net: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, with ``ts`` as an ISO-8601 string
        (``None`` when ``ts`` is unset)."""
        payload = asdict(self)
        payload["ts"] = None if not self.ts else self.ts.isoformat()
        return payload

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "BiologicalState":
        """Rebuild an instance from a dict; a string ``ts`` is parsed with
        ``datetime.fromisoformat`` and keys that are not fields are dropped.
        The input dict is left untouched."""
        known = cls.__dataclass_fields__
        kwargs: Dict[str, Any] = {}
        for key, value in d.items():
            if key in known:
                kwargs[key] = value
        raw_ts = kwargs.get("ts")
        if isinstance(raw_ts, str):
            kwargs["ts"] = datetime.fromisoformat(raw_ts)
        return cls(**kwargs)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class TrackerKinematics:
    """
    Single-axis tracker state for one control slot.

    astronomical_tilt_deg is always the sun-following position (full-energy).
    shade_offset_deg is the deliberate deviation for vine protection.
    effective_tilt_deg = astronomical_tilt_deg + shade_offset_deg.

    Angle convention: 0° = horizontal, positive = tilted toward east,
    negative = tilted toward west (consistent with pvlib single-axis sign convention).

    Note: effective_tilt_deg and tilt_change_deg are plain stored fields;
    this class does not compute them, so the caller must supply values
    that satisfy the relation above.
    """

    ts: datetime

    # --- Sun-following position and solar geometry ---------------------------
    astronomical_tilt_deg: float = 0.0
    solar_azimuth_deg: Optional[float] = None
    solar_elevation_deg: Optional[float] = None

    # --- Shade offset and resulting tilt --------------------------------------
    shade_offset_deg: float = 0.0
    effective_tilt_deg: float = 0.0

    # --- Motion relative to the previous tilt -----------------------------------
    previous_tilt_deg: Optional[float] = None
    tilt_change_deg: float = 0.0
    motion_triggered: bool = False

    # --- Operating mode -----------------------------------------------------------
    operational_mode: str = "tracking"
    mode_override_reason: Optional[str] = None

    # --- Panel temperatures (treatment vs reference) -----------------------------
    panel_temp_treatment_c: Optional[float] = None
    panel_temp_reference_c: Optional[float] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict, with ``ts`` as an ISO-8601 string
        (``None`` when ``ts`` is unset)."""
        payload = asdict(self)
        payload["ts"] = None if not self.ts else self.ts.isoformat()
        return payload

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TrackerKinematics":
        """Rebuild an instance from a dict; a string ``ts`` is parsed with
        ``datetime.fromisoformat`` and keys that are not fields are dropped.
        The input dict is left untouched."""
        known = cls.__dataclass_fields__
        kwargs: Dict[str, Any] = {}
        for key, value in d.items():
            if key in known:
                kwargs[key] = value
        raw_ts = kwargs.get("ts")
        if isinstance(raw_ts, str):
            kwargs["ts"] = datetime.fromisoformat(raw_ts)
        return cls(**kwargs)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class SimulationLog:
    """
    Full audit record for one 15-minute control loop execution.

    Written to `data/simulation_log.parquet` (or CSV) after every slot.
    Used for replay, validation, ROI reporting, and Phase 7 integration tests.

    ``ts`` and ``slot_index`` are required; everything else has a default.
    """

    ts: datetime
    slot_index: int
    date_str: str = ""

    # --- Nested per-slot records (quoted: forward references) ----------------
    sensor: Optional["SensorRaw"] = None
    bio: Optional["BiologicalState"] = None
    kinematics: Optional["TrackerKinematics"] = None

    # --- Intervention gate -----------------------------------------------------
    intervention_gate_passed: bool = False
    gate_rejection_reason: Optional[str] = None

    # --- Offset selection --------------------------------------------------------
    candidate_offsets_tested: List[float] = field(default_factory=list)
    chosen_offset_deg: float = 0.0
    minimum_dose_rationale: Optional[str] = None

    # --- Model routing / cross-check ----------------------------------------------
    fvcb_a: Optional[float] = None
    ml_a: Optional[float] = None
    model_divergence_pct: Optional[float] = None
    safety_fallback_triggered: bool = False
    routing_decision: Optional[str] = None

    # --- Energy budget ---------------------------------------------------------------
    energy_fraction_this_slot: float = 0.0
    budget_remaining_daily_kwh: Optional[float] = None
    budget_remaining_weekly_kwh: Optional[float] = None
    budget_remaining_monthly_kwh: Optional[float] = None

    # --- Outcome ------------------------------------------------------------------------
    a_net_actual: Optional[float] = None
    a_net_improvement_pct: Optional[float] = None

    # --- Free-form decision tags -----------------------------------------------------------
    decision_tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """
        Deep-serialize to a plain dict (JSON-serializable).

        Nested records are serialized through their own ``to_dict()`` (or
        ``None`` when absent) and ``ts`` becomes an ISO-8601 string.  The two
        list fields are copied so that mutating the returned dict cannot
        alter this log record.
        """

        def nested(part: Any) -> Optional[Dict[str, Any]]:
            return part.to_dict() if part else None

        d: Dict[str, Any] = {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "sensor": nested(self.sensor),
            "bio": nested(self.bio),
            "kinematics": nested(self.kinematics),
            "intervention_gate_passed": self.intervention_gate_passed,
            "gate_rejection_reason": self.gate_rejection_reason,
            # Copy: returning the live list would break the "deep" guarantee.
            "candidate_offsets_tested": list(self.candidate_offsets_tested),
            "chosen_offset_deg": self.chosen_offset_deg,
            "minimum_dose_rationale": self.minimum_dose_rationale,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "model_divergence_pct": self.model_divergence_pct,
            "safety_fallback_triggered": self.safety_fallback_triggered,
            "routing_decision": self.routing_decision,
            "energy_fraction_this_slot": self.energy_fraction_this_slot,
            "budget_remaining_daily_kwh": self.budget_remaining_daily_kwh,
            "budget_remaining_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_remaining_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "decision_tags": list(self.decision_tags),
        }
        return d

    def to_flat_row(self) -> Dict[str, Any]:
        """
        Flatten all nested objects into a single dict row suitable for
        appending to a Parquet or CSV log file.

        Nested field names are prefixed: sensor__*, bio__*, kin__*.

        Top-level columns use shortened names (e.g. ``gate_passed``,
        ``divergence_pct``).  ``decision_tags`` is joined with ``|`` into the
        ``tags`` column.  ``candidate_offsets_tested`` and
        ``minimum_dose_rationale`` are not part of the flat row.  The nested
        ``ts`` fields are dropped (the top-level ``ts`` covers them), as are
        the sensor's ``quality_flags`` and ``source``.  Columns for a nested
        object appear only when that object is present.
        """
        row: Dict[str, Any] = {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "gate_passed": self.intervention_gate_passed,
            "gate_reason": self.gate_rejection_reason,
            "chosen_offset_deg": self.chosen_offset_deg,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "divergence_pct": self.model_divergence_pct,
            "fallback": self.safety_fallback_triggered,
            "routing": self.routing_decision,
            "energy_fraction": self.energy_fraction_this_slot,
            "budget_daily_kwh": self.budget_remaining_daily_kwh,
            # Previously omitted although daily and monthly were exported.
            "budget_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "tags": "|".join(self.decision_tags),
        }
        # (nested object, column prefix, keys to leave out of the flat row)
        parts = (
            (self.sensor, "sensor__", ("ts", "quality_flags", "source")),
            (self.bio, "bio__", ("ts",)),
            (self.kinematics, "kin__", ("ts",)),
        )
        for part, prefix, skipped in parts:
            if not part:
                continue
            for k, v in part.to_dict().items():
                if k not in skipped:
                    row[f"{prefix}{k}"] = v
        return row
|
|
|
|
| |
| |
| |
|
|
def sensor_raw_from_vine_snapshot(snapshot: Any) -> SensorRaw:
    """Function-style shim: delegates to SensorRaw.from_vine_snapshot() and
    returns its result unchanged."""
    build = SensorRaw.from_vine_snapshot
    return build(snapshot)
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Smoke test: build one record of every schema type, print each of them,
    # then check that SensorRaw survives a to_dict()/from_dict() round trip.
    import json

    now = datetime.now(tz=timezone.utc)

    sensor = SensorRaw(
        ts=now,
        air_temp_c=33.5,
        leaf_temp_c=35.1,
        vpd_kpa=2.9,
        co2_ppm=410.0,
        fruiting_zone_par_umol=820.0,
        soil_moisture_pct=31.2,
        reference_crop_par_umol=1150.0,
        par_shading_ratio=0.71,
        source="thingsboard",
    )
    bio = BiologicalState(
        ts=now,
        a_net_umol=14.3,
        limiting_state="rubisco",
        shading_helps=True,
        model_used="fvcb_semillon",
        phenological_stage="veraison",
        crop_value_weight=1.5,
        heat_stress_level="moderate",
        sunburn_risk=True,
    )
    kin = TrackerKinematics(
        ts=now,
        astronomical_tilt_deg=42.0,
        shade_offset_deg=5.0,
        effective_tilt_deg=47.0,
        previous_tilt_deg=42.0,
        tilt_change_deg=5.0,
        motion_triggered=True,
        operational_mode="tracking",
        panel_temp_treatment_c=58.3,
    )
    log = SimulationLog(
        ts=now,
        slot_index=52,
        date_str="2025-07-15",
        sensor=sensor,
        bio=bio,
        kinematics=kin,
        intervention_gate_passed=True,
        candidate_offsets_tested=[3.0, 5.0],
        chosen_offset_deg=5.0,
        minimum_dose_rationale="5° sufficient to reduce fruiting-zone PAR below 400",
        fvcb_a=14.3,
        ml_a=14.8,
        model_divergence_pct=3.4,
        routing_decision="fvcb_semillon",
        energy_fraction_this_slot=0.042,
        budget_remaining_daily_kwh=8.1,
        decision_tags=["rubisco_limited", "dose:5deg", "veraison_1.5x", "budget_ok"],
    )

    # Dump the three component records as indented JSON, in schema order.
    for heading, record in (
        ("SensorRaw:", sensor),
        ("\nBiologicalState:", bio),
        ("\nTrackerKinematics:", kin),
    ):
        print(heading)
        print(json.dumps(record.to_dict(), indent=2, default=str))

    # Summarize the flattened log row rather than printing every column.
    print("\nSimulationLog flat row keys:")
    flat = log.to_flat_row()
    print(f" {len(flat)} columns")
    print(" First 10:", list(flat.keys())[:10])

    # Round trip: the timestamp must come back as a datetime, not a string.
    print("\nSensorRaw round-trip:")
    restored = SensorRaw.from_dict(sensor.to_dict())
    assert restored.air_temp_c == sensor.air_temp_c
    assert isinstance(restored.ts, datetime)
    print(" OK")
|
|