| """ |
| ControlLoop: the 15-minute agrivoltaic control cycle. |
| |
| Each tick: |
| 1. Fetch live sensor data (IMS weather + TB vine sensors) |
| 2. Load/validate the day-ahead plan for today |
| 3. Look up the planned offset for the current slot |
| 4. Run live gate check (may override plan if conditions diverged) |
| 5. Check energy budget (block intervention if budget exhausted) |
| 6. Run CommandArbiter (priority stack + hysteresis) |
| 7. Resolve per-tracker fleet overrides (rare; default = all same angle) |
| 8. Dispatch angle to trackers via TrackerDispatcher |
| 9. Spend energy budget for the slot |
| 10. Check plan divergence and trigger re-plan if needed |
| 11. Log the result |
| |
| The loop can run as: |
| - **one-shot**: ``loop.tick()`` — execute one cycle (called externally) |
| - **continuous**: ``loop.run()`` — blocking loop with 15-min sleep |
| - **plan-only**: ``ControlLoop(dry_run=True).tick()`` — compute decisions without sending |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from datetime import date, datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| import pandas as pd |
|
|
| from config.settings import ( |
| ANGLE_TOLERANCE_DEG, |
| DAILY_PLAN_PATH, |
| DP_SLOT_DURATION_MIN, |
| PLAN_DIVERGENCE_THRESHOLD_KWH, |
| PLAN_DIVERGENCE_THRESHOLD_SLOTS, |
| PLAN_REPLAN_COOLDOWN_SLOTS, |
| SIMULATION_LOG_PATH, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class TickResult:
    """Record of everything one control-loop tick observed and decided."""

    # Identity of the tick.
    timestamp: datetime
    slot_index: int
    stage_id: str = "unknown"

    # Values copied from the day-ahead plan slot.
    plan_offset_deg: float = 0.0
    plan_gate_passed: bool = False

    # Outcome of the live gate / budget guard.
    live_gate_passed: bool = False
    live_override: bool = False
    override_reason: Optional[str] = None

    # Arbiter decision.
    target_angle: float = 0.0
    dispatch: bool = False
    source: str = ""

    # Dispatch outcome.
    trackers_verified: int = 0
    trackers_total: int = 0
    dispatch_error: Optional[str] = None

    # Energy cost attributed to the slot.
    energy_cost_kwh: float = 0.0

    # Energy budget bookkeeping after the slot was spent.
    budget_spent_kwh: float = 0.0
    budget_remaining_kwh: float = 0.0

    # Model selected by the routing agent.
    model_route: str = ""

    # Per-tracker angle overrides (None when every tracker shares one angle).
    fleet_overrides: Optional[Dict[str, float]] = None

    # Plan-divergence tracking.
    divergence_cumulative_kwh: float = 0.0
    divergence_consecutive: int = 0
    replan_triggered: bool = False

    # Weather snapshot used for the decision.
    air_temp_c: Optional[float] = None
    ghi_w_m2: Optional[float] = None
    wind_speed_ms: Optional[float] = None

    def to_dict(self) -> dict:
        """Return the fields as a dict, with datetimes rendered as ISO strings."""
        payload = {}
        for name, value in vars(self).items():
            if isinstance(value, datetime):
                value = value.isoformat()
            payload[name] = value
        return payload
|
|
|
|
| |
| |
| |
|
|
| class ControlLoop: |
| """15-minute agrivoltaic control loop. |
| |
| Parameters |
| ---------- |
| dry_run : bool |
| If True, compute decisions but don't send commands to trackers. |
| plan_path : Path |
| Path to the day-ahead plan JSON file. |
| log_path : Path |
| Path for simulation log output. |
| """ |
|
|
def __init__(
    self,
    dry_run: bool = True,
    plan_path: Path = DAILY_PLAN_PATH,
    log_path: Path = SIMULATION_LOG_PATH,
):
    """Store configuration and initialise empty runtime state.

    No collaborator is constructed here; each one is created on first
    access through the matching property.

    Parameters
    ----------
    dry_run : bool
        Forwarded to the tracker dispatcher when it is first created.
    plan_path : Path
        Location of the day-ahead plan JSON file.
    log_path : Path
        Base path used by ``save_log``.
    """
    self.dry_run = dry_run
    # Coerce to Path: later code calls .exists(), .parent and
    # .with_suffix() on these, which a plain string does not provide.
    self.plan_path = Path(plan_path)
    self.log_path = Path(log_path)

    # Lazily created collaborators (see the properties of the same name).
    self._arbiter = None
    self._dispatcher = None
    self._astro = None
    self._hub = None
    self._modes = None
    self._fleet = None
    self._schedulers: Dict[str, object] = {}
    self._budget_planner = None
    self._router = None
    self._current_plan: Optional[dict] = None
    self._tick_log: List[dict] = []

    # Slot-level budget plan, cached together with the date it belongs to.
    self._daily_budget_plan: Optional[dict] = None
    self._daily_budget_date: Optional[date] = None

    # Plan-divergence tracking; -99 keeps the first re-plan outside any cooldown.
    self._divergence_cumulative_kwh: float = 0.0
    self._divergence_consecutive: int = 0
    self._last_replan_slot: int = -99
    self._replan_count: int = 0
|
|
| |
| |
| |
|
|
| @property |
| def arbiter(self): |
| if self._arbiter is None: |
| from src.command_arbiter import CommandArbiter |
| self._arbiter = CommandArbiter() |
| return self._arbiter |
|
|
| @property |
| def dispatcher(self): |
| if self._dispatcher is None: |
| from src.tracker_dispatcher import TrackerDispatcher |
| self._dispatcher = TrackerDispatcher(dry_run=self.dry_run) |
| return self._dispatcher |
|
|
| @property |
| def astro(self): |
| if self._astro is None: |
| from src.command_arbiter import AstronomicalTracker |
| self._astro = AstronomicalTracker() |
| return self._astro |
|
|
| @property |
| def hub(self): |
| if self._hub is None: |
| from src.data.data_providers import DataHub |
| self._hub = DataHub.default() |
| return self._hub |
|
|
| @property |
| def modes(self): |
| if self._modes is None: |
| from src.operational_modes import OperationalModeChecker |
| self._modes = OperationalModeChecker() |
| return self._modes |
|
|
| @property |
| def fleet(self): |
| if self._fleet is None: |
| from src.tracker_fleet import TrackerFleet |
| self._fleet = TrackerFleet() |
| return self._fleet |
|
|
| @property |
| def budget_planner(self): |
| if self._budget_planner is None: |
| from src.energy_budget import EnergyBudgetPlanner |
| self._budget_planner = EnergyBudgetPlanner() |
| return self._budget_planner |
|
|
| @property |
| def router(self): |
| if self._router is None: |
| from src.chatbot.routing_agent import RoutingAgent |
| self._router = RoutingAgent() |
| return self._router |
|
|
| |
| |
| |
|
|
| def _build_persistence_forecast(self) -> tuple[list[float], list[float]]: |
| """Build 96-slot temp/GHI forecast from last available IMS day.""" |
| ims_df = self.hub.weather.get_dataframe() |
| if ims_df.empty: |
| return [25.0] * 96, [0.0] * 96 |
|
|
| df = ims_df.copy() |
| if "timestamp_utc" in df.columns: |
| df["timestamp_utc"] = pd.to_datetime(df["timestamp_utc"], utc=True) |
| df = df.set_index("timestamp_utc") |
|
|
| last_day = df.index.max().normalize() |
| day_data = df[df.index.normalize() == last_day] |
| if len(day_data) < 10: |
| last_day -= pd.Timedelta(days=1) |
| day_data = df[df.index.normalize() == last_day] |
|
|
| temps = [25.0] * 96 |
| ghis = [0.0] * 96 |
| for _, row in day_data.iterrows(): |
| slot = row.name.hour * 4 + row.name.minute // 15 |
| if 0 <= slot < 96: |
| t = row.get("air_temperature_c") |
| if pd.notna(t): |
| temps[slot] = float(t) |
| g = row.get("ghi_w_m2") |
| if pd.notna(g): |
| ghis[slot] = float(g) |
| return temps, ghis |
|
|
| def _compute_daily_budget(self, target: date) -> float: |
| """Compute the daily energy budget from the annual/monthly hierarchy.""" |
| annual = self.budget_planner.compute_annual_plan(target.year) |
| month_budget = annual["monthly_budgets"].get(target.month, 0.5) |
| weekly = self.budget_planner.compute_weekly_plan(target, month_budget) |
| dow = target.weekday() |
| return weekly["daily_budgets_kWh"][min(dow, 6)] |
|
|
def load_plan(self, target_date: Optional[date] = None) -> Optional[dict]:
    """Return the day-ahead plan for a date, generating it when necessary.

    The plan file is reused when its ``target_date`` matches. Otherwise a
    new plan is generated from the persistence forecast and the daily
    budget, written to ``plan_path`` and cached on the instance.

    Parameters
    ----------
    target_date : date, optional
        Day to plan for; defaults to today.

    Returns
    -------
    dict or None
        The plan, or None when generation failed.
    """
    target = target_date if target_date else date.today()

    # First choice: a plan already on disk for this exact date.
    if self.plan_path.exists():
        try:
            with open(self.plan_path) as handle:
                stored = json.load(handle)
            if stored.get("target_date") == str(target):
                self._current_plan = stored
                logger.info("Loaded plan for %s (%d slots)",
                            target, len(stored.get("slots", [])))
                return stored
        except Exception as exc:
            logger.warning("Failed to load plan from %s: %s", self.plan_path, exc)

    # Otherwise generate a fresh plan and persist it.
    try:
        from src.day_ahead_planner import DayAheadPlanner

        temps, ghis = self._build_persistence_forecast()
        budget_kwh = max(self._compute_daily_budget(target), 0.1)

        planner = DayAheadPlanner()
        fresh = planner.plan_day(target, temps, ghis, budget_kwh).to_dict()

        self.plan_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.plan_path, "w") as handle:
            json.dump(fresh, handle, indent=2)
    except Exception as exc:
        logger.error("Plan generation failed: %s", exc)
        return None

    self._current_plan = fresh
    return fresh
|
|
| def _get_slot_plan(self, slot_index: int) -> Optional[dict]: |
| """Look up the planned offset for a given slot.""" |
| if not self._current_plan: |
| return None |
| slots = self._current_plan.get("slots", []) |
| for s in slots: |
| t = s.get("time", "") |
| try: |
| h, m = map(int, t.split(":")) |
| s_idx = h * 4 + m // 15 |
| if s_idx == slot_index: |
| return s |
| except (ValueError, AttributeError): |
| continue |
| return None |
|
|
| |
| |
| |
|
|
| def _ensure_daily_budget(self, today: date) -> Optional[dict]: |
| """Load or reuse the daily slot-level budget plan.""" |
| if self._daily_budget_plan and self._daily_budget_date == today: |
| return self._daily_budget_plan |
|
|
| |
| try: |
| from src.data.redis_cache import get_redis |
| redis = get_redis() |
| if redis: |
| cached = redis.get_json("control:budget") |
| if cached and cached.get("date") == str(today): |
| self._daily_budget_plan = cached["plan"] |
| self._daily_budget_date = today |
| logger.info("Restored daily budget from Redis for %s", today) |
| return self._daily_budget_plan |
| except Exception: |
| pass |
|
|
| try: |
| daily_budget = self._compute_daily_budget(today) |
| self._daily_budget_plan = self.budget_planner.compute_daily_plan( |
| today, daily_budget, |
| ) |
| self._daily_budget_date = today |
|
|
| |
| self._divergence_cumulative_kwh = 0.0 |
| self._divergence_consecutive = 0 |
| self._last_replan_slot = -99 |
|
|
| |
| self._persist_budget(today) |
|
|
| return self._daily_budget_plan |
| except Exception as exc: |
| logger.warning("Failed to compute daily budget: %s", exc) |
| return None |
|
|
def _persist_budget(self, today: date) -> None:
    """Write the current daily budget plan to Redis under ``control:budget``.

    Best-effort: nothing is written when Redis or the plan is unavailable,
    and any failure is only logged at debug level.
    """
    try:
        from src.data.redis_cache import get_redis
        client = get_redis()
        if not client or not self._daily_budget_plan:
            return
        date_key = str(today)
        # Round-trip through JSON so values json cannot encode natively
        # are stored as strings.
        json_safe_plan = json.loads(json.dumps(self._daily_budget_plan, default=str))
        client.set_json(
            "control:budget",
            {"date": date_key, "plan": json_safe_plan},
            ttl=86400,
        )
    except Exception as exc:
        logger.debug("Budget Redis persist failed: %s", exc)
|
|
| @staticmethod |
| def _slot_key(now: datetime) -> str: |
| """Format a datetime as a slot key like '10:15'.""" |
| return f"{now.hour:02d}:{(now.minute // 15) * 15:02d}" |
|
|
| |
| |
| |
|
|
def _resolve_fleet_overrides(
    self, now: datetime, theta_astro: float,
) -> Dict[str, float]:
    """Collect per-tracker angle overrides from the fleet's active assignments.

    A tracker gets an override only when its assignment's schedule has an
    event for ``now`` with an angle and a mode of ``antiTracking`` (angle
    added to ``theta_astro``) or ``fixed_angle`` (angle used as is).
    Schedulers are cached per ``plan_id``.

    Returns
    -------
    Dict[str, float]
        Tracker name -> target angle; empty when nothing is overridden or
        the assignment lookup fails.
    """
    from src.tracker_fleet import tracker_id_to_name
    from src.tracker_scheduler import TrackerScheduler, PLAN_LIBRARY

    resolved: Dict[str, float] = {}
    try:
        assignments = self.fleet.get_all_best_assignments(now)
    except Exception as exc:
        logger.debug("Fleet assignment lookup skipped: %s", exc)
        return resolved

    for tracker_id, assignment in assignments.items():
        if assignment is None:
            continue

        plan_id = assignment.plan_id
        scheduler = self._schedulers.get(plan_id)
        if scheduler is None:
            # Build the scheduler once: from the plan file if one is named,
            # otherwise from the built-in plan library.
            if assignment.plan_file:
                plan_file = Path(assignment.plan_file)
                if not plan_file.exists():
                    logger.warning("Plan file not found: %s", plan_file)
                    continue
                scheduler = TrackerScheduler(plan_file=plan_file)
            elif plan_id in PLAN_LIBRARY:
                scheduler = TrackerScheduler(plan_data=PLAN_LIBRARY[plan_id])
            else:
                logger.debug("Unknown plan_id %r, skipping", plan_id)
                continue
            self._schedulers[plan_id] = scheduler

        event = scheduler.get_event(now)
        if event is None:
            continue

        mode = event.get("mode")
        angle = event.get("angle")
        # Plain tracking, or an event without an angle, leaves the tracker
        # on the arbiter's angle.
        if mode is None or mode == "tracking" or angle is None:
            continue

        if mode == "antiTracking":
            resolved[tracker_id_to_name(tracker_id)] = theta_astro + angle
        elif mode == "fixed_angle":
            resolved[tracker_id_to_name(tracker_id)] = angle

    return resolved
|
|
| |
| |
| |
|
|
def _check_plan_divergence(
    self,
    slot_index: int,
    planned_offset: float,
    actual_offset: float,
    planned_cost: float,
    actual_cost: float,
) -> bool:
    """Update divergence counters for one slot and report whether to re-plan.

    The absolute cost difference is added to the running kWh total. The
    consecutive-slot counter grows while the offsets differ by more than
    ``ANGLE_TOLERANCE_DEG`` and resets otherwise. No re-plan is requested
    during the cooldown that follows the previous re-plan.

    Returns
    -------
    bool
        True when either divergence threshold is reached outside the cooldown.
    """
    energy_gap = abs(planned_cost - actual_cost)
    angle_gap_exceeded = abs(planned_offset - actual_offset) > ANGLE_TOLERANCE_DEG

    self._divergence_cumulative_kwh += energy_gap
    self._divergence_consecutive = (
        self._divergence_consecutive + 1 if angle_gap_exceeded else 0
    )

    # Counters keep updating during the cooldown; only the trigger is suppressed.
    slots_since_replan = slot_index - self._last_replan_slot
    if slots_since_replan < PLAN_REPLAN_COOLDOWN_SLOTS:
        return False

    if self._divergence_cumulative_kwh >= PLAN_DIVERGENCE_THRESHOLD_KWH:
        logger.warning(
            "Cumulative divergence %.3f kWh >= %.3f threshold; triggering re-plan",
            self._divergence_cumulative_kwh, PLAN_DIVERGENCE_THRESHOLD_KWH,
        )
        return True

    if self._divergence_consecutive >= PLAN_DIVERGENCE_THRESHOLD_SLOTS:
        logger.warning(
            "%d consecutive divergent slots >= %d threshold; triggering re-plan",
            self._divergence_consecutive, PLAN_DIVERGENCE_THRESHOLD_SLOTS,
        )
        return True

    return False
|
|
def _trigger_replan(self, now: datetime, slot_index: int) -> bool:
    """Generate a new plan for today sized to the remaining energy budget.

    On success the plan is written to ``plan_path``, cached on the
    instance, the divergence counters are cleared and ``slot_index`` is
    recorded as the start of the re-plan cooldown.

    Returns
    -------
    bool
        True when a new plan was produced; False when no budget remains
        or generation failed.
    """
    today = now.date()
    budget = self._ensure_daily_budget(today)
    if budget:
        spent = budget["cumulative_spent"]
        remaining = budget["daily_total_kWh"] - spent
    else:
        remaining = 0.0

    if remaining <= 0:
        logger.info("Re-plan skipped: no budget remaining")
        return False

    try:
        from src.day_ahead_planner import DayAheadPlanner

        temps, ghis = self._build_persistence_forecast()

        planner = DayAheadPlanner()
        new_plan = planner.plan_day(
            today, temps, ghis, max(remaining, 0.01),
        ).to_dict()

        self.plan_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.plan_path, "w") as handle:
            json.dump(new_plan, handle, indent=2)

        # Adopt the new plan and restart divergence tracking.
        self._current_plan = new_plan
        self._last_replan_slot = slot_index
        self._divergence_cumulative_kwh = 0.0
        self._divergence_consecutive = 0
        self._replan_count += 1

        logger.info(
            "Re-plan #%d at slot %d: %d slots, %.4f kWh remaining budget",
            self._replan_count, slot_index,
            len(new_plan.get("slots", [])), remaining,
        )
        return True
    except Exception as exc:
        logger.error("Re-plan failed: %s", exc)
        return False
|
|
| |
| |
| |
|
|
def tick(self, timestamp: Optional[datetime] = None) -> TickResult:
    """Execute one control loop cycle.

    Loads today's plan if needed, reads live weather, applies the live
    gate and the budget guard to the planned offset, arbitrates the final
    angle, dispatches it, spends the energy budget, checks divergence from
    the plan and appends the result to the tick log.

    Parameters
    ----------
    timestamp : datetime, optional
        Override current time (for simulation/replay). Defaults to the
        current UTC time.

    Returns
    -------
    TickResult
        Everything observed and decided during this cycle.
    """
    now = timestamp or datetime.now(tz=timezone.utc)
    slot_index = now.hour * 4 + now.minute // 15
    result = TickResult(timestamp=now, slot_index=slot_index)

    # Make sure the plan held in memory belongs to the current day.
    today = now.date() if hasattr(now, "date") else date.today()
    if (not self._current_plan
            or self._current_plan.get("target_date") != str(today)):
        self.load_plan(today)

    # Live weather snapshot; a failure leaves the fields as None.
    try:
        wx = self.hub.weather.get_current()
        if "error" not in wx:
            result.air_temp_c = wx.get("air_temperature_c")
            result.ghi_w_m2 = wx.get("ghi_w_m2")
            result.wind_speed_ms = wx.get("wind_speed_ms")
    except Exception as exc:
        logger.warning("Weather fetch failed: %s", exc)

    # Model routing; falls back to "fvcb" when the router fails.
    try:
        telemetry = {
            "temp_c": result.air_temp_c,
            "ghi_w_m2": result.ghi_w_m2,
            "hour": now.hour,
        }
        result.model_route = self.router.route(telemetry)
    except Exception as exc:
        logger.debug("Model routing failed: %s", exc)
        result.model_route = "fvcb"

    # Astronomical tracking angle: the baseline every offset is added to.
    theta_astro = self.astro.get_angle(now)

    # Planned values for this slot.
    planned_cost = 0.0
    slot_plan = self._get_slot_plan(slot_index)
    if slot_plan:
        result.plan_offset_deg = slot_plan.get("offset_deg", 0.0) or 0.0
        result.plan_gate_passed = slot_plan.get("gate_passed", False)
        planned_cost = slot_plan.get("energy_cost_kwh", 0.0) or 0.0
        result.energy_cost_kwh = planned_cost
        result.stage_id = self._current_plan.get("stage_id", "unknown")
    else:
        logger.debug("No plan slot for index %d — defaulting to astronomical", slot_index)

    # Live gate: drop a planned offset when current conditions no longer
    # justify it. Skipped when no live temperature is available.
    planned_offset = result.plan_offset_deg
    live_offset = planned_offset

    if result.air_temp_c is not None:
        from config.settings import (
            NO_SHADE_BEFORE_HOUR,
            SEMILLON_TRANSITION_TEMP_C,
            SHADE_ELIGIBLE_GHI_ABOVE,
        )

        if planned_offset > 0:
            blocked = False
            reason = ""

            if now.hour < NO_SHADE_BEFORE_HOUR:
                blocked, reason = True, "morning — no shading before 10:00"
            elif result.air_temp_c < SEMILLON_TRANSITION_TEMP_C:
                blocked, reason = True, f"temp {result.air_temp_c:.0f}°C < {SEMILLON_TRANSITION_TEMP_C:.0f}°C"
            elif result.ghi_w_m2 is not None and result.ghi_w_m2 < SHADE_ELIGIBLE_GHI_ABOVE:
                blocked, reason = True, f"GHI {result.ghi_w_m2:.0f} < {SHADE_ELIGIBLE_GHI_ABOVE:.0f}"

            if blocked:
                live_offset = 0.0
                result.live_override = True
                result.override_reason = reason
                logger.info("Live override: plan offset %.0f° → 0° (%s)",
                            planned_offset, reason)

    result.live_gate_passed = live_offset > 0

    # Budget guard: no intervention once slot budget plus margin is used up.
    if live_offset > 0:
        daily_bp = self._ensure_daily_budget(today)
        if daily_bp:
            sk = self._slot_key(now)
            slot_remaining = daily_bp["slot_budgets"].get(sk, 0.0)
            margin_remaining = daily_bp["daily_margin_remaining_kWh"]
            if slot_remaining + margin_remaining <= 0:
                live_offset = 0.0
                result.live_override = True
                result.override_reason = "daily energy budget exhausted"
                logger.info("Budget guard: forcing astronomical (budget depleted)")

    # A cancelled intervention costs nothing. Without this the budget
    # would be charged for shading that never happened, and planned and
    # actual cost would always be equal, so the kWh divergence threshold
    # could never be reached.
    if result.live_override:
        result.energy_cost_kwh = 0.0

    # Engine proposal handed to the arbiter.
    target_angle = theta_astro + live_offset
    engine_result = {
        "angle": target_angle,
        "action": f"plan_offset_{live_offset:.0f}deg",
    }

    # Operational modes may yield a weather override for the arbiter.
    mode_override = self.modes.check_all(
        wind_speed_ms=result.wind_speed_ms,
        air_temp_c=result.air_temp_c,
        theta_astro=theta_astro,
        current_date=today,
    )
    weather_override = mode_override.to_weather_override() if mode_override else None

    decision = self.arbiter.arbitrate(
        timestamp=now,
        engine_result=engine_result,
        theta_astro=theta_astro,
        weather_override=weather_override,
    )

    result.target_angle = decision.angle
    result.dispatch = decision.dispatch
    result.source = decision.source.value if hasattr(decision.source, 'value') else str(decision.source)

    # Per-tracker overrides (empty in the common case).
    fleet_overrides = self._resolve_fleet_overrides(now, theta_astro)
    if fleet_overrides:
        result.fleet_overrides = fleet_overrides
        logger.info("Fleet overrides active: %s", fleet_overrides)

    # Dispatch; failures are recorded on the result, not raised.
    if decision.dispatch:
        try:
            dispatch_result = self.dispatcher.dispatch(
                decision, angle_overrides=fleet_overrides or None,
            )
            result.trackers_verified = dispatch_result.n_success
            result.trackers_total = len(dispatch_result.trackers)
            if not dispatch_result.all_verified:
                failed = [t.device_name for t in dispatch_result.trackers if not t.verified]
                result.dispatch_error = f"failed: {', '.join(failed)}"
        except Exception as exc:
            result.dispatch_error = str(exc)
            logger.error("Dispatch failed: %s", exc)

    # Spend the slot's energy budget (only when there is a cost to spend).
    if result.energy_cost_kwh > 0:
        daily_bp = self._ensure_daily_budget(today)
        if daily_bp:
            sk = self._slot_key(now)
            result.budget_spent_kwh = self.budget_planner.spend_slot(
                daily_bp, sk, result.energy_cost_kwh,
            )
            result.budget_remaining_kwh = (
                sum(daily_bp["slot_budgets"].values())
                + daily_bp["daily_margin_remaining_kWh"]
            )
            # Keep the Redis copy in step with the in-memory budget.
            self._persist_budget(today)

            if result.budget_spent_kwh < result.energy_cost_kwh:
                logger.warning(
                    "Budget shortfall: requested %.4f kWh, spent %.4f kWh (slot %s)",
                    result.energy_cost_kwh, result.budget_spent_kwh, sk,
                )

    # Divergence between plan and execution; may trigger a re-plan.
    if slot_plan:
        needs_replan = self._check_plan_divergence(
            slot_index=slot_index,
            planned_offset=result.plan_offset_deg,
            # live_offset is already 0.0 whenever an override fired.
            actual_offset=live_offset,
            planned_cost=planned_cost,
            actual_cost=result.energy_cost_kwh,
        )
        result.divergence_cumulative_kwh = self._divergence_cumulative_kwh
        result.divergence_consecutive = self._divergence_consecutive
        if needs_replan:
            result.replan_triggered = self._trigger_replan(now, slot_index)

    # Record and log the tick.
    self._tick_log.append(result.to_dict())
    logger.info(
        "Tick %02d:%02d slot=%d angle=%.1f° offset=%.0f° dispatch=%s source=%s"
        " budget_remaining=%.3f kWh%s",
        now.hour, now.minute, slot_index, decision.angle,
        live_offset, decision.dispatch, decision.source,
        result.budget_remaining_kwh,
        f" [OVERRIDE: {result.override_reason}]" if result.live_override else "",
    )

    return result
|
|
| |
| |
| |
|
|
def run(self, max_ticks: Optional[int] = None) -> None:
    """Run the control loop continuously (blocking).

    After each tick the loop sleeps until the start of the next slot of
    ``DP_SLOT_DURATION_MIN`` minutes. A tick that raises is logged and
    the loop carries on.

    Parameters
    ----------
    max_ticks : int, optional
        Stop after this many tick attempts (for testing). Failed ticks
        count too, so a persistent failure cannot keep a bounded run
        alive forever. None = run forever.
    """
    logger.info("Control loop starting (dry_run=%s)", self.dry_run)
    tick_count = 0

    while max_ticks is None or tick_count < max_ticks:
        try:
            self.tick()
        except Exception as exc:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Tick failed: %s", exc)
        tick_count += 1

        # Return straight after the final tick instead of sleeping for up
        # to a whole slot first.
        if max_ticks is not None and tick_count >= max_ticks:
            break

        # Sleep until the next slot boundary; timedelta arithmetic takes
        # care of hour and day rollover.
        now = datetime.now(tz=timezone.utc)
        slot_start = now.replace(
            minute=(now.minute // DP_SLOT_DURATION_MIN) * DP_SLOT_DURATION_MIN,
            second=0, microsecond=0,
        )
        next_slot = slot_start + timedelta(minutes=DP_SLOT_DURATION_MIN)
        sleep_sec = (next_slot - now).total_seconds()
        logger.debug("Sleeping %.0f s until %s", sleep_sec, next_slot)
        time.sleep(max(sleep_sec, 1.0))
|
|
| |
| |
| |
|
|
| def get_log(self) -> List[dict]: |
| """Return all tick results from this session.""" |
| return list(self._tick_log) |
|
|
def save_log(self, path: Optional[Path] = None) -> Path:
    """Write the tick log as JSON and return the file path.

    Parameters
    ----------
    path : Path, optional
        Destination file; defaults to ``log_path`` with a ``.json`` suffix.
    """
    destination = path if path else self.log_path.with_suffix(".json")
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w") as handle:
        # default=str stringifies any value json cannot encode natively.
        json.dump(self._tick_log, handle, indent=2, default=str)
    logger.info("Saved %d tick results to %s", len(self._tick_log), destination)
    return destination
|
|