api / src /tracker_fleet.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
TrackerFleet: fleet-level plan assignment and tracker management.
Manages which tracking plan is assigned to which tracker, with a
three-level scope hierarchy:
Tracker-level > Line-level > Vineyard-level
The highest-specificity, highest-priority active assignment wins.
This module provides both:
- A file-based assignment store (JSON) for standalone / development use
- A DB-ready interface matching the tracker repo's PlanAssignmentManager schema
Adapted from the tracker repo's TrackerManager + PlanAssignmentManager,
stripped of async MySQL dependency — uses file-based storage by default.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from config.settings import DATA_DIR, TRACKER_ID_MAP
logger = logging.getLogger(__name__)
ASSIGNMENTS_FILE = DATA_DIR / "tracker_assignments.json"
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class PlanAssignment:
"""A plan assigned to a scope (tracker / line / vineyard)."""
assignment_id: int
plan_id: str # plan name or ID
plan_file: Optional[str] = None # JSON plan filename
vineyard_id: int = 1 # default vineyard
line_id: Optional[int] = None # None = vineyard-level
tracker_id: Optional[int] = None # None = line or vineyard-level
priority: int = 0
active_from: Optional[str] = None # ISO datetime
active_until: Optional[str] = None # ISO datetime, None = no expiry
reason: str = ""
@property
def scope(self) -> str:
if self.tracker_id is not None:
return "tracker"
if self.line_id is not None:
return "line"
return "vineyard"
@property
def scope_priority(self) -> int:
"""Higher = more specific."""
if self.tracker_id is not None:
return 2
if self.line_id is not None:
return 1
return 0
def is_active(self, now: Optional[datetime] = None) -> bool:
now = now or datetime.now(tz=timezone.utc)
if self.active_from:
start = datetime.fromisoformat(self.active_from)
if now < start:
return False
if self.active_until:
end = datetime.fromisoformat(self.active_until)
if now >= end:
return False
return True
def to_dict(self) -> dict:
return {
"assignment_id": self.assignment_id,
"plan_id": self.plan_id,
"plan_file": self.plan_file,
"vineyard_id": self.vineyard_id,
"line_id": self.line_id,
"tracker_id": self.tracker_id,
"priority": self.priority,
"active_from": self.active_from,
"active_until": self.active_until,
"reason": self.reason,
}
@classmethod
def from_dict(cls, d: dict) -> PlanAssignment:
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
# ---------------------------------------------------------------------------
# Fleet manager (file-based)
# ---------------------------------------------------------------------------
# Tracker ↔ line mapping for Yeruham vineyard
TRACKER_LINES = {
501: 501, # Tracker501 on line 501
502: 502,
503: 503,
509: 509,
}
def tracker_id_to_name(tracker_id: int) -> str:
"""Convert integer tracker ID to ThingsBoard device name."""
name = TRACKER_ID_MAP.get(tracker_id)
if name is None:
raise KeyError(f"Unknown tracker ID: {tracker_id}")
return name
def tracker_name_to_id(name: str) -> int:
"""Convert ThingsBoard device name to integer tracker ID."""
for tid, tname in TRACKER_ID_MAP.items():
if tname == name:
return tid
raise KeyError(f"Unknown tracker name: {name}")
class TrackerFleet:
"""Manage plan assignments for a fleet of trackers.
File-based storage for development; can be swapped to DB backend.
Parameters
----------
assignments_file : Path
JSON file storing active assignments.
"""
def __init__(self, assignments_file: Path | str = ASSIGNMENTS_FILE):
self._file = Path(assignments_file)
self._assignments: List[PlanAssignment] = []
self._next_id = 1
self._load()
def _load(self) -> None:
if self._file.exists():
try:
with open(self._file) as f:
data = json.load(f)
self._assignments = [PlanAssignment.from_dict(d) for d in data]
if self._assignments:
self._next_id = max(a.assignment_id for a in self._assignments) + 1
logger.info("Loaded %d assignments from %s",
len(self._assignments), self._file)
except Exception as exc:
logger.warning("Failed to load assignments: %s", exc)
self._assignments = []
else:
self._assignments = []
def _save(self) -> None:
self._file.parent.mkdir(parents=True, exist_ok=True)
with open(self._file, "w") as f:
json.dump([a.to_dict() for a in self._assignments], f, indent=2)
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def get_active_assignments(
self,
now: Optional[datetime] = None,
) -> List[PlanAssignment]:
"""Return all currently active assignments, sorted by priority."""
now = now or datetime.now(tz=timezone.utc)
active = [a for a in self._assignments if a.is_active(now)]
active.sort(key=lambda a: (a.priority, a.scope_priority), reverse=True)
return active
def get_best_assignment(
self,
tracker_id: int,
now: Optional[datetime] = None,
) -> Optional[PlanAssignment]:
"""Return the best active assignment for a specific tracker.
Resolution order (first match wins):
1. Tracker-level assignment for this tracker_id
2. Line-level assignment for this tracker's line
3. Vineyard-level assignment
"""
now = now or datetime.now(tz=timezone.utc)
active = self.get_active_assignments(now)
line_id = TRACKER_LINES.get(tracker_id)
for a in active:
# Tracker-level match
if a.tracker_id == tracker_id:
return a
# Line-level match
if a.tracker_id is None and a.line_id == line_id and line_id is not None:
return a
# Vineyard-level match
if a.tracker_id is None and a.line_id is None:
return a
return None
def get_all_best_assignments(
self,
now: Optional[datetime] = None,
) -> Dict[int, Optional[PlanAssignment]]:
"""Return the best assignment for each known tracker."""
return {
t_id: self.get_best_assignment(t_id, now)
for t_id in TRACKER_LINES
}
# ------------------------------------------------------------------
# Mutate
# ------------------------------------------------------------------
def assign(
self,
plan_id: str,
plan_file: Optional[str] = None,
tracker_id: Optional[int] = None,
line_id: Optional[int] = None,
vineyard_id: int = 1,
priority: int = 0,
reason: str = "",
active_until: Optional[str] = None,
) -> PlanAssignment:
"""Create a new plan assignment."""
assignment = PlanAssignment(
assignment_id=self._next_id,
plan_id=plan_id,
plan_file=plan_file,
vineyard_id=vineyard_id,
line_id=line_id,
tracker_id=tracker_id,
priority=priority,
active_from=datetime.now(tz=timezone.utc).isoformat(),
active_until=active_until,
reason=reason,
)
self._next_id += 1
self._assignments.append(assignment)
self._save()
logger.info("Created assignment %d: plan=%s scope=%s tracker=%s",
assignment.assignment_id, plan_id, assignment.scope, tracker_id)
return assignment
def expire(self, assignment_id: int) -> bool:
"""Expire an assignment by setting its active_until to now."""
for a in self._assignments:
if a.assignment_id == assignment_id:
a.active_until = datetime.now(tz=timezone.utc).isoformat()
self._save()
logger.info("Expired assignment %d", assignment_id)
return True
return False
def expire_all(self) -> int:
"""Expire all active assignments."""
now = datetime.now(tz=timezone.utc).isoformat()
count = 0
for a in self._assignments:
if a.active_until is None:
a.active_until = now
count += 1
if count:
self._save()
return count
def summary(self) -> dict:
"""Return a summary of current assignments."""
best = self.get_all_best_assignments()
return {
"total_assignments": len(self._assignments),
"active_assignments": len(self.get_active_assignments()),
"trackers": {
t_id: {
"plan": a.plan_id if a else None,
"scope": a.scope if a else None,
"priority": a.priority if a else None,
}
for t_id, a in best.items()
},
}