| """ |
| CommandArbiter: priority stack, hysteresis, and fallback logic for tracker commands. |
| |
| Sits between the TradeoffEngine output and the physical tracker actuator. |
| Ensures: |
| 1. Weather protection and harvest mode override everything. |
| 2. Safety rail alerts and simulation timeouts fall back to θ_astro. |
| 3. Hysteresis prevents sub-slot jitter (motor protection). |
| 4. All fallbacks default to full astronomical tracking (zero energy cost). |
| |
| Priority Stack (highest to lowest): |
| P1 Weather Protection → stow angle (flat, 0°) |
| P2 Mechanical Harvest → vertical park (90°) |
| P3 Safety Rail Alert → θ_astro |
| P4 Simulation Timeout → θ_astro |
| P5 TradeoffEngine → θ_astro or θ_astro + offset |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from enum import Enum |
| from typing import Optional |
|
|
| import pandas as pd |
|
|
| from config.settings import ( |
| ANGLE_TOLERANCE_DEG, |
| HYSTERESIS_WINDOW_MIN, |
| SIMULATION_TIMEOUT_SEC, |
| WIND_STOW_SPEED_MS, |
| ) |
|
|
|
|
| class CommandSource(str, Enum): |
| """Priority source identifiers for tracker commands.""" |
| WEATHER = "weather_protection" |
| HARVEST = "harvest_mode" |
| SAFETY = "safety_fallback" |
| TIMEOUT = "timeout_fallback" |
| ENGINE = "engine" |
| HYSTERESIS = "hysteresis" |
| INITIAL = "initial" |
| STABLE = "stable" |
|
|
|
|
| @dataclass |
| class ArbiterDecision: |
| """Output of the CommandArbiter.""" |
|
|
| angle: float |
| dispatch: bool |
| source: str |
| requested_angle: float = 0.0 |
| suppressed_reason: Optional[str] = None |
|
|
| def decision_tags(self) -> list[str]: |
| tags = [f"source:{self.source}"] |
| if not self.dispatch and self.suppressed_reason: |
| tags.append(f"suppressed:{self.suppressed_reason}") |
| return tags |
|
|
|
|
| class CommandArbiter: |
| """Priority stack + hysteresis for tracker tilt commands. |
| |
| Parameters |
| ---------- |
| hysteresis_window_min : float |
| Minimum time (minutes) between consecutive tilt changes. |
| angle_tolerance_deg : float |
| Changes smaller than this are suppressed (motor protection). |
| """ |
|
|
| def __init__( |
| self, |
| hysteresis_window_min: float = HYSTERESIS_WINDOW_MIN, |
| angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG, |
| ): |
| self.window_min = hysteresis_window_min |
| self.tolerance = angle_tolerance_deg |
| self._buffer: list[tuple[datetime, float]] = [] |
| self.current_angle: float = 0.0 |
| self._last_dispatch_time: Optional[datetime] = None |
|
|
| |
| |
| |
|
|
| def select_source( |
| self, |
| engine_result: dict, |
| safety_valid: bool = True, |
| sim_time_sec: float = 0.0, |
| weather_override: Optional[dict] = None, |
| harvest_active: bool = False, |
| theta_astro: float = 0.0, |
| ) -> dict: |
| """Select the highest-priority command source. |
| |
| Parameters |
| ---------- |
| engine_result : dict |
| Output from TradeoffEngine.evaluate_slot() or find_minimum_dose(). |
| Must contain 'angle' key (or 'chosen_offset_deg' for DoseResult). |
| safety_valid : bool |
| False if SafetyRails detected FvCB/ML divergence. |
| sim_time_sec : float |
| Wall-clock time the simulation took (seconds). |
| weather_override : dict or None |
| If not None, must contain 'target_angle' and optionally 'reason'. |
| harvest_active : bool |
| True if mechanical harvesting is in progress. |
| theta_astro : float |
| Astronomical tracking angle (safe default). |
| |
| Returns |
| ------- |
| dict with 'angle', 'source', 'reason' |
| """ |
| |
| if weather_override is not None: |
| return { |
| "angle": weather_override.get("target_angle", 0.0), |
| "source": CommandSource.WEATHER, |
| "reason": weather_override.get("reason", "weather override active"), |
| } |
|
|
| |
| if harvest_active: |
| return { |
| "angle": 90.0, |
| "source": CommandSource.HARVEST, |
| "reason": "mechanical harvesting in progress", |
| } |
|
|
| |
| if not safety_valid: |
| return { |
| "angle": theta_astro, |
| "source": CommandSource.SAFETY, |
| "reason": "FvCB/ML divergence exceeded threshold; reverting to astronomical", |
| } |
|
|
| |
| if sim_time_sec > SIMULATION_TIMEOUT_SEC: |
| return { |
| "angle": theta_astro, |
| "source": CommandSource.TIMEOUT, |
| "reason": f"simulation took {sim_time_sec:.1f}s > {SIMULATION_TIMEOUT_SEC}s limit", |
| } |
|
|
| |
| angle = engine_result.get("angle", theta_astro) |
| return { |
| "angle": angle, |
| "source": CommandSource.ENGINE, |
| "reason": engine_result.get("action", "tradeoff_engine"), |
| } |
|
|
| |
| |
| |
|
|
| def should_move( |
| self, |
| requested_angle: float, |
| timestamp: datetime, |
| ) -> ArbiterDecision: |
| """Apply hysteresis filter to a requested angle change. |
| |
| Motor protection logic: |
| - Suppresses changes smaller than angle_tolerance_deg. |
| - Requires the requested angle to be stable for hysteresis_window_min |
| before dispatching. |
| - Immediate dispatch if this is the first command or if the change |
| is large (e.g., weather stow). |
| """ |
| |
| self._buffer.append((timestamp, requested_angle)) |
|
|
| |
| cutoff = timestamp - pd.Timedelta(minutes=self.window_min) |
| self._buffer = [(t, a) for t, a in self._buffer if t >= cutoff] |
|
|
| |
| angle_diff = abs(requested_angle - self.current_angle) |
| if angle_diff <= self.tolerance: |
| return ArbiterDecision( |
| angle=self.current_angle, |
| dispatch=False, |
| source=CommandSource.HYSTERESIS, |
| requested_angle=requested_angle, |
| suppressed_reason=f"change {angle_diff:.1f}° ≤ tolerance {self.tolerance}°", |
| ) |
|
|
| |
| if len(self._buffer) < 2 or self._last_dispatch_time is None: |
| self.current_angle = requested_angle |
| self._last_dispatch_time = timestamp |
| return ArbiterDecision( |
| angle=requested_angle, |
| dispatch=True, |
| source=CommandSource.INITIAL, |
| requested_angle=requested_angle, |
| ) |
|
|
| |
| stable = all( |
| abs(a - requested_angle) <= self.tolerance |
| for _, a in self._buffer |
| ) |
|
|
| if stable: |
| self.current_angle = requested_angle |
| self._last_dispatch_time = timestamp |
| return ArbiterDecision( |
| angle=requested_angle, |
| dispatch=True, |
| source=CommandSource.STABLE, |
| requested_angle=requested_angle, |
| ) |
|
|
| return ArbiterDecision( |
| angle=self.current_angle, |
| dispatch=False, |
| source=CommandSource.HYSTERESIS, |
| requested_angle=requested_angle, |
| suppressed_reason="angle not stable within hysteresis window", |
| ) |
|
|
| |
| |
| |
|
|
| def arbitrate( |
| self, |
| timestamp: datetime, |
| engine_result: dict, |
| theta_astro: float, |
| safety_valid: bool = True, |
| sim_time_sec: float = 0.0, |
| weather_override: Optional[dict] = None, |
| harvest_active: bool = False, |
| ) -> ArbiterDecision: |
| """Full arbitration: priority selection → hysteresis filter. |
| |
| This is the main entry point for the 15-min control loop. |
| """ |
| selected = self.select_source( |
| engine_result=engine_result, |
| safety_valid=safety_valid, |
| sim_time_sec=sim_time_sec, |
| weather_override=weather_override, |
| harvest_active=harvest_active, |
| theta_astro=theta_astro, |
| ) |
|
|
| |
| if selected["source"] in {CommandSource.WEATHER, CommandSource.HARVEST}: |
| self.current_angle = selected["angle"] |
| self._last_dispatch_time = timestamp |
| self._buffer.clear() |
| return ArbiterDecision( |
| angle=selected["angle"], |
| dispatch=True, |
| source=selected["source"], |
| requested_angle=selected["angle"], |
| ) |
|
|
| |
| decision = self.should_move(selected["angle"], timestamp) |
| |
| if decision.dispatch: |
| decision.source = selected["source"] |
| return decision |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def check_wind_stow( |
| wind_speed_ms: float, |
| stow_threshold: float = WIND_STOW_SPEED_MS, |
| ) -> Optional[dict]: |
| """Return a weather override dict if wind speed exceeds stow threshold. |
| |
| Note: ControlLoop uses OperationalModeChecker instead of this method. |
| Kept for backward compatibility with direct arbiter usage. |
| """ |
| from src.operational_modes import check_wind_stow as _check |
| result = _check(wind_speed_ms, stow_threshold) |
| return result.to_weather_override() |
|
|
|
|
| class AstronomicalTracker: |
| """Pure sun-following. The always-safe default. |
| |
| Wraps ShadowModel to provide a simple get_angle(timestamp) interface. |
| """ |
|
|
| def __init__(self, shadow_model=None): |
| self._shadow_model = shadow_model |
|
|
| @property |
| def shadow_model(self): |
| if self._shadow_model is None: |
| from src.shading.solar_geometry import ShadowModel |
| self._shadow_model = ShadowModel() |
| return self._shadow_model |
|
|
| def get_angle(self, timestamp: datetime) -> float: |
| """Return the astronomical tracking angle for a given timestamp.""" |
| ts = pd.Timestamp(timestamp) |
| if ts.tzinfo is None: |
| ts = ts.tz_localize("UTC") |
| sp = self.shadow_model.get_solar_position( |
| pd.DatetimeIndex([ts]) |
| ) |
| elev = float(sp["solar_elevation"].iloc[0]) |
| if elev <= 0: |
| return 0.0 |
| azim = float(sp["solar_azimuth"].iloc[0]) |
| result = self.shadow_model.compute_tracker_tilt(azim, elev) |
| return float(result["tracker_theta"]) |
|
|