""" TrackerFleet: fleet-level plan assignment and tracker management. Manages which tracking plan is assigned to which tracker, with a three-level scope hierarchy: Tracker-level > Line-level > Vineyard-level The highest-specificity, highest-priority active assignment wins. This module provides both: - A file-based assignment store (JSON) for standalone / development use - A DB-ready interface matching the tracker repo's PlanAssignmentManager schema Adapted from the tracker repo's TrackerManager + PlanAssignmentManager, stripped of async MySQL dependency — uses file-based storage by default. """ from __future__ import annotations import json import logging from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional from config.settings import DATA_DIR, TRACKER_ID_MAP logger = logging.getLogger(__name__) ASSIGNMENTS_FILE = DATA_DIR / "tracker_assignments.json" # --------------------------------------------------------------------------- # Data model # --------------------------------------------------------------------------- @dataclass class PlanAssignment: """A plan assigned to a scope (tracker / line / vineyard).""" assignment_id: int plan_id: str # plan name or ID plan_file: Optional[str] = None # JSON plan filename vineyard_id: int = 1 # default vineyard line_id: Optional[int] = None # None = vineyard-level tracker_id: Optional[int] = None # None = line or vineyard-level priority: int = 0 active_from: Optional[str] = None # ISO datetime active_until: Optional[str] = None # ISO datetime, None = no expiry reason: str = "" @property def scope(self) -> str: if self.tracker_id is not None: return "tracker" if self.line_id is not None: return "line" return "vineyard" @property def scope_priority(self) -> int: """Higher = more specific.""" if self.tracker_id is not None: return 2 if self.line_id is not None: return 1 return 0 def is_active(self, now: Optional[datetime] = None) -> bool: now = now or datetime.now(tz=timezone.utc) if self.active_from: start = datetime.fromisoformat(self.active_from) if now < start: return False if self.active_until: end = datetime.fromisoformat(self.active_until) if now >= end: return False return True def to_dict(self) -> dict: return { "assignment_id": self.assignment_id, "plan_id": self.plan_id, "plan_file": self.plan_file, "vineyard_id": self.vineyard_id, "line_id": self.line_id, "tracker_id": self.tracker_id, "priority": self.priority, "active_from": self.active_from, "active_until": self.active_until, "reason": self.reason, } @classmethod def from_dict(cls, d: dict) -> PlanAssignment: return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) # --------------------------------------------------------------------------- # Fleet manager (file-based) # --------------------------------------------------------------------------- # Tracker ↔ line mapping for Yeruham vineyard TRACKER_LINES = { 501: 501, # Tracker501 on line 501 502: 502, 503: 503, 509: 509, } def tracker_id_to_name(tracker_id: int) -> str: """Convert integer tracker ID to ThingsBoard device name.""" name = TRACKER_ID_MAP.get(tracker_id) if name is None: raise KeyError(f"Unknown tracker ID: {tracker_id}") return name def tracker_name_to_id(name: str) -> int: """Convert ThingsBoard device name to integer tracker ID.""" for tid, tname in TRACKER_ID_MAP.items(): if tname == name: return tid raise KeyError(f"Unknown tracker name: {name}") class TrackerFleet: """Manage plan assignments for a fleet of trackers. File-based storage for development; can be swapped to DB backend. Parameters ---------- assignments_file : Path JSON file storing active assignments. """ def __init__(self, assignments_file: Path | str = ASSIGNMENTS_FILE): self._file = Path(assignments_file) self._assignments: List[PlanAssignment] = [] self._next_id = 1 self._load() def _load(self) -> None: if self._file.exists(): try: with open(self._file) as f: data = json.load(f) self._assignments = [PlanAssignment.from_dict(d) for d in data] if self._assignments: self._next_id = max(a.assignment_id for a in self._assignments) + 1 logger.info("Loaded %d assignments from %s", len(self._assignments), self._file) except Exception as exc: logger.warning("Failed to load assignments: %s", exc) self._assignments = [] else: self._assignments = [] def _save(self) -> None: self._file.parent.mkdir(parents=True, exist_ok=True) with open(self._file, "w") as f: json.dump([a.to_dict() for a in self._assignments], f, indent=2) # ------------------------------------------------------------------ # Query # ------------------------------------------------------------------ def get_active_assignments( self, now: Optional[datetime] = None, ) -> List[PlanAssignment]: """Return all currently active assignments, sorted by priority.""" now = now or datetime.now(tz=timezone.utc) active = [a for a in self._assignments if a.is_active(now)] active.sort(key=lambda a: (a.priority, a.scope_priority), reverse=True) return active def get_best_assignment( self, tracker_id: int, now: Optional[datetime] = None, ) -> Optional[PlanAssignment]: """Return the best active assignment for a specific tracker. Resolution order (first match wins): 1. Tracker-level assignment for this tracker_id 2. Line-level assignment for this tracker's line 3. Vineyard-level assignment """ now = now or datetime.now(tz=timezone.utc) active = self.get_active_assignments(now) line_id = TRACKER_LINES.get(tracker_id) for a in active: # Tracker-level match if a.tracker_id == tracker_id: return a # Line-level match if a.tracker_id is None and a.line_id == line_id and line_id is not None: return a # Vineyard-level match if a.tracker_id is None and a.line_id is None: return a return None def get_all_best_assignments( self, now: Optional[datetime] = None, ) -> Dict[int, Optional[PlanAssignment]]: """Return the best assignment for each known tracker.""" return { t_id: self.get_best_assignment(t_id, now) for t_id in TRACKER_LINES } # ------------------------------------------------------------------ # Mutate # ------------------------------------------------------------------ def assign( self, plan_id: str, plan_file: Optional[str] = None, tracker_id: Optional[int] = None, line_id: Optional[int] = None, vineyard_id: int = 1, priority: int = 0, reason: str = "", active_until: Optional[str] = None, ) -> PlanAssignment: """Create a new plan assignment.""" assignment = PlanAssignment( assignment_id=self._next_id, plan_id=plan_id, plan_file=plan_file, vineyard_id=vineyard_id, line_id=line_id, tracker_id=tracker_id, priority=priority, active_from=datetime.now(tz=timezone.utc).isoformat(), active_until=active_until, reason=reason, ) self._next_id += 1 self._assignments.append(assignment) self._save() logger.info("Created assignment %d: plan=%s scope=%s tracker=%s", assignment.assignment_id, plan_id, assignment.scope, tracker_id) return assignment def expire(self, assignment_id: int) -> bool: """Expire an assignment by setting its active_until to now.""" for a in self._assignments: if a.assignment_id == assignment_id: a.active_until = datetime.now(tz=timezone.utc).isoformat() self._save() logger.info("Expired assignment %d", assignment_id) return True return False def expire_all(self) -> int: """Expire all active assignments.""" now = datetime.now(tz=timezone.utc).isoformat() count = 0 for a in self._assignments: if a.active_until is None: a.active_until = now count += 1 if count: self._save() return count def summary(self) -> dict: """Return a summary of current assignments.""" best = self.get_all_best_assignments() return { "total_assignments": len(self._assignments), "active_assignments": len(self.get_active_assignments()), "trackers": { t_id: { "plan": a.plan_id if a else None, "scope": a.scope if a else None, "priority": a.priority if a else None, } for t_id, a in best.items() }, }