| """ |
| TrackerFleet: fleet-level plan assignment and tracker management. |
| |
| Manages which tracking plan is assigned to which tracker, with a |
| three-level scope hierarchy: |
| |
| Tracker-level > Line-level > Vineyard-level |
| |
| The highest-specificity, highest-priority active assignment wins. |
| |
| This module provides both: |
| - A file-based assignment store (JSON) for standalone / development use |
| - A DB-ready interface matching the tracker repo's PlanAssignmentManager schema |
| |
| Adapted from the tracker repo's TrackerManager + PlanAssignmentManager, |
| stripped of async MySQL dependency — uses file-based storage by default. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| from config.settings import DATA_DIR, TRACKER_ID_MAP |
|
|
| logger = logging.getLogger(__name__) |
|
|
| ASSIGNMENTS_FILE = DATA_DIR / "tracker_assignments.json" |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class PlanAssignment: |
| """A plan assigned to a scope (tracker / line / vineyard).""" |
|
|
| assignment_id: int |
| plan_id: str |
| plan_file: Optional[str] = None |
| vineyard_id: int = 1 |
| line_id: Optional[int] = None |
| tracker_id: Optional[int] = None |
| priority: int = 0 |
| active_from: Optional[str] = None |
| active_until: Optional[str] = None |
| reason: str = "" |
|
|
| @property |
| def scope(self) -> str: |
| if self.tracker_id is not None: |
| return "tracker" |
| if self.line_id is not None: |
| return "line" |
| return "vineyard" |
|
|
| @property |
| def scope_priority(self) -> int: |
| """Higher = more specific.""" |
| if self.tracker_id is not None: |
| return 2 |
| if self.line_id is not None: |
| return 1 |
| return 0 |
|
|
| def is_active(self, now: Optional[datetime] = None) -> bool: |
| now = now or datetime.now(tz=timezone.utc) |
| if self.active_from: |
| start = datetime.fromisoformat(self.active_from) |
| if now < start: |
| return False |
| if self.active_until: |
| end = datetime.fromisoformat(self.active_until) |
| if now >= end: |
| return False |
| return True |
|
|
| def to_dict(self) -> dict: |
| return { |
| "assignment_id": self.assignment_id, |
| "plan_id": self.plan_id, |
| "plan_file": self.plan_file, |
| "vineyard_id": self.vineyard_id, |
| "line_id": self.line_id, |
| "tracker_id": self.tracker_id, |
| "priority": self.priority, |
| "active_from": self.active_from, |
| "active_until": self.active_until, |
| "reason": self.reason, |
| } |
|
|
| @classmethod |
| def from_dict(cls, d: dict) -> PlanAssignment: |
| return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) |
|
|
|
|
| |
| |
| |
|
|
| |
| TRACKER_LINES = { |
| 501: 501, |
| 502: 502, |
| 503: 503, |
| 509: 509, |
| } |
|
|
|
|
| def tracker_id_to_name(tracker_id: int) -> str: |
| """Convert integer tracker ID to ThingsBoard device name.""" |
| name = TRACKER_ID_MAP.get(tracker_id) |
| if name is None: |
| raise KeyError(f"Unknown tracker ID: {tracker_id}") |
| return name |
|
|
|
|
| def tracker_name_to_id(name: str) -> int: |
| """Convert ThingsBoard device name to integer tracker ID.""" |
| for tid, tname in TRACKER_ID_MAP.items(): |
| if tname == name: |
| return tid |
| raise KeyError(f"Unknown tracker name: {name}") |
|
|
|
|
| class TrackerFleet: |
| """Manage plan assignments for a fleet of trackers. |
| |
| File-based storage for development; can be swapped to DB backend. |
| |
| Parameters |
| ---------- |
| assignments_file : Path |
| JSON file storing active assignments. |
| """ |
|
|
| def __init__(self, assignments_file: Path | str = ASSIGNMENTS_FILE): |
| self._file = Path(assignments_file) |
| self._assignments: List[PlanAssignment] = [] |
| self._next_id = 1 |
| self._load() |
|
|
| def _load(self) -> None: |
| if self._file.exists(): |
| try: |
| with open(self._file) as f: |
| data = json.load(f) |
| self._assignments = [PlanAssignment.from_dict(d) for d in data] |
| if self._assignments: |
| self._next_id = max(a.assignment_id for a in self._assignments) + 1 |
| logger.info("Loaded %d assignments from %s", |
| len(self._assignments), self._file) |
| except Exception as exc: |
| logger.warning("Failed to load assignments: %s", exc) |
| self._assignments = [] |
| else: |
| self._assignments = [] |
|
|
| def _save(self) -> None: |
| self._file.parent.mkdir(parents=True, exist_ok=True) |
| with open(self._file, "w") as f: |
| json.dump([a.to_dict() for a in self._assignments], f, indent=2) |
|
|
| |
| |
| |
|
|
| def get_active_assignments( |
| self, |
| now: Optional[datetime] = None, |
| ) -> List[PlanAssignment]: |
| """Return all currently active assignments, sorted by priority.""" |
| now = now or datetime.now(tz=timezone.utc) |
| active = [a for a in self._assignments if a.is_active(now)] |
| active.sort(key=lambda a: (a.priority, a.scope_priority), reverse=True) |
| return active |
|
|
| def get_best_assignment( |
| self, |
| tracker_id: int, |
| now: Optional[datetime] = None, |
| ) -> Optional[PlanAssignment]: |
| """Return the best active assignment for a specific tracker. |
| |
| Resolution order (first match wins): |
| 1. Tracker-level assignment for this tracker_id |
| 2. Line-level assignment for this tracker's line |
| 3. Vineyard-level assignment |
| """ |
| now = now or datetime.now(tz=timezone.utc) |
| active = self.get_active_assignments(now) |
| line_id = TRACKER_LINES.get(tracker_id) |
|
|
| for a in active: |
| |
| if a.tracker_id == tracker_id: |
| return a |
| |
| if a.tracker_id is None and a.line_id == line_id and line_id is not None: |
| return a |
| |
| if a.tracker_id is None and a.line_id is None: |
| return a |
|
|
| return None |
|
|
| def get_all_best_assignments( |
| self, |
| now: Optional[datetime] = None, |
| ) -> Dict[int, Optional[PlanAssignment]]: |
| """Return the best assignment for each known tracker.""" |
| return { |
| t_id: self.get_best_assignment(t_id, now) |
| for t_id in TRACKER_LINES |
| } |
|
|
| |
| |
| |
|
|
| def assign( |
| self, |
| plan_id: str, |
| plan_file: Optional[str] = None, |
| tracker_id: Optional[int] = None, |
| line_id: Optional[int] = None, |
| vineyard_id: int = 1, |
| priority: int = 0, |
| reason: str = "", |
| active_until: Optional[str] = None, |
| ) -> PlanAssignment: |
| """Create a new plan assignment.""" |
| assignment = PlanAssignment( |
| assignment_id=self._next_id, |
| plan_id=plan_id, |
| plan_file=plan_file, |
| vineyard_id=vineyard_id, |
| line_id=line_id, |
| tracker_id=tracker_id, |
| priority=priority, |
| active_from=datetime.now(tz=timezone.utc).isoformat(), |
| active_until=active_until, |
| reason=reason, |
| ) |
| self._next_id += 1 |
| self._assignments.append(assignment) |
| self._save() |
| logger.info("Created assignment %d: plan=%s scope=%s tracker=%s", |
| assignment.assignment_id, plan_id, assignment.scope, tracker_id) |
| return assignment |
|
|
| def expire(self, assignment_id: int) -> bool: |
| """Expire an assignment by setting its active_until to now.""" |
| for a in self._assignments: |
| if a.assignment_id == assignment_id: |
| a.active_until = datetime.now(tz=timezone.utc).isoformat() |
| self._save() |
| logger.info("Expired assignment %d", assignment_id) |
| return True |
| return False |
|
|
| def expire_all(self) -> int: |
| """Expire all active assignments.""" |
| now = datetime.now(tz=timezone.utc).isoformat() |
| count = 0 |
| for a in self._assignments: |
| if a.active_until is None: |
| a.active_until = now |
| count += 1 |
| if count: |
| self._save() |
| return count |
|
|
| def summary(self) -> dict: |
| """Return a summary of current assignments.""" |
| best = self.get_all_best_assignments() |
| return { |
| "total_assignments": len(self._assignments), |
| "active_assignments": len(self.get_active_assignments()), |
| "trackers": { |
| t_id: { |
| "plan": a.plan_id if a else None, |
| "scope": a.scope if a else None, |
| "priority": a.priority if a else None, |
| } |
| for t_id, a in best.items() |
| }, |
| } |
|
|