api / src /command_arbiter.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
CommandArbiter: priority stack, hysteresis, and fallback logic for tracker commands.
Sits between the TradeoffEngine output and the physical tracker actuator.
Ensures:
1. Weather protection and harvest mode override everything.
2. Safety rail alerts and simulation timeouts fall back to θ_astro.
3. Hysteresis prevents sub-slot jitter (motor protection).
4. All fallbacks default to full astronomical tracking (zero energy cost).
Priority Stack (highest to lowest):
P1 Weather Protection → stow angle (flat, 0°)
P2 Mechanical Harvest → vertical park (90°)
P3 Safety Rail Alert → θ_astro
P4 Simulation Timeout → θ_astro
P5 TradeoffEngine → θ_astro or θ_astro + offset
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import pandas as pd
from config.settings import (
ANGLE_TOLERANCE_DEG,
HYSTERESIS_WINDOW_MIN,
SIMULATION_TIMEOUT_SEC,
WIND_STOW_SPEED_MS,
)
class CommandSource(str, Enum):
    """Labels naming which rule produced (or withheld) a tracker command.

    Members are also plain strings, so they compare equal to their values.
    """

    # Priority-stack levels chosen by CommandArbiter.select_source().
    WEATHER = "weather_protection"
    HARVEST = "harvest_mode"
    SAFETY = "safety_fallback"
    TIMEOUT = "timeout_fallback"
    ENGINE = "engine"

    # Outcomes reported by the hysteresis filter (CommandArbiter.should_move()).
    HYSTERESIS = "hysteresis"
    INITIAL = "initial"
    STABLE = "stable"
@dataclass
class ArbiterDecision:
    """Output of the CommandArbiter."""

    angle: float  # final tracker tilt angle (degrees)
    dispatch: bool  # True = send command to actuator
    source: str  # which priority level decided (plain str or str-based Enum member)
    requested_angle: float = 0.0  # what was originally requested
    suppressed_reason: Optional[str] = None  # why dispatch=False (if suppressed)

    def decision_tags(self) -> list[str]:
        """Return log/telemetry tags describing this decision.

        Returns
        -------
        list of str
            Always ``"source:<value>"``; additionally
            ``"suppressed:<reason>"`` when the command was not dispatched
            and a reason was recorded.
        """
        # A str-mixin Enum formats as its value on Python <= 3.10 but as
        # "ClassName.MEMBER" on Python >= 3.11. Use the value explicitly so
        # the tag text does not depend on the interpreter version.
        source = self.source
        label = source.value if isinstance(source, Enum) else source
        tags = [f"source:{label}"]
        if not self.dispatch and self.suppressed_reason:
            tags.append(f"suppressed:{self.suppressed_reason}")
        return tags
class CommandArbiter:
    """Priority stack + hysteresis for tracker tilt commands.

    Parameters
    ----------
    hysteresis_window_min : float
        Look-back window (minutes) of recent requests kept for the
        stability check in ``should_move``.
    angle_tolerance_deg : float
        Changes smaller than or equal to this are suppressed
        (motor protection).
    """

    def __init__(
        self,
        hysteresis_window_min: float = HYSTERESIS_WINDOW_MIN,
        angle_tolerance_deg: float = ANGLE_TOLERANCE_DEG,
    ):
        self.window_min = hysteresis_window_min
        self.tolerance = angle_tolerance_deg
        # (timestamp, requested angle) pairs inside the hysteresis window.
        self._buffer: list[tuple[datetime, float]] = []
        # Last angle this arbiter dispatched; starts at 0.0.
        self.current_angle: float = 0.0
        self._last_dispatch_time: Optional[datetime] = None

    @staticmethod
    def _is_valid_angle(value) -> bool:
        """Return True if *value* is a finite real number usable as an angle."""
        import math  # function-scope: the module has no top-level math import

        try:
            return math.isfinite(value)
        except (TypeError, ValueError, OverflowError):
            # None, strings, or otherwise non-numeric input.
            return False

    # ------------------------------------------------------------------
    # Priority selection
    # ------------------------------------------------------------------
    def select_source(
        self,
        engine_result: dict,
        safety_valid: bool = True,
        sim_time_sec: float = 0.0,
        weather_override: Optional[dict] = None,
        harvest_active: bool = False,
        theta_astro: float = 0.0,
    ) -> dict:
        """Select the highest-priority command source.

        Parameters
        ----------
        engine_result : dict
            Output from the TradeoffEngine. Only the 'angle' key (and the
            optional 'action' key, used as the reason) is read here. A
            missing, ``None`` or non-finite 'angle' falls back to
            ``theta_astro``. ``None`` is treated as an empty result.
        safety_valid : bool
            False if SafetyRails detected FvCB/ML divergence.
        sim_time_sec : float
            Wall-clock time the simulation took (seconds).
        weather_override : dict or None
            If not None, should contain 'target_angle' and optionally
            'reason'. A missing or non-finite 'target_angle' falls back
            to 0.0.
        harvest_active : bool
            True if mechanical harvesting is in progress.
        theta_astro : float
            Astronomical tracking angle (safe default).

        Returns
        -------
        dict with 'angle', 'source', 'reason'
        """
        # P1: Weather protection (wind stow, hail, etc.)
        if weather_override is not None:
            target = weather_override.get("target_angle", 0.0)
            if not self._is_valid_angle(target):
                # Same default as when the key is absent.
                target = 0.0
            return {
                "angle": target,
                "source": CommandSource.WEATHER,
                "reason": weather_override.get("reason", "weather override active"),
            }

        # P2: Mechanical harvesting — panels go vertical for clearance
        if harvest_active:
            return {
                "angle": 90.0,
                "source": CommandSource.HARVEST,
                "reason": "mechanical harvesting in progress",
            }

        # P3: Safety rail alert — FvCB/ML divergence too high
        if not safety_valid:
            return {
                "angle": theta_astro,
                "source": CommandSource.SAFETY,
                "reason": "FvCB/ML divergence exceeded threshold; reverting to astronomical",
            }

        # P4: Simulation timeout — shadow model took too long
        if sim_time_sec > SIMULATION_TIMEOUT_SEC:
            return {
                "angle": theta_astro,
                "source": CommandSource.TIMEOUT,
                "reason": f"simulation took {sim_time_sec:.1f}s > {SIMULATION_TIMEOUT_SEC}s limit",
            }

        # P5: Normal — use TradeoffEngine result
        result = engine_result if engine_result is not None else {}
        angle = result.get("angle", theta_astro)
        reason = result.get("action", "tradeoff_engine")
        if not self._is_valid_angle(angle):
            # Never forward None/NaN/inf towards the actuator.
            angle = theta_astro
            reason = "engine returned no usable angle; reverting to astronomical"
        return {
            "angle": angle,
            "source": CommandSource.ENGINE,
            "reason": reason,
        }

    # ------------------------------------------------------------------
    # Hysteresis filter
    # ------------------------------------------------------------------
    def should_move(
        self,
        requested_angle: float,
        timestamp: datetime,
    ) -> ArbiterDecision:
        """Apply hysteresis filter to a requested angle change.

        Motor protection logic, in order:
        - A non-finite or non-numeric request is suppressed and not recorded.
        - The request is recorded and the buffer is trimmed to the last
          ``window_min`` minutes.
        - A change of at most ``tolerance`` degrees from ``current_angle``
          is suppressed.
        - If nothing has been dispatched yet, or the buffer holds fewer than
          two entries, the request is dispatched immediately.
        - Otherwise it is dispatched only if every buffered request lies
          within ``tolerance`` of it.

        There is no "large change" bypass here; weather and harvest
        overrides skip this filter in ``arbitrate``.

        Returns
        -------
        ArbiterDecision
            ``dispatch=True`` with the new angle, or ``dispatch=False`` with
            the unchanged ``current_angle`` and a ``suppressed_reason``.
        """
        # Reject unusable requests before they can pollute the buffer or
        # overwrite current_angle with NaN.
        if not self._is_valid_angle(requested_angle):
            return ArbiterDecision(
                angle=self.current_angle,
                dispatch=False,
                source=CommandSource.HYSTERESIS,
                requested_angle=requested_angle,
                suppressed_reason="requested angle is not a finite number",
            )

        # Record request in buffer
        self._buffer.append((timestamp, requested_angle))

        # Trim buffer to window
        cutoff = timestamp - pd.Timedelta(minutes=self.window_min)
        self._buffer = [(t, a) for t, a in self._buffer if t >= cutoff]

        # Change smaller than tolerance → suppress
        angle_diff = abs(requested_angle - self.current_angle)
        if angle_diff <= self.tolerance:
            return ArbiterDecision(
                angle=self.current_angle,
                dispatch=False,
                source=CommandSource.HYSTERESIS,
                requested_angle=requested_angle,
                suppressed_reason=f"change {angle_diff:.1f}° ≤ tolerance {self.tolerance}°",
            )

        # First command or only one entry in buffer → dispatch immediately
        if len(self._buffer) < 2 or self._last_dispatch_time is None:
            self.current_angle = requested_angle
            self._last_dispatch_time = timestamp
            return ArbiterDecision(
                angle=requested_angle,
                dispatch=True,
                source=CommandSource.INITIAL,
                requested_angle=requested_angle,
            )

        # Check stability: all recent entries must agree within tolerance
        stable = all(
            abs(a - requested_angle) <= self.tolerance
            for _, a in self._buffer
        )
        if stable:
            self.current_angle = requested_angle
            self._last_dispatch_time = timestamp
            return ArbiterDecision(
                angle=requested_angle,
                dispatch=True,
                source=CommandSource.STABLE,
                requested_angle=requested_angle,
            )

        return ArbiterDecision(
            angle=self.current_angle,
            dispatch=False,
            source=CommandSource.HYSTERESIS,
            requested_angle=requested_angle,
            suppressed_reason="angle not stable within hysteresis window",
        )

    # ------------------------------------------------------------------
    # Combined: select + filter
    # ------------------------------------------------------------------
    def arbitrate(
        self,
        timestamp: datetime,
        engine_result: dict,
        theta_astro: float,
        safety_valid: bool = True,
        sim_time_sec: float = 0.0,
        weather_override: Optional[dict] = None,
        harvest_active: bool = False,
    ) -> ArbiterDecision:
        """Full arbitration: priority selection → hysteresis filter.

        This is the main entry point for the 15-min control loop.

        Weather and harvest selections are dispatched unconditionally and
        reset the hysteresis buffer. Every other selection goes through
        ``should_move``. On dispatch, the decision's source is replaced by
        the priority level that chose the angle. A suppressed decision
        keeps the source set by ``should_move``.
        """
        selected = self.select_source(
            engine_result=engine_result,
            safety_valid=safety_valid,
            sim_time_sec=sim_time_sec,
            weather_override=weather_override,
            harvest_active=harvest_active,
            theta_astro=theta_astro,
        )

        # Weather and harvest overrides bypass hysteresis (safety-critical)
        if selected["source"] in {CommandSource.WEATHER, CommandSource.HARVEST}:
            self.current_angle = selected["angle"]
            self._last_dispatch_time = timestamp
            self._buffer.clear()
            return ArbiterDecision(
                angle=selected["angle"],
                dispatch=True,
                source=selected["source"],
                requested_angle=selected["angle"],
            )

        # Normal path: apply hysteresis
        decision = self.should_move(selected["angle"], timestamp)

        # Override source with the priority level that selected the angle
        if decision.dispatch:
            decision.source = selected["source"]
        return decision

    # ------------------------------------------------------------------
    # Wind stow helper (delegates to operational_modes)
    # ------------------------------------------------------------------
    @staticmethod
    def check_wind_stow(
        wind_speed_ms: float,
        stow_threshold: float = WIND_STOW_SPEED_MS,
    ) -> Optional[dict]:
        """Return a weather override dict if wind speed exceeds stow threshold.

        Delegates to ``src.operational_modes.check_wind_stow`` and returns
        that result's ``to_weather_override()``.
        NOTE(review): presumably ``None`` below the threshold — confirm
        against ``operational_modes``.

        Note: ControlLoop uses OperationalModeChecker instead of this method.
        Kept for backward compatibility with direct arbiter usage.
        """
        # Imported lazily, at call time rather than at module import.
        from src.operational_modes import check_wind_stow as _check

        result = _check(wind_speed_ms, stow_threshold)
        return result.to_weather_override()
class AstronomicalTracker:
    """Sun-following tracker angle provider (the always-safe default).

    Thin wrapper over a shadow model exposing ``get_angle(timestamp)``.
    The model can be injected; otherwise a ``ShadowModel`` is built on
    first use.
    """

    def __init__(self, shadow_model=None):
        self._shadow_model = shadow_model

    @property
    def shadow_model(self):
        """Return the shadow model, creating a default one on first access."""
        if self._shadow_model is not None:
            return self._shadow_model
        # Lazy import: only needed when no model was injected.
        from src.shading.solar_geometry import ShadowModel

        self._shadow_model = ShadowModel()
        return self._shadow_model

    def get_angle(self, timestamp: datetime) -> float:
        """Return the astronomical tracking angle for *timestamp*.

        Timezone-naive timestamps are localized to UTC. Returns 0.0 when
        the reported solar elevation is zero or negative.
        """
        moment = pd.Timestamp(timestamp)
        if moment.tzinfo is None:
            moment = moment.tz_localize("UTC")

        model = self.shadow_model
        position = model.get_solar_position(pd.DatetimeIndex([moment]))

        elevation = float(position["solar_elevation"].iloc[0])
        if elevation <= 0:
            return 0.0

        azimuth = float(position["solar_azimuth"].iloc[0])
        tilt = model.compute_tracker_tilt(azimuth, elevation)
        return float(tilt["tracker_theta"])