""" SolarWine 2.0 — Data Schema ============================ Canonical dataclasses for the four telemetry tables that flow through the 15-minute control loop. SensorRaw — one-slot snapshot of all on-site + IMS inputs BiologicalState — photosynthesis model outputs + phenological state TrackerKinematics — tracker position, commands, operational mode SimulationLog — complete audit record for one 15-min slot Storage ------- CSV/Parquet backend via to_dict() / from_dict() helpers. Schema is forward- compatible with a future TimescaleDB migration (all timestamps are UTC, numeric fields are SI units). Unit conventions ---------------- Temperatures : °C PAR : µmol m⁻² s⁻¹ DLI : mol m⁻² day⁻¹ Irradiance (GHI) : W m⁻² VPD : kPa CO₂ : ppm Angles : degrees (tilt: + = east-facing, 0 = horizontal, - = west-facing) Energy : kWh Soil moisture : % Wind speed : m s⁻¹ """ from __future__ import annotations from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from typing import Any, Dict, List, Optional from src.utils import cwsi_from_delta_t # --------------------------------------------------------------------------- # SensorRaw — single 15-min slot of all sensor inputs # --------------------------------------------------------------------------- @dataclass class SensorRaw: """ Canonical sensor snapshot for one 15-min control slot. Populated from ThingsBoard (TB) via VineSnapshot for real-time control, or from CSV/Parquet for historical replay. IMS fields are always from the IMS station 43 (Sde Boker) cache. 
""" ts: datetime # UTC timestamp of the slot start # --- TB microclimate (treatment area Air2/3/4 average) --- air_temp_c: Optional[float] = None leaf_temp_c: Optional[float] = None vpd_kpa: Optional[float] = None co2_ppm: Optional[float] = None air_leaf_delta_t: Optional[float] = None # proxy for CWSI humidity_pct: Optional[float] = None dew_temp_c: Optional[float] = None # --- PAR / irradiance --- par_umol: Optional[float] = None # above-canopy ambient PAR (Air devices) fruiting_zone_par_umol: Optional[float] = None # mid-canopy PAR (Crop3/5/6/7 avg) ghi_w_m2: Optional[float] = None # IMS global horizontal irradiance # --- Daily light / spectral indices --- dli_mol_m2: Optional[float] = None # daily light integral so far ndvi: Optional[float] = None pri: Optional[float] = None # --- Wind & rain --- wind_speed_ms: Optional[float] = None wind_angle_deg: Optional[float] = None rain_mm: Optional[float] = None air_pressure_hpa: Optional[float] = None # --- TB soil (treatment area Soil1/3/5/6 average) --- soil_moisture_pct: Optional[float] = None soil_temp_c: Optional[float] = None soil_ec_ds_m: Optional[float] = None soil_ph: Optional[float] = None # --- TB reference area (Crop1/2/4 avg, open sky) --- reference_crop_par_umol: Optional[float] = None reference_crop_leaf_temp_c: Optional[float] = None reference_soil_moisture_pct: Optional[float] = None # --- Shading effectiveness --- par_shading_ratio: Optional[float] = None # treatment / reference PAR (<1 = shaded) # --- Derived stress index --- cwsi: Optional[float] = None # explicit CWSI if available from TB # --- Data provenance --- source: str = "unknown" # "thingsboard" | "ims" | "csv" | "mixed" quality_flags: List[str] = field(default_factory=list) # e.g. 
["soil5_temp_outlier_excluded", "air3_stale"] # ------------------------------------------------------------------ # Factory: build from a VineSnapshot # ------------------------------------------------------------------ @classmethod def from_vine_snapshot(cls, snapshot: Any) -> "SensorRaw": """ Construct SensorRaw from a ThingsBoardClient.VineSnapshot. The snapshot already contains treatment-vs-reference aggregations and bounded averages; this method simply re-maps them to the canonical SensorRaw field names. """ flags: List[str] = [] if hasattr(snapshot, "staleness_minutes") and snapshot.staleness_minutes > 20: flags.append(f"stale_{snapshot.staleness_minutes:.0f}min") # CWSI proxy from air-leaf temperature delta (see src.utils.cwsi_from_delta_t) cwsi_proxy: Optional[float] = None delta_t = getattr(snapshot, "treatment_air_leaf_delta_t", None) if delta_t is not None: cwsi_proxy = cwsi_from_delta_t(delta_t=delta_t) return cls( ts=getattr(snapshot, "snapshot_ts", datetime.now(tz=timezone.utc)), # Microclimate air_temp_c=getattr(snapshot, "treatment_air_temp_c", None), leaf_temp_c=getattr(snapshot, "treatment_leaf_temp_c", None) or getattr(snapshot, "treatment_crop_leaf_temp_c", None), vpd_kpa=getattr(snapshot, "treatment_vpd_kpa", None), co2_ppm=getattr(snapshot, "treatment_co2_ppm", None), air_leaf_delta_t=delta_t, humidity_pct=getattr(snapshot, "ambient_humidity_pct", None), # PAR par_umol=getattr(snapshot, "treatment_par_umol", None), fruiting_zone_par_umol=getattr(snapshot, "treatment_crop_par_umol", None), dli_mol_m2=getattr(snapshot, "treatment_crop_dli_mol_m2", None), ndvi=getattr(snapshot, "treatment_crop_ndvi", None), pri=getattr(snapshot, "treatment_pri", None), # Wind / weather wind_speed_ms=getattr(snapshot, "ambient_wind_speed_ms", None), wind_angle_deg=getattr(snapshot, "ambient_wind_angle_deg", None), rain_mm=getattr(snapshot, "ambient_rain_mm", None), # Soil soil_moisture_pct=getattr(snapshot, "treatment_soil_moisture_pct", None), 
soil_temp_c=getattr(snapshot, "treatment_soil_temp_c", None), soil_ec_ds_m=getattr(snapshot, "treatment_soil_ec_ds_m", None), soil_ph=getattr(snapshot, "treatment_soil_ph", None), # Reference reference_crop_par_umol=getattr(snapshot, "reference_crop_par_umol", None), reference_crop_leaf_temp_c=getattr(snapshot, "reference_crop_leaf_temp_c", None), reference_soil_moisture_pct=getattr(snapshot, "reference_soil_moisture_pct", None), # Shading ratio par_shading_ratio=getattr(snapshot, "par_shading_ratio", None), cwsi=cwsi_proxy, source="thingsboard", quality_flags=flags, ) # ------------------------------------------------------------------ # Serialization # ------------------------------------------------------------------ def to_dict(self) -> Dict[str, Any]: d = asdict(self) d["ts"] = self.ts.isoformat() if self.ts else None return d @classmethod def from_dict(cls, d: Dict[str, Any]) -> "SensorRaw": d = d.copy() if isinstance(d.get("ts"), str): d["ts"] = datetime.fromisoformat(d["ts"]) return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) # --------------------------------------------------------------------------- # BiologicalState — photosynthesis model outputs + phenology # --------------------------------------------------------------------------- @dataclass class BiologicalState: """ Computed vine physiological state for one control slot. Produced by the FarquharModel (or ML ensemble via RoutingAgent) and the phenology tracker. Drives the InterventionGate and TradeoffEngine. 
""" ts: datetime # --- Photosynthesis model outputs --- a_net_umol: Optional[float] = None # net carbon assimilation (µmol CO₂ m⁻² s⁻¹) limiting_state: Optional[str] = None # "rubp" | "rubisco" | "tpu" | "transition" shading_helps: Optional[bool] = None # True only when Rubisco-limited AND heat is bottleneck # --- Model provenance --- model_used: str = "unknown" # "fvcb" | "fvcb_semillon" | "ml" | "ml_ensemble" model_confidence: Optional[float] = None # 0–1 (1 = high confidence in routing choice) # --- Raw inputs echoed for auditing --- par_input: Optional[float] = None tleaf_input: Optional[float] = None vpd_input: Optional[float] = None co2_input: Optional[float] = None # --- Phenological state --- phenological_stage: str = "vegetative" # vegetative | flowering | veraison | harvest gdd_cumulative: Optional[float] = None # growing degree days since budburst crop_value_weight: float = 1.0 # seasonal multiplier (1.5× at veraison, 0.5× post-harvest) # --- Stress levels --- heat_stress_level: str = "none" # none | low | moderate | high | extreme water_stress_level: str = "none" sunburn_risk: bool = False # True when Tleaf > BERRY_SUNBURN_TEMP_C # --- Fruiting-zone specific --- fruiting_zone_a_net: Optional[float] = None # A at mid-canopy zone (zone index 1) fruiting_zone_par: Optional[float] = None # PAR at mid-canopy top_canopy_a_net: Optional[float] = None # A at top-canopy zone (zone index 2) # ------------------------------------------------------------------ def to_dict(self) -> Dict[str, Any]: d = asdict(self) d["ts"] = self.ts.isoformat() if self.ts else None return d @classmethod def from_dict(cls, d: Dict[str, Any]) -> "BiologicalState": d = d.copy() if isinstance(d.get("ts"), str): d["ts"] = datetime.fromisoformat(d["ts"]) return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) # --------------------------------------------------------------------------- # TrackerKinematics — tracker position and operational mode # 
# ---------------------------------------------------------------------------
# TrackerKinematics — tracker position and operational mode
# ---------------------------------------------------------------------------

@dataclass
class TrackerKinematics:
    """
    Single-axis tracker state for one control slot.

    Three tilt quantities are recorded:

    * ``astronomical_tilt_deg`` — the sun-following (full-energy) position;
    * ``shade_offset_deg`` — the deliberate deviation used for vine protection;
    * ``effective_tilt_deg`` — intended to equal astronomical + shade offset.
      It is stored as its own field; the dataclass does not enforce the sum.

    Angle convention: 0° = horizontal, positive = tilted toward east,
    negative = tilted toward west (consistent with pvlib single-axis sign
    convention).  NOTE(review): pvlib's sign depends on the tracker
    ``axis_azimuth`` — confirm against the configured axis.
    """

    ts: datetime

    # Default, full-energy sun-following position.
    astronomical_tilt_deg: float = 0.0
    solar_azimuth_deg: Optional[float] = None
    solar_elevation_deg: Optional[float] = None

    # Deliberate protection deviation: 0 = none, positive = shade intervention.
    shade_offset_deg: float = 0.0
    effective_tilt_deg: float = 0.0  # astronomical + shade_offset

    # Previous-slot state, used for hysteresis.
    previous_tilt_deg: Optional[float] = None
    tilt_change_deg: float = 0.0    # effective_tilt - previous_tilt
    motion_triggered: bool = False  # True if |change| > ANGLE_TOLERANCE_DEG

    # Operating mode: tracking | wind_stow | heat_shield | harvest_park
    operational_mode: str = "tracking"
    mode_override_reason: Optional[str] = None

    # Panel surface temperatures.
    panel_temp_treatment_c: Optional[float] = None  # Thermocouples1 avg
    panel_temp_reference_c: Optional[float] = None  # Thermocouples2 avg

    # ------------------------------------------------------------------

    def to_dict(self) -> Dict[str, Any]:
        """Flatten every field into a dict; ``ts`` becomes an ISO-8601 string or None."""
        record = asdict(self)
        record["ts"] = None if not self.ts else self.ts.isoformat()
        return record

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "TrackerKinematics":
        """
        Build an instance from a mapping, e.g. the output of ``to_dict``.

        Unknown keys are dropped, and a string ``ts`` is parsed with
        ``datetime.fromisoformat``.  The caller's mapping is left untouched.
        """
        known_fields = cls.__dataclass_fields__
        kwargs = {name: value for name, value in d.items() if name in known_fields}
        stamp = kwargs.get("ts")
        if isinstance(stamp, str):
            kwargs["ts"] = datetime.fromisoformat(stamp)
        return cls(**kwargs)
# ---------------------------------------------------------------------------
# SimulationLog — complete audit record for one 15-min slot
# ---------------------------------------------------------------------------

@dataclass
class SimulationLog:
    """
    Full audit record for one 15-minute control loop execution.

    Written to `data/simulation_log.parquet` (or CSV) after every slot.
    Used for replay, validation, ROI reporting, and Phase 7 integration tests.
    """

    ts: datetime
    slot_index: int     # 0–95 for a 24-hour day (96 × 15-min slots)
    date_str: str = ""  # YYYY-MM-DD local date for partitioning

    # --- Nested state objects ---
    sensor: Optional[SensorRaw] = None
    bio: Optional[BiologicalState] = None
    kinematics: Optional[TrackerKinematics] = None

    # --- InterventionGate outcome ---
    intervention_gate_passed: bool = False
    gate_rejection_reason: Optional[str] = None
    # Rejection categories: "no_shade_window:morning" | "no_shade_window:may" |
    # "overcast" | "below_temp_threshold" | "below_cwsi_threshold" | "budget_exhausted"

    # --- TradeoffEngine outcome ---
    candidate_offsets_tested: List[float] = field(default_factory=list)
    chosen_offset_deg: float = 0.0
    # e.g. "offset 5° sufficient: fruiting PAR reduced below 400 µmol/m²/s"
    minimum_dose_rationale: Optional[str] = None

    # --- Safety rails ---
    fvcb_a: Optional[float] = None
    ml_a: Optional[float] = None
    model_divergence_pct: Optional[float] = None  # |fvcb_a - ml_a| / max * 100
    safety_fallback_triggered: bool = False
    # Which model was used, e.g. "fvcb" | "fvcb_semillon" | "ml"
    routing_decision: Optional[str] = None

    # --- Energy budget accounting ---
    energy_fraction_this_slot: float = 0.0  # fraction of max generation sacrificed
    budget_remaining_daily_kwh: Optional[float] = None
    budget_remaining_weekly_kwh: Optional[float] = None
    budget_remaining_monthly_kwh: Optional[float] = None

    # --- Feedback (filled in the following slot) ---
    a_net_actual: Optional[float] = None           # measured A in next slot (for validation)
    a_net_improvement_pct: Optional[float] = None  # vs unshaded counterfactual

    # --- Explainability tags ---
    # e.g. ["rubisco_limited", "dose:5deg", "veraison_1.5x", "budget_ok:32%_remaining"]
    decision_tags: List[str] = field(default_factory=list)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    def to_dict(self) -> Dict[str, Any]:
        """
        Deep-serialize to a plain, JSON-serializable dict.

        Nested objects are serialized via their own ``to_dict`` (None when
        absent).  List fields are copied so that mutating the returned dict
        never mutates this log.
        """
        d: Dict[str, Any] = {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "sensor": self.sensor.to_dict() if self.sensor is not None else None,
            "bio": self.bio.to_dict() if self.bio is not None else None,
            "kinematics": self.kinematics.to_dict() if self.kinematics is not None else None,
            "intervention_gate_passed": self.intervention_gate_passed,
            "gate_rejection_reason": self.gate_rejection_reason,
            "candidate_offsets_tested": list(self.candidate_offsets_tested),
            "chosen_offset_deg": self.chosen_offset_deg,
            "minimum_dose_rationale": self.minimum_dose_rationale,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "model_divergence_pct": self.model_divergence_pct,
            "safety_fallback_triggered": self.safety_fallback_triggered,
            "routing_decision": self.routing_decision,
            "energy_fraction_this_slot": self.energy_fraction_this_slot,
            "budget_remaining_daily_kwh": self.budget_remaining_daily_kwh,
            "budget_remaining_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_remaining_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "decision_tags": list(self.decision_tags),
        }
        return d

    def to_flat_row(self) -> Dict[str, Any]:
        """
        Flatten all nested objects into a single dict row suitable for
        appending to a Parquet or CSV log file.

        Top-level fields use short column names (e.g. ``gate_passed``,
        ``budget_daily_kwh``); ``decision_tags`` is joined with ``"|"``.
        Nested field names are prefixed ``sensor__*``, ``bio__*`` and
        ``kin__*``.  Each nested ``ts`` is omitted, as are the sensor's
        ``source`` and ``quality_flags``.
        """
        row: Dict[str, Any] = {
            "ts": self.ts.isoformat() if self.ts else None,
            "slot_index": self.slot_index,
            "date_str": self.date_str,
            "gate_passed": self.intervention_gate_passed,
            "gate_reason": self.gate_rejection_reason,
            "chosen_offset_deg": self.chosen_offset_deg,
            "fvcb_a": self.fvcb_a,
            "ml_a": self.ml_a,
            "divergence_pct": self.model_divergence_pct,
            "fallback": self.safety_fallback_triggered,
            "routing": self.routing_decision,
            "energy_fraction": self.energy_fraction_this_slot,
            "budget_daily_kwh": self.budget_remaining_daily_kwh,
            # Previously omitted even though daily and monthly were logged.
            "budget_weekly_kwh": self.budget_remaining_weekly_kwh,
            "budget_monthly_kwh": self.budget_remaining_monthly_kwh,
            "a_net_actual": self.a_net_actual,
            "a_net_improvement_pct": self.a_net_improvement_pct,
            "tags": "|".join(self.decision_tags),
        }
        if self.sensor is not None:
            for k, v in self.sensor.to_dict().items():
                if k not in ("ts", "quality_flags", "source"):
                    row[f"sensor__{k}"] = v
        if self.bio is not None:
            for k, v in self.bio.to_dict().items():
                if k != "ts":
                    row[f"bio__{k}"] = v
        if self.kinematics is not None:
            for k, v in self.kinematics.to_dict().items():
                if k != "ts":
                    row[f"kin__{k}"] = v
        return row


# ---------------------------------------------------------------------------
# Public convenience re-exports from VineSnapshot
# ---------------------------------------------------------------------------

def sensor_raw_from_vine_snapshot(snapshot: Any) -> SensorRaw:
    """Module-level alias for SensorRaw.from_vine_snapshot()."""
    return SensorRaw.from_vine_snapshot(snapshot)


# ---------------------------------------------------------------------------
# Quick self-test
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import json

    now = datetime.now(tz=timezone.utc)

    sensor = SensorRaw(
        ts=now,
        air_temp_c=33.5,
        leaf_temp_c=35.1,
        vpd_kpa=2.9,
        co2_ppm=410.0,
        fruiting_zone_par_umol=820.0,
        soil_moisture_pct=31.2,
        reference_crop_par_umol=1150.0,
        par_shading_ratio=0.71,
        source="thingsboard",
    )
    bio = BiologicalState(
        ts=now,
        a_net_umol=14.3,
        limiting_state="rubisco",
        shading_helps=True,
        model_used="fvcb_semillon",
        phenological_stage="veraison",
        crop_value_weight=1.5,
        heat_stress_level="moderate",
        sunburn_risk=True,
    )
    kin = TrackerKinematics(
        ts=now,
        astronomical_tilt_deg=42.0,
        shade_offset_deg=5.0,
        effective_tilt_deg=47.0,
        previous_tilt_deg=42.0,
        tilt_change_deg=5.0,
        motion_triggered=True,
        operational_mode="tracking",
        panel_temp_treatment_c=58.3,
    )
    log = SimulationLog(
        ts=now,
        slot_index=52,
        date_str="2025-07-15",
        sensor=sensor,
        bio=bio,
        kinematics=kin,
        intervention_gate_passed=True,
        candidate_offsets_tested=[3.0, 5.0],
        chosen_offset_deg=5.0,
        minimum_dose_rationale="5° sufficient to reduce fruiting-zone PAR below 400",
        fvcb_a=14.3,
        ml_a=14.8,
        model_divergence_pct=3.4,
        routing_decision="fvcb_semillon",
        energy_fraction_this_slot=0.042,
        budget_remaining_daily_kwh=8.1,
        decision_tags=["rubisco_limited", "dose:5deg", "veraison_1.5x", "budget_ok"],
    )

    print("SensorRaw:")
    print(json.dumps(sensor.to_dict(), indent=2, default=str))
    print("\nBiologicalState:")
    print(json.dumps(bio.to_dict(), indent=2, default=str))
    print("\nTrackerKinematics:")
    print(json.dumps(kin.to_dict(), indent=2, default=str))
    print("\nSimulationLog flat row keys:")
    row = log.to_flat_row()
    print(f"  {len(row)} columns")
    print("  First 10:", list(row.keys())[:10])

    print("\nSensorRaw round-trip:")
    s2 = SensorRaw.from_dict(sensor.to_dict())
    assert s2.air_temp_c == sensor.air_temp_c
    assert isinstance(s2.ts, datetime)
    print("  OK")

    print("\nSimulationLog.to_dict JSON-native (no default=str):")
    json.dumps(log.to_dict())  # raises TypeError if any value is not JSON-native
    print("  OK")