""" CommandArbiter: priority stack, hysteresis, and fallback logic for tracker commands. Sits between the TradeoffEngine output and the physical tracker actuator. Ensures: 1. Weather protection and harvest mode override everything. 2. Safety rail alerts and simulation timeouts fall back to θ_astro. 3. Hysteresis prevents sub-slot jitter (motor protection). 4. All fallbacks default to full astronomical tracking (zero energy cost). Priority Stack (highest to lowest): P1 Weather Protection → stow angle (flat, 0°) P2 Mechanical Harvest → vertical park (90°) P3 Safety Rail Alert → θ_astro P4 Simulation Timeout → θ_astro P5 TradeoffEngine → θ_astro or θ_astro + offset """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Optional import pandas as pd from config.settings import ( ANGLE_TOLERANCE_DEG, HYSTERESIS_WINDOW_MIN, SIMULATION_TIMEOUT_SEC, WIND_STOW_SPEED_MS, ) class CommandSource(str, Enum): """Priority source identifiers for tracker commands.""" WEATHER = "weather_protection" HARVEST = "harvest_mode" SAFETY = "safety_fallback" TIMEOUT = "timeout_fallback" ENGINE = "engine" HYSTERESIS = "hysteresis" INITIAL = "initial" STABLE = "stable" @dataclass class ArbiterDecision: """Output of the CommandArbiter.""" angle: float # final tracker tilt angle (degrees) dispatch: bool # True = send command to actuator source: str # which priority level decided requested_angle: float = 0.0 # what was originally requested suppressed_reason: Optional[str] = None # why dispatch=False (if suppressed) def decision_tags(self) -> list[str]: tags = [f"source:{self.source}"] if not self.dispatch and self.suppressed_reason: tags.append(f"suppressed:{self.suppressed_reason}") return tags class CommandArbiter: """Priority stack + hysteresis for tracker tilt commands. Parameters ---------- hysteresis_window_min : float Minimum time (minutes) between consecutive tilt changes. angle_tolerance_deg : float Changes smaller than this are suppressed (motor protection). """ def __init__( self, hysteresis_window_min: float = HYSTERESIS_WINDOW_MIN, angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG, ): self.window_min = hysteresis_window_min self.tolerance = angle_tolerance_deg self._buffer: list[tuple[datetime, float]] = [] self.current_angle: float = 0.0 self._last_dispatch_time: Optional[datetime] = None # ------------------------------------------------------------------ # Priority selection # ------------------------------------------------------------------ def select_source( self, engine_result: dict, safety_valid: bool = True, sim_time_sec: float = 0.0, weather_override: Optional[dict] = None, harvest_active: bool = False, theta_astro: float = 0.0, ) -> dict: """Select the highest-priority command source. Parameters ---------- engine_result : dict Output from TradeoffEngine.evaluate_slot() or find_minimum_dose(). Must contain 'angle' key (or 'chosen_offset_deg' for DoseResult). safety_valid : bool False if SafetyRails detected FvCB/ML divergence. sim_time_sec : float Wall-clock time the simulation took (seconds). weather_override : dict or None If not None, must contain 'target_angle' and optionally 'reason'. harvest_active : bool True if mechanical harvesting is in progress. theta_astro : float Astronomical tracking angle (safe default). Returns ------- dict with 'angle', 'source', 'reason' """ # P1: Weather protection (wind stow, hail, etc.) if weather_override is not None: return { "angle": weather_override.get("target_angle", 0.0), "source": CommandSource.WEATHER, "reason": weather_override.get("reason", "weather override active"), } # P2: Mechanical harvesting — panels go vertical for clearance if harvest_active: return { "angle": 90.0, "source": CommandSource.HARVEST, "reason": "mechanical harvesting in progress", } # P3: Safety rail alert — FvCB/ML divergence too high if not safety_valid: return { "angle": theta_astro, "source": CommandSource.SAFETY, "reason": "FvCB/ML divergence exceeded threshold; reverting to astronomical", } # P4: Simulation timeout — shadow model took too long if sim_time_sec > SIMULATION_TIMEOUT_SEC: return { "angle": theta_astro, "source": CommandSource.TIMEOUT, "reason": f"simulation took {sim_time_sec:.1f}s > {SIMULATION_TIMEOUT_SEC}s limit", } # P5: Normal — use TradeoffEngine result angle = engine_result.get("angle", theta_astro) return { "angle": angle, "source": CommandSource.ENGINE, "reason": engine_result.get("action", "tradeoff_engine"), } # ------------------------------------------------------------------ # Hysteresis filter # ------------------------------------------------------------------ def should_move( self, requested_angle: float, timestamp: datetime, ) -> ArbiterDecision: """Apply hysteresis filter to a requested angle change. Motor protection logic: - Suppresses changes smaller than angle_tolerance_deg. - Requires the requested angle to be stable for hysteresis_window_min before dispatching. - Immediate dispatch if this is the first command or if the change is large (e.g., weather stow). """ # Record request in buffer self._buffer.append((timestamp, requested_angle)) # Trim buffer to window cutoff = timestamp - pd.Timedelta(minutes=self.window_min) self._buffer = [(t, a) for t, a in self._buffer if t >= cutoff] # Change smaller than tolerance → suppress angle_diff = abs(requested_angle - self.current_angle) if angle_diff <= self.tolerance: return ArbiterDecision( angle=self.current_angle, dispatch=False, source=CommandSource.HYSTERESIS, requested_angle=requested_angle, suppressed_reason=f"change {angle_diff:.1f}° ≤ tolerance {self.tolerance}°", ) # First command or only one entry in buffer → dispatch immediately if len(self._buffer) < 2 or self._last_dispatch_time is None: self.current_angle = requested_angle self._last_dispatch_time = timestamp return ArbiterDecision( angle=requested_angle, dispatch=True, source=CommandSource.INITIAL, requested_angle=requested_angle, ) # Check stability: all recent entries must agree within tolerance stable = all( abs(a - requested_angle) <= self.tolerance for _, a in self._buffer ) if stable: self.current_angle = requested_angle self._last_dispatch_time = timestamp return ArbiterDecision( angle=requested_angle, dispatch=True, source=CommandSource.STABLE, requested_angle=requested_angle, ) return ArbiterDecision( angle=self.current_angle, dispatch=False, source=CommandSource.HYSTERESIS, requested_angle=requested_angle, suppressed_reason="angle not stable within hysteresis window", ) # ------------------------------------------------------------------ # Combined: select + filter # ------------------------------------------------------------------ def arbitrate( self, timestamp: datetime, engine_result: dict, theta_astro: float, safety_valid: bool = True, sim_time_sec: float = 0.0, weather_override: Optional[dict] = None, harvest_active: bool = False, ) -> ArbiterDecision: """Full arbitration: priority selection → hysteresis filter. This is the main entry point for the 15-min control loop. """ selected = self.select_source( engine_result=engine_result, safety_valid=safety_valid, sim_time_sec=sim_time_sec, weather_override=weather_override, harvest_active=harvest_active, theta_astro=theta_astro, ) # Weather and harvest overrides bypass hysteresis (safety-critical) if selected["source"] in {CommandSource.WEATHER, CommandSource.HARVEST}: self.current_angle = selected["angle"] self._last_dispatch_time = timestamp self._buffer.clear() return ArbiterDecision( angle=selected["angle"], dispatch=True, source=selected["source"], requested_angle=selected["angle"], ) # Normal path: apply hysteresis decision = self.should_move(selected["angle"], timestamp) # Override source with the priority level that selected the angle if decision.dispatch: decision.source = selected["source"] return decision # ------------------------------------------------------------------ # Wind stow helper (delegates to operational_modes) # ------------------------------------------------------------------ @staticmethod def check_wind_stow( wind_speed_ms: float, stow_threshold: float = WIND_STOW_SPEED_MS, ) -> Optional[dict]: """Return a weather override dict if wind speed exceeds stow threshold. Note: ControlLoop uses OperationalModeChecker instead of this method. Kept for backward compatibility with direct arbiter usage. """ from src.operational_modes import check_wind_stow as _check result = _check(wind_speed_ms, stow_threshold) return result.to_weather_override() class AstronomicalTracker: """Pure sun-following. The always-safe default. Wraps ShadowModel to provide a simple get_angle(timestamp) interface. """ def __init__(self, shadow_model=None): self._shadow_model = shadow_model @property def shadow_model(self): if self._shadow_model is None: from src.shading.solar_geometry import ShadowModel self._shadow_model = ShadowModel() return self._shadow_model def get_angle(self, timestamp: datetime) -> float: """Return the astronomical tracking angle for a given timestamp.""" ts = pd.Timestamp(timestamp) if ts.tzinfo is None: ts = ts.tz_localize("UTC") sp = self.shadow_model.get_solar_position( pd.DatetimeIndex([ts]) ) elev = float(sp["solar_elevation"].iloc[0]) if elev <= 0: return 0.0 azim = float(sp["solar_azimuth"].iloc[0]) result = self.shadow_model.compute_tracker_tilt(azim, elev) return float(result["tracker_theta"])