| """ |
| Data provider layer for the VineyardChatbot. |
| |
| Architecture |
| ------------ |
| Each data domain gets a **Service** class that encapsulates: |
| - data fetching (IMS API, ThingsBoard API, model inference, ...) |
| - caching / TTL logic |
| - error handling (returns dict with "error" key on failure) |
| - serialisation to LLM-friendly dicts |
| |
| Services are registered on a lightweight **DataHub** which is injected |
| into the chatbot. The chatbot's tool methods become thin one-liners |
| that delegate to ``self.hub.<service>.<method>()``. |
| |
|                    ┌────────────────────┐ |
|                    │  VineyardChatbot   │ |
|                    │  (tool dispatch)   │ |
|                    └──────────┬─────────┘ |
|                               │ self.hub |
|                    ┌──────────▼─────────┐ |
|                    │      DataHub       │ |
|                    │ (service registry) │ |
|                    └──────────┬─────────┘ |
|       ┌────────────┬──────────┼───────────┬─────────────┐ |
|       ▼            ▼          ▼           ▼             ▼ |
|  WeatherSvc  VineSensorSvc  PSSvc     EnergySvc    BiologySvc |
|       │            │          │           │             │ |
|   IMSClient    TB Client  Farquhar  TB+Analytical  rules dict |
|                            ML Pred |
| |
| Loose coupling guarantees: |
| - The chatbot never imports IMS / TB / Farquhar / ML directly. |
| - Each service can be unit-tested in isolation (pass a mock client). |
| - Adding a new data source = write a new Service + register it. |
| - Services own their TTL caches — the chatbot is stateless w.r.t. data. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import math |
| import time |
| import traceback |
|
|
| log = logging.getLogger("solarwine.data_providers") |
| from abc import ABC, abstractmethod |
| from dataclasses import dataclass, field |
| from datetime import date, datetime, timedelta, timezone |
| from typing import Any, Dict, List, Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
| |
| |
| |
|
|
class CircuitBreaker:
    """Failure-counting circuit breaker.

    Once ``threshold`` failures have been recorded inside a sliding
    ``window_sec`` window the breaker opens. While open, ``is_open`` is
    True until more than ``cooldown_sec`` seconds have passed, after which
    the breaker resets itself on the next ``is_open`` read.
    """

    def __init__(self, threshold: int = 3, cooldown_sec: float = 300, window_sec: float = 60):
        self.threshold = threshold
        self.cooldown_sec = cooldown_sec
        self.window_sec = window_sec
        # Monotonic timestamps of the failures still inside the window.
        self._failures: list[float] = []
        # Monotonic timestamp at which the breaker tripped; None while closed.
        self._opened_at: float | None = None

    @property
    def is_open(self) -> bool:
        """Whether calls should currently be short-circuited.

        Reading this property has a side effect: once the cooldown has
        elapsed the breaker is closed and the failure history is dropped.
        """
        tripped_at = self._opened_at
        if tripped_at is None:
            return False
        elapsed = time.monotonic() - tripped_at
        if elapsed > self.cooldown_sec:
            # Cooldown is over: close the breaker and start counting afresh.
            self._opened_at = None
            self._failures.clear()
            return False
        return True

    def record_success(self) -> None:
        """Close the breaker and forget every recorded failure."""
        self._opened_at = None
        self._failures.clear()

    def record_failure(self) -> None:
        """Register a failure now; open the breaker when the threshold is hit."""
        stamp = time.monotonic()
        # Keep only failures that are still inside the sliding window.
        recent = [seen for seen in self._failures if stamp - seen < self.window_sec]
        recent.append(stamp)
        self._failures = recent
        if len(recent) >= self.threshold:
            self._opened_at = stamp
|
|
|
|
| |
| |
| |
|
|
@dataclass
class _CacheEntry:
    """One in-memory cache slot: the value and its monotonic-clock expiry."""

    value: Any
    expires_at: float


class TTLCache:
    """TTL cache with optional Redis backend.

    When the project's Redis helper (``src.data.redis_cache.get_redis``)
    returns a client, values are written to Redis as well as to the
    in-process store so other processes can read them. When the helper
    cannot be imported or raises, only the in-process store is used.

    ``pandas.DataFrame`` values are never sent to Redis; they live in the
    in-process store only.
    """

    def __init__(self, ttl_seconds: float = 300, redis_prefix: str = ""):
        """
        Parameters
        ----------
        ttl_seconds : float
            Lifetime of an entry, in seconds.
        redis_prefix : str
            Prefix prepended to every key sent to Redis (not used for the
            in-process store).
        """
        self.ttl = ttl_seconds
        self._prefix = redis_prefix
        self._store: Dict[str, _CacheEntry] = {}
        # Redis is resolved lazily, at most once per cache instance.
        self._redis_checked = False
        self._redis = None

    def _get_redis(self):
        """Return the Redis helper, or None when it is unavailable."""
        if not self._redis_checked:
            self._redis_checked = True
            try:
                from src.data.redis_cache import get_redis
                self._redis = get_redis()
            except Exception:
                # Best effort: any failure means "no Redis", never a crash.
                self._redis = None
        return self._redis

    def _rkey(self, key: str) -> str:
        """Return the Redis key for *key* (prefix applied when configured)."""
        return f"{self._prefix}{key}" if self._prefix else key

    def get(self, key: str) -> Any | None:
        """Return the cached value for *key*, or None when absent or expired.

        Redis is consulted first; a miss there falls through to the
        in-process store. An expired in-process entry is removed on read so
        the store does not grow without bound.
        """
        redis = self._get_redis()
        if redis:
            val = redis.get_json(self._rkey(key))
            if val is not None:
                return val

        entry = self._store.get(key)
        if entry is None:
            return None
        if time.monotonic() < entry.expires_at:
            return entry.value
        # Expired: evict instead of leaving a dead entry behind.
        self._store.pop(key, None)
        return None

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* for ``self.ttl`` seconds."""
        redis = self._get_redis()
        if redis and not isinstance(value, pd.DataFrame):
            redis.set_json(self._rkey(key), value, ttl=int(self.ttl))
        self._store[key] = _CacheEntry(value=value, expires_at=time.monotonic() + self.ttl)

    def invalidate(self, key: str) -> None:
        """Remove *key* from Redis (when present) and from the in-process store."""
        redis = self._get_redis()
        if redis:
            redis.delete(self._rkey(key))
        self._store.pop(key, None)
|
|
|
|
| |
| |
| |
|
|
def summarise_dataframe(df: pd.DataFrame, max_rows: int = 48) -> Dict[str, Any]:
    """Compress a DataFrame to key stats when it exceeds *max_rows*.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to serialise.
    max_rows : int
        Largest row count that is still returned row by row.

    Returns
    -------
    dict
        * empty frame: ``{"rows": [], "note": ...}``
        * up to *max_rows* rows: ``{"rows": [...], "row_count": n}`` with
          timestamps stringified and floats rounded to 2 decimals
        * otherwise: ``{"row_count", "summarised", "columns"}`` holding
          min/max/mean/first/last per numeric column, plus ``time_range``
          when the index is a ``DatetimeIndex``

    NaN and infinite numbers are emitted as ``None`` in both shapes so the
    result stays valid JSON.
    """

    def _clean(value: Any) -> Optional[float]:
        # Round to 2 decimals; non-finite numbers have no JSON representation.
        number = float(value)
        return round(number, 2) if math.isfinite(number) else None

    if df.empty:
        return {"rows": [], "note": "No data available."}

    if len(df) <= max_rows:
        records = df.reset_index().to_dict(orient="records")
        for record in records:
            for key, value in list(record.items()):
                if isinstance(value, (pd.Timestamp, datetime)):
                    record[key] = str(value)
                elif isinstance(value, (float, np.floating)):
                    record[key] = _clean(value)
        return {"rows": records, "row_count": len(records)}

    # Too many rows: describe each numeric column instead of listing rows.
    summary: Dict[str, Any] = {"row_count": len(df), "summarised": True, "columns": {}}
    numeric = df.select_dtypes(include=[np.number])
    for col in numeric.columns:
        series = numeric[col].dropna()
        if series.empty:
            continue
        summary["columns"][col] = {
            "min": _clean(series.min()),
            "max": _clean(series.max()),
            "mean": _clean(series.mean()),
            "first": _clean(series.iloc[0]),
            "last": _clean(series.iloc[-1]),
        }

    if isinstance(df.index, pd.DatetimeIndex):
        summary["time_range"] = {"start": str(df.index.min()), "end": str(df.index.max())}
    return summary
|
|
|
|
| |
| |
| |
|
|
class BaseService(ABC):
    """Common abstract parent of every data-provider service.

    A concrete service has to provide ``service_name``, which serves as
    its registry key. Public methods are expected to return plain,
    JSON-serialisable dicts so the chatbot can hand them to the LLM
    without any conversion step.
    """

    @property
    @abstractmethod
    def service_name(self) -> str:
        """Registry key identifying this service."""
|
|
|
|
| |
| |
| |
|
|
class WeatherService(BaseService):
    """IMS weather data — cached CSV for history, latest row for 'now'.

    The IMS client is created lazily unless one is injected, and the
    loaded DataFrame is kept in a ``TTLCache`` under the key ``"ims"``.
    """

    service_name = "weather"

    def __init__(self, ims_client: Any = None, cache_ttl: float = 1800):
        """
        Parameters
        ----------
        ims_client : Any, optional
            Object exposing ``load_cached()``. When omitted, an
            ``IMSClient`` is created on first use.
        cache_ttl : float
            Seconds the loaded DataFrame stays cached.
        """
        self._ims = ims_client
        self._df_cache = TTLCache(ttl_seconds=cache_ttl, redis_prefix="weather:")

    def _client(self):
        """Return the IMS client, creating the default one on first use."""
        if self._ims is None:
            from src.ims_client import IMSClient
            self._ims = IMSClient()
        return self._ims

    def _load_df(self) -> pd.DataFrame:
        """Return the IMS DataFrame from cache, reloading it when needed."""
        cached = self._df_cache.get("ims")
        # Anything that is not a DataFrame is ignored and triggers a reload.
        if isinstance(cached, pd.DataFrame):
            return cached
        df = self._client().load_cached()
        if not df.empty:
            self._df_cache.set("ims", df)
        return df

    def get_dataframe(self) -> pd.DataFrame:
        """Public accessor for the cached IMS DataFrame."""
        return self._load_df()

    def _now_israel(self) -> Dict[str, str]:
        """Current time in Yeruham (Asia/Jerusalem) for context in API responses."""
        try:
            from zoneinfo import ZoneInfo
            tz = ZoneInfo("Asia/Jerusalem")
        except Exception:
            # Covers a missing zoneinfo module as well as a missing tz
            # database (ZoneInfoNotFoundError). The fixed UTC+2 fallback
            # ignores daylight saving time.
            tz = timezone(timedelta(hours=2))
        now = datetime.now(tz)
        return {
            "current_time_israel": now.strftime("%H:%M"),
            "current_date_israel": now.strftime("%Y-%m-%d"),
            "current_datetime_israel": now.isoformat(),
        }

    def get_current(self) -> Dict[str, Any]:
        """Latest IMS weather row with local time and staleness.

        Every return value, including the error ones, carries the current
        time fields from ``_now_israel`` so callers can compare.

        Returns
        -------
        dict
            The last row's non-null columns (numbers rounded to 2
            decimals, everything else stringified) plus ``timestamp_utc``,
            ``timestamp_local`` and ``age_minutes`` when the timestamp can
            be parsed; or a dict with an ``"error"`` key.
        """
        try:
            df = self._load_df()
            if df.empty:
                return {"error": "No cached IMS data available.", **self._now_israel()}
            last = df.iloc[-1]

            result: Dict[str, Any] = {
                "timezone": "Asia/Jerusalem (Israel local, Yeruham/Sde Boker)",
                **self._now_israel(),
            }
            try:
                ts_utc = pd.to_datetime(last.get("timestamp_utc"), utc=True)
                ts_local = ts_utc.tz_convert("Asia/Jerusalem")
                now_utc = pd.Timestamp.now(tz="UTC")
                result["timestamp_utc"] = ts_utc.isoformat()
                result["timestamp_local"] = ts_local.isoformat()
                result["age_minutes"] = round((now_utc - ts_utc).total_seconds() / 60, 1)
            except Exception:
                # Unparseable timestamp: report the raw value, skip staleness.
                result["timestamp_utc"] = str(last.get("timestamp_utc", "unknown"))

            for col in df.columns:
                if col == "timestamp_utc":
                    continue
                val = last[col]
                if pd.notna(val):
                    # NumPy integers count as numbers too, not as strings.
                    is_number = isinstance(val, (int, float, np.integer, np.floating))
                    result[col] = round(float(val), 2) if is_number else str(val)
            return result
        except Exception as exc:
            return {"error": f"Could not load weather data: {exc}", **self._now_israel()}

    def get_history(self, start_date: str, end_date: str) -> Dict[str, Any]:
        """Hourly IMS summary for a date range (from cached CSV).

        Parameters
        ----------
        start_date, end_date : str
            Dates parsed by ``pd.Timestamp`` and interpreted as UTC. The
            range covers ``start_date`` 00:00 up to, but not including,
            00:00 of the day after ``end_date``.

        Returns
        -------
        dict
            ``summarise_dataframe`` output for the hourly means, or a dict
            with an ``"error"`` key.
        """
        try:
            df = self._load_df()
            if df.empty:
                return {"error": "No cached IMS data."}
            if "timestamp_utc" in df.columns:
                df = df.set_index(pd.to_datetime(df["timestamp_utc"], utc=True))
            start = pd.Timestamp(start_date, tz="UTC")
            end = pd.Timestamp(end_date, tz="UTC") + pd.Timedelta(days=1)
            # Half-open window: label slicing with .loc would also return
            # the 00:00 sample of the day after end_date.
            subset = df.loc[(df.index >= start) & (df.index < end)]
            if subset.empty:
                return {"error": f"No data in range {start_date} to {end_date}."}
            hourly = subset.resample("1h").mean(numeric_only=True)
            return summarise_dataframe(hourly)
        except Exception as exc:
            return {"error": f"Weather history failed: {exc}"}
|
|
|
|
| |
| |
| |
|
|
class VineSensorService(BaseService):
    """On-site vine sensors via ThingsBoard — snapshot + time-series.

    Snapshot, device time-series and tracker lookups are cached in
    separate ``TTLCache`` instances and share one ``CircuitBreaker``.
    ``get_history`` is neither cached nor guarded by the breaker.
    """

    service_name = "vine_sensors"

    # Tracker devices polled by get_tracker_details.
    _TRACKER_NAMES = ("Tracker501", "Tracker502", "Tracker503", "Tracker509")
    _TRACKER_KEYS = ("angle", "manualMode", "setAngle", "setMode")

    def __init__(self, tb_client: Any = None, snapshot_ttl: float = 300):
        """
        Parameters
        ----------
        tb_client : Any, optional
            ThingsBoard client. When omitted, a ``ThingsBoardClient`` is
            created on first use.
        snapshot_ttl : float
            TTL in seconds for cached snapshots.
        """
        self._tb = tb_client
        self._snap_cache = TTLCache(ttl_seconds=snapshot_ttl, redis_prefix="vine:")
        self._ts_cache = TTLCache(ttl_seconds=900, redis_prefix="vine_ts:")
        self._tracker_cache = TTLCache(ttl_seconds=300, redis_prefix="tracker:")
        self._breaker = CircuitBreaker(threshold=3, cooldown_sec=300)

    def _client(self):
        """Return the ThingsBoard client, creating the default one on first use."""
        if self._tb is None:
            from src.thingsboard_client import ThingsBoardClient
            self._tb = ThingsBoardClient()
        return self._tb

    def get_snapshot(self, light: bool = False,
                     mode: Optional[str] = None) -> Dict[str, Any]:
        """Latest vine state (treatment vs reference), 5-min TTL.

        Parameters
        ----------
        light : bool
            If True, fetch only ~6 key devices instead of all 21.
        mode : str, optional
            "dashboard" = 4 devices only (air + soil + irrigation).

        Returns
        -------
        dict
            The snapshot's ``to_dict()`` output, or a dict with an
            ``"error"`` key when the breaker is open or the call fails.
        """
        # A given mode takes precedence over the light flag in the cache key.
        cache_key = mode or ("snap_light" if light else "snap")
        cached = self._snap_cache.get(cache_key)
        if cached is not None:
            return cached
        if self._breaker.is_open:
            return {"error": "ThingsBoard circuit breaker open — retrying in 5 min", "cached": True}
        try:
            snapshot = self._client().get_vine_snapshot(light=light, mode=mode)
            result = snapshot.to_dict()
            self._snap_cache.set(cache_key, result)
            self._breaker.record_success()
            return result
        except Exception as exc:
            self._breaker.record_failure()
            return {
                "error": f"ThingsBoard unavailable: {exc}",
                "hint": "Check THINGSBOARD_USERNAME/PASSWORD in .env",
            }

    def get_history(
        self,
        device_type: str = "crop",
        area: str = "treatment",
        hours_back: int = 24,
    ) -> Dict[str, Any]:
        """Hourly averages for a device group over the last N hours.

        Parameters
        ----------
        device_type : str
            One of ``air`` / ``crop`` / ``soil`` (case-insensitive).
        area : str
            One of ``treatment`` / ``reference`` / ``ambient``.
        hours_back : int
            Length of the look-back window ending now (UTC).

        Returns
        -------
        dict
            ``summarise_dataframe`` output with one ``<device>_<key>``
            column per device and key, or a dict with an ``"error"`` key.
        """
        try:
            from src.thingsboard_client import (
                AIR_KEYS, CROP_KEYS, SOIL_KEYS, DEVICE_REGISTRY, VineArea,
            )
        except Exception as exc:
            log.error("ThingsBoard client import failed: %s", exc)
            return {"error": f"ThingsBoard client unavailable: {exc}"}

        key_map = {"air": AIR_KEYS, "crop": CROP_KEYS, "soil": SOIL_KEYS}
        keys = key_map.get(device_type.lower())
        if keys is None:
            return {"error": f"Unknown device_type '{device_type}'. Use air/crop/soil."}

        area_enum = {
            "treatment": VineArea.TREATMENT,
            "reference": VineArea.REFERENCE,
            "ambient": VineArea.AMBIENT,
        }.get(area.lower())
        if area_enum is None:
            return {"error": f"Unknown area '{area}'. Use treatment/reference/ambient."}

        # Devices are matched by area and by a name starting with the type.
        wanted_prefix = device_type.lower()
        devices = [
            name for name, info in DEVICE_REGISTRY.items()
            if info.area == area_enum and name.lower().startswith(wanted_prefix)
        ]
        if not devices:
            return {"error": f"No {device_type} devices in {area} area."}

        end = datetime.now(tz=timezone.utc)
        start = end - timedelta(hours=hours_back)

        try:
            frames = []
            for dev in devices:
                df = self._client().get_timeseries(dev, keys, start, end)
                if not df.empty:
                    # Prefix columns so devices stay distinguishable after concat.
                    frames.append(df.add_prefix(f"{dev}_"))
            if not frames:
                return {"error": "No time-series data returned from ThingsBoard."}
            merged = pd.concat(frames, axis=1).sort_index()
            hourly = merged.resample("1h").mean(numeric_only=True)
            return summarise_dataframe(hourly)
        except Exception as exc:
            log.error("Sensor history query failed: %s", exc)
            return {"error": f"Sensor history failed: {exc}"}

    def get_device_timeseries(
        self,
        device: str,
        keys: List[str],
        hours_back: int = 168,
        agg: str = "AVG",
    ) -> List[Dict[str, Any]]:
        """Hourly time-series for a specific device + keys (15-min TTL cache).

        Returns a list of ``{timestamp, key1, key2, ...}`` dicts; missing
        and non-finite values are left out of a row. An empty list is
        returned when the breaker is open, when no data exists, or on
        failure.
        Used by sensor history endpoints (soil moisture, VPD, NDVI, etc.).
        """
        cache_key = f"{device}:{','.join(sorted(keys))}:{hours_back}:{agg}"
        cached = self._ts_cache.get(cache_key)
        if cached is not None:
            return cached
        if self._breaker.is_open:
            return []
        try:
            end = datetime.now(tz=timezone.utc)
            start = end - timedelta(hours=hours_back)
            client = self._client()

            # First attempt: let the server aggregate into 1-hour buckets.
            df = client.get_timeseries(
                device, keys, start=start, end=end,
                interval_ms=3_600_000, agg=agg, limit=2000,
            )
            if df.empty:
                # Fallback: fetch raw points and aggregate hourly locally.
                df = client.get_timeseries(
                    device, keys, start=start, end=end,
                    interval_ms=0, agg="NONE", limit=10000,
                )
                if not df.empty:
                    df = df.resample("1h").mean(numeric_only=True).dropna(how="all")

            rows: List[Dict[str, Any]] = []
            for ts, row in df.iterrows():
                record: Dict[str, Any] = {"timestamp": ts.isoformat()}
                for col in df.columns:
                    val = row[col]
                    if pd.isna(val):
                        continue
                    number = float(val)
                    # inf cannot be serialised to JSON; treat it as missing.
                    if math.isfinite(number):
                        record[col] = round(number, 2)
                rows.append(record)

            # An empty frame yields an empty list, which is cached as well.
            self._breaker.record_success()
            self._ts_cache.set(cache_key, rows)
            return rows
        except Exception as exc:
            self._breaker.record_failure()
            log.error("Device timeseries failed (%s): %s", device, exc)
            return []

    @staticmethod
    def _round_or_none(value: Any) -> Optional[float]:
        """Return *value* as a float rounded to 1 decimal, or None when absent."""
        return None if value is None else round(float(value), 1)

    def _read_tracker(self, client: Any, name: str) -> Dict[str, Any]:
        """Fetch one tracker's latest telemetry; failures become an error entry."""
        try:
            vals = client.get_latest_telemetry(name, list(self._TRACKER_KEYS))
            return {
                "name": name,
                "label": name.replace("Tracker", "Row "),
                "angle": self._round_or_none(vals.get("angle")),
                "manual_mode": vals.get("manualMode"),
                "set_angle": self._round_or_none(vals.get("setAngle")),
                "set_mode": vals.get("setMode"),
            }
        except Exception as exc:
            return {"name": name, "label": name, "error": str(exc)}

    def get_tracker_details(self) -> Dict[str, Any]:
        """Latest tracker angles/modes for all 4 trackers (5-min TTL cache).

        Returns
        -------
        dict
            ``{"trackers": [...]}`` with one entry per tracker; an entry
            holds ``error`` instead of readings when that tracker failed.
            A top-level ``"error"`` key is added when the breaker is open,
            when the client cannot be obtained, or when every tracker
            failed. In the last case the result is not cached and the
            failure is reported to the breaker.
        """
        cached = self._tracker_cache.get("details")
        if cached is not None:
            return cached
        if self._breaker.is_open:
            return {"trackers": [], "error": "ThingsBoard circuit breaker open"}
        try:
            client = self._client()
            trackers = [self._read_tracker(client, name) for name in self._TRACKER_NAMES]
            if all("error" in entry for entry in trackers):
                # A total outage must not count as success or be cached.
                self._breaker.record_failure()
                log.error("Tracker details failed for every tracker")
                return {
                    "trackers": trackers,
                    "error": "ThingsBoard telemetry unavailable for all trackers",
                }
            result = {"trackers": trackers}
            self._breaker.record_success()
            self._tracker_cache.set("details", result)
            return result
        except Exception as exc:
            self._breaker.record_failure()
            log.error("Tracker details failed: %s", exc)
            return {"trackers": [], "error": str(exc)}
|
|
|
|
| |
| |
| |
|
|
class PhotosynthesisService(BaseService):
    """Photosynthesis predictions — mechanistic, ML, and day-ahead.

    All models (Farquhar, shadow, canopy, ML predictor) are created
    lazily on first use and then kept on the instance.
    """

    service_name = "photosynthesis"

    def __init__(self):
        self._farquhar = None
        # (predictor, feature_cols, best_name) once trained.
        self._ml_predictor = None
        self._shadow = None
        self._canopy = None

    # ---- lazy model accessors -------------------------------------------

    def _get_farquhar(self):
        """Return the shared ``FarquharModel``, creating it on first use."""
        if self._farquhar is None:
            from src.farquhar_model import FarquharModel
            self._farquhar = FarquharModel()
        return self._farquhar

    def _get_shadow(self):
        """Return the shared ``ShadowModel``, creating it on first use."""
        if self._shadow is None:
            from src.solar_geometry import ShadowModel
            self._shadow = ShadowModel()
        return self._shadow

    def _get_canopy(self):
        """Return the shared canopy model built on the shadow and Farquhar models."""
        if self._canopy is None:
            from src.canopy_photosynthesis import CanopyPhotosynthesisModel
            self._canopy = CanopyPhotosynthesisModel(
                shadow_model=self._get_shadow(),
                farquhar_model=self._get_farquhar(),
            )
        return self._canopy

    # ---- predictions ----------------------------------------------------

    def predict_fvcb(
        self, PAR: float, Tleaf: float, CO2: float, VPD: float, Tair: float,
    ) -> Dict[str, Any]:
        """Single-point Farquhar model prediction with limiting factor.

        The limiting factor is found by comparing the Rubisco-limited rate
        (``Ac``) with the RuBP-limited rate (``Aj``), both recomputed here
        from the model's parameter functions. Shading is flagged as
        helpful when ``Tleaf`` exceeds 30.

        Unlike most methods of this class, model errors are not caught
        here and propagate to the caller.
        """
        model = self._get_farquhar()
        A = model.calc_photosynthesis(PAR=PAR, Tleaf=Tleaf, CO2=CO2, VPD=VPD, Tair=Tair)

        Tk = Tleaf + 273.15  # leaf temperature in kelvin
        Vcmax = model.calc_Vcmax(Tk)
        Jmax = model.calc_Jmax(Tk)
        gamma_star = model.calc_gamma_star(Tk)
        Kc = model.calc_Kc(Tk)
        Ko = model.calc_Ko(Tk)
        ci = model._ci_from_ca(CO2, VPD)
        J = model.calc_electron_transport(PAR, Jmax)
        Ac = Vcmax * (ci - gamma_star) / (ci + Kc * (1.0 + 210.0 / Ko))
        Aj = J * (ci - gamma_star) / (4.0 * ci + 8.0 * gamma_star)

        limiting = ("Rubisco-limited (high temperature is the bottleneck)"
                    if Ac < Aj else
                    "RuBP-limited (light is the bottleneck)")
        shading_helps = Tleaf > 30.0

        return {
            "A_net": round(A, 3),
            "units": "umol CO2 m-2 s-1",
            "limiting_factor": limiting,
            "Tleaf": Tleaf,
            "shading_would_help": shading_helps,
            "model": "fvcb",
            "note": ("Shading may help reduce heat stress" if shading_helps
                     else "Shading would reduce photosynthesis (vine needs light)"),
        }

    def predict_ml(self, features: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
        """ML ensemble prediction. Auto-fills features from latest IMS if not provided.

        Trains the model once on first call (lazy), then caches it.
        Feature columns missing from *features* default to 0.0; an empty
        or omitted *features* triggers auto-fill.
        """
        try:
            predictor, feature_cols, best_name = self._ensure_ml_predictor()
        except Exception as exc:
            return {"error": f"ML predictor unavailable: {exc}"}

        try:
            if features:
                row = {col: features.get(col, 0.0) for col in feature_cols}
            else:
                row = self._auto_fill_features(feature_cols)
                if row is None:
                    return {"error": "No IMS data available to auto-fill features."}

            X = pd.DataFrame([row])[feature_cols]
            model = predictor.models[best_name]
            pred = float(model.predict(X)[0])
            metrics = predictor.results.get(best_name, {})

            return {
                "A_net_predicted": round(pred, 3),
                "units": "umol CO2 m-2 s-1",
                "model": best_name,
                "model_mae": round(metrics.get("mae", 0), 3),
                "model_r2": round(metrics.get("r2", 0), 3),
                "features_used": {k: round(v, 2) for k, v in row.items()},
                "note": "Prediction from ML ensemble trained on IMS weather features.",
            }
        except Exception as exc:
            return {"error": f"ML prediction failed: {exc}"}

    def _ensure_ml_predictor(self):
        """Train the ML predictor once and cache it. Returns (predictor, feature_cols, best_name).

        Raises
        ------
        RuntimeError
            When the IMS cache is empty, the IMS/label merge is empty, or
            the training split is empty.
        """
        if self._ml_predictor is not None:
            return self._ml_predictor

        from src.ims_client import IMSClient
        from src.farquhar_model import FarquharModel
        from src.preprocessor import Preprocessor
        from src.predictor import PhotosynthesisPredictor

        ims = IMSClient()
        ims_df = ims.load_cached()
        if ims_df.empty:
            raise RuntimeError("No IMS cache data — cannot train ML predictor.")

        # Training labels: Farquhar-model output computed on the sensor data.
        from src.sensor_data_loader import SensorDataLoader
        loader = SensorDataLoader()
        sensor_df = loader.load()
        fvcb = FarquharModel()
        labels = fvcb.compute_all(sensor_df)
        labels.name = "A"

        # Index the labels by time so they can be merged with IMS rows.
        if "time" in sensor_df.columns:
            ts = pd.to_datetime(sensor_df["time"], utc=True)
            labels.index = ts

        prep = Preprocessor()
        merged = prep.merge_ims_with_labels(ims_df, labels)
        if merged.empty:
            raise RuntimeError("Merge of IMS + labels produced empty DataFrame.")
        merged = prep.create_time_features(merged)
        X_train, y_train, X_test, y_test = prep.temporal_split(merged)
        if X_train.empty:
            raise RuntimeError("Not enough data to train ML predictor.")

        predictor = PhotosynthesisPredictor()
        predictor.train(X_train, y_train)
        if not X_test.empty:
            predictor.evaluate(X_test, y_test)

        # Pick the model with the lowest MAE; default when nothing was evaluated.
        best_name = "GradientBoosting"
        if predictor.results:
            best_name = min(predictor.results, key=lambda n: predictor.results[n].get("mae", 999))

        feature_cols = list(X_train.columns)
        self._ml_predictor = (predictor, feature_cols, best_name)
        return self._ml_predictor

    def _auto_fill_features(self, feature_cols: List[str]) -> Optional[Dict[str, float]]:
        """Fill feature vector from the latest IMS cache row + time features.

        Columns that are absent or null in that row are set to 0.0.
        Returns None when the IMS cache is empty or anything fails; the
        failure is logged rather than silently discarded.
        """
        try:
            from src.ims_client import IMSClient
            from src.time_features import add_cyclical_time_features
            ims = IMSClient()
            df = ims.load_cached()
            if df.empty:
                return None
            last_row_df = df.tail(1).copy()
            last_row_df = add_cyclical_time_features(last_row_df, timestamp_col="timestamp_utc")
            ts = pd.to_datetime(last_row_df["timestamp_utc"].iloc[0], utc=True)
            last_row_df["month"] = ts.month
            last_row_df["day_of_year"] = ts.day_of_year
            row: Dict[str, float] = {}
            for col in feature_cols:
                value = last_row_df[col].iloc[0] if col in last_row_df.columns else None
                row[col] = float(value) if pd.notna(value) else 0.0
            return row
        except Exception as exc:
            log.warning("Auto-fill of ML features failed: %s", exc)
            return None

    def forecast_day_ahead(self, target_date: Optional[str] = None) -> Dict[str, Any]:
        """24h A profile using FvCB model over IMS weather data.

        For each hourly bin with hour 6..19, computes A from IMS
        temperature/GHI/humidity with fixed assumptions: leaf temperature
        = Tair + 2, PAR = 2 x GHI, CO2 = 400. When no rows exist for
        *target_date* (default: today), the last 96 cached rows are used
        and the reported date becomes the date of the last of them.

        Returns a dict with the hourly profile, peak, daily total and the
        number of "rubisco" hours, or a dict with an ``"error"`` key.
        """
        # NOTE(review): the hour filter and the reported "hour" use the
        # index's UTC hour, whereas simulate_shading works in
        # Asia/Jerusalem — confirm which one callers expect.
        try:
            from src.ims_client import IMSClient
            ims = IMSClient()
            df = ims.load_cached()
            if df.empty:
                return {"error": "No IMS data cached for PS forecast."}

            if "timestamp_utc" in df.columns:
                df["timestamp_utc"] = pd.to_datetime(df["timestamp_utc"], utc=True)
                df = df.set_index("timestamp_utc")

            target = target_date or str(date.today())
            try:
                day_start = pd.Timestamp(target, tz="UTC")
                day_end = day_start + pd.Timedelta(days=1)
                day_df = df.loc[day_start:day_end]
            except Exception:
                day_df = pd.DataFrame()

            # No rows for the requested day: fall back to the newest rows.
            if day_df.empty:
                day_df = df.tail(96)
                if day_df.empty:
                    return {"error": "Not enough IMS data for forecast."}
                target = str(day_df.index[-1].date())

            hourly = day_df.resample("1h").mean(numeric_only=True)
            model = self._get_farquhar()

            def _find_col(df_cols, exact_names, fuzzy_terms, exclude_terms=()):
                # Prefer an exact column name, else the first fuzzy match.
                for name in exact_names:
                    if name in df_cols:
                        return name
                for c in df_cols:
                    cl = c.lower()
                    if any(t in cl for t in fuzzy_terms) and not any(t in cl for t in exclude_terms):
                        return c
                return None

            temp_col = _find_col(hourly.columns, ["air_temperature_c"], ["temp"], ["dew", "soil"])
            ghi_col = _find_col(hourly.columns, ["ghi_w_m2"], ["ghi", "rad", "irrad"])
            rh_col = _find_col(hourly.columns, ["rh_percent"], ["rh", "humid"])

            hourly_results = []
            for idx, row in hourly.iterrows():
                hour = idx.hour if hasattr(idx, "hour") else 0
                if hour < 6 or hour > 19:
                    continue

                # Missing inputs fall back to 25 C, 0 W/m2 and 40 % RH.
                Tair = float(row[temp_col]) if temp_col and pd.notna(row.get(temp_col)) else 25.0
                Tleaf = Tair + 2.0
                ghi = float(row[ghi_col]) if ghi_col and pd.notna(row.get(ghi_col)) else 0.0
                PAR = ghi * 2.0
                rh = float(row[rh_col]) if rh_col and pd.notna(row.get(rh_col)) else 40.0

                # Saturation vapour pressure from Tair, then VPD (floored at 0.1).
                es = 0.6108 * np.exp(17.27 * Tair / (Tair + 237.3))
                VPD = max(es * (1 - rh / 100), 0.1)

                if PAR < 50:
                    A = 0.0
                    limiting = "dark"
                else:
                    A = model.calc_photosynthesis(PAR=PAR, Tleaf=Tleaf, CO2=400.0, VPD=VPD, Tair=Tair)
                    limiting = "rubisco" if Tleaf > 30 else "rubp"

                hourly_results.append({
                    "hour": hour,
                    "A_predicted": round(A, 2),
                    "Tair": round(Tair, 1),
                    "PAR": round(PAR, 0),
                    "VPD": round(VPD, 2),
                    "limiting": limiting,
                    "shading_helps": Tleaf > 30.0,
                })

            if not hourly_results:
                return {"error": "No daytime hours available in forecast range."}

            peak = max(hourly_results, key=lambda r: r["A_predicted"])
            total_A = sum(r["A_predicted"] for r in hourly_results)
            stress_hours = sum(1 for r in hourly_results if r["limiting"] == "rubisco")

            return {
                "date": target,
                "method": "fvcb_projection",
                "hourly": hourly_results,
                "peak_A": peak["A_predicted"],
                "peak_hour": peak["hour"],
                "daily_total_A": round(total_A, 1),
                "rubisco_limited_hours": stress_hours,
                "note": "FvCB-based projection from IMS weather data. "
                        "PAR estimated as 2x GHI. Leaf temp estimated as Tair+2C.",
            }
        except Exception as exc:
            return {"error": f"PS forecast failed: {exc}"}

    def simulate_shading(
        self,
        angle_offset: float,
        hour: int,
        date_str: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Compare A at astronomical tracking vs offset angle.

        Parameters
        ----------
        angle_offset : float
            Offset added to the astronomical tracker tilt.
        hour : int
            Hour of day in Asia/Jerusalem local time.
        date_str : str, optional
            Date to simulate; defaults to today. An unparseable date falls
            back to today, and the returned ``date`` reflects the date
            actually simulated.

        Returns
        -------
        dict
            A and sunlit fraction for both tilts plus the percentage
            change, or a dict with an ``"error"`` key when the solar
            elevation is at or below 2 degrees. Fixed conditions are used:
            PAR 1800, Tleaf 32, CO2 400, VPD 2.5, Tair 33.
        """
        shadow = self._get_shadow()
        canopy = self._get_canopy()

        dt_str = date_str or str(date.today())
        try:
            dt = pd.Timestamp(f"{dt_str} {hour:02d}:00:00", tz="Asia/Jerusalem")
        except Exception:
            # Keep the reported date in sync with the fallback actually used.
            dt_str = str(date.today())
            dt = pd.Timestamp(f"{dt_str} {hour:02d}:00:00", tz="Asia/Jerusalem")

        solar_pos = shadow.get_solar_position(pd.DatetimeIndex([dt]))
        elev = float(solar_pos["solar_elevation"].iloc[0])
        azim = float(solar_pos["solar_azimuth"].iloc[0])

        if elev <= 2.0:
            return {"error": f"Sun below horizon at hour {hour} (elevation {elev:.1f}\u00b0)."}

        tracker = shadow.compute_tracker_tilt(azim, elev)
        astro_tilt = tracker["tracker_theta"]

        PAR, Tleaf, CO2, VPD, Tair = 1800.0, 32.0, 400.0, 2.5, 33.0

        def _vine_result(tilt):
            # Project the shadow for this tilt and evaluate the canopy under it.
            mask = shadow.project_shadow(elev, azim, tilt)
            return canopy.compute_vine_A(
                par=PAR, Tleaf=Tleaf, CO2=CO2, VPD=VPD, Tair=Tair,
                shadow_mask=mask, solar_elevation=elev,
                solar_azimuth=azim, tracker_tilt=tilt,
            )

        res_un = _vine_result(astro_tilt)
        shaded_tilt = astro_tilt + angle_offset
        res_sh = _vine_result(shaded_tilt)

        A_un = res_un["A_vine"]
        A_sh = res_sh["A_vine"]
        change = ((A_sh - A_un) / A_un * 100) if A_un > 0 else 0

        return {
            "hour": hour, "date": dt_str, "angle_offset": angle_offset,
            "solar_elevation": round(elev, 1),
            "A_unshaded": round(A_un, 3), "A_shaded": round(A_sh, 3),
            "A_change_pct": round(change, 1),
            "sunlit_fraction_unshaded": round(res_un["sunlit_fraction"], 3),
            "sunlit_fraction_shaded": round(res_sh["sunlit_fraction"], 3),
            "tracker_tilt_astronomical": round(astro_tilt, 1),
            "tracker_tilt_shaded": round(shaded_tilt, 1),
        }

    def compare_angles(self, angles: Optional[List[int]] = None) -> Dict[str, Any]:
        """Compare A and energy across tilt angle offsets.

        Returns ``{"angles": [...]}`` with floats rounded to 2 decimals,
        or a dict with an ``"error"`` key.
        """
        try:
            from src.tracker_optimizer import simulate_tilt_angles, load_sensor_data
            df = load_sensor_data()
            result_df = simulate_tilt_angles(df, angles=angles)
            records = result_df.to_dict(orient="records")
            for r in records:
                for k, v in r.items():
                    if isinstance(v, (float, np.floating)):
                        r[k] = round(float(v), 2)
            return {"angles": records}
        except Exception as exc:
            return {"error": f"Angle comparison failed: {exc}"}

    def daily_schedule(
        self, stress_threshold: float = 2.0, shade_angle: int = 20,
    ) -> Dict[str, Any]:
        """Hourly shading schedule based on leaf-air temperature stress.

        Uses the most recent date in the sensor data. Returns
        ``{"date", "schedule"}`` with floats rounded to 2 decimals and
        timestamps stringified, or a dict with an ``"error"`` key.
        """
        try:
            from src.tracker_optimizer import compute_daily_schedule, load_sensor_data
            df = load_sensor_data()
            last_date = df["date"].max()
            day_df = df[df["date"] == last_date].copy()
            if day_df.empty:
                return {"error": "No sensor data available for schedule."}
            result_df = compute_daily_schedule(
                day_df, stress_threshold=stress_threshold, shade_angle=shade_angle,
            )
            records = result_df.to_dict(orient="records")
            for r in records:
                for k, v in list(r.items()):
                    if isinstance(v, (float, np.floating)):
                        r[k] = round(float(v), 2)
                    elif isinstance(v, (pd.Timestamp, datetime)):
                        r[k] = str(v)
            return {"date": str(last_date), "schedule": records}
        except Exception as exc:
            return {"error": f"Schedule failed: {exc}"}

    def get_photosynthesis_3d_scene(
        self,
        hour: Optional[int] = None,
        date_str: Optional[str] = None,
        height_px: int = 480,
    ) -> Dict[str, Any]:
        """Build 3D scene data and HTML for vine, tracker, sun and photosynthesis.

        When *hour* is omitted, the current hour in Asia/Jerusalem is
        used (server-local time if the time zone cannot be loaded).

        Returns dict with scene_3d (data), scene_3d_html (full HTML string),
        A_vine, sunlit_fraction, hour and date, or a dict with an
        ``"error"`` key.
        """
        try:
            from src.vine_3d_scene import build_scene_data, build_scene_html
        except Exception as exc:
            return {"error": f"3D scene module unavailable: {exc}"}

        try:
            if hour is not None:
                h = hour
            else:
                try:
                    from zoneinfo import ZoneInfo
                    h = datetime.now(ZoneInfo("Asia/Jerusalem")).hour
                except Exception:
                    h = datetime.now().hour
            scene_data = build_scene_data(hour=h, date_str=date_str)
            html = build_scene_html(scene_data, height_px=height_px)
            return {
                "scene_3d": scene_data,
                "scene_3d_html": html,
                "A_vine": scene_data["A_vine"],
                "sunlit_fraction": scene_data["sunlit_fraction"],
                "hour": scene_data["hour"],
                "date": scene_data["date"],
            }
        except Exception as exc:
            return {"error": f"3D scene build failed: {exc}"}
|
|
|
|
| |
| |
| |
|
|
| class EnergyService(BaseService): |
| """Energy generation data from ThingsBoard Plant asset. |
| |
| The 'Yeruham Vineyard' asset (type=Plant) provides: |
| - ``power``: instantaneous power in W |
| - ``production``: energy produced per 5-min interval in Wh |
| |
| Daily kWh = sum(production) / 1000 over the day. |
| """ |
|
|
| service_name = "energy" |
|
|
def __init__(self, tb_client: Any = None):
    """Set up the client slot, the two TTL caches and the circuit breaker.

    ``tb_client`` is optional; when it is left as None, ``_client``
    creates a ``ThingsBoardClient`` on first use.
    """
    self._tb = tb_client
    # Latest reading: 300 s TTL. Daily production: 900 s TTL.
    self._current_cache = TTLCache(redis_prefix="energy:", ttl_seconds=300)
    self._daily_cache = TTLCache(redis_prefix="energy_daily:", ttl_seconds=900)
    self._breaker = CircuitBreaker(cooldown_sec=300, threshold=3)
|
|
def _client(self):
    """Hand back the ThingsBoard client, building the default one lazily."""
    # NOTE(review): VineSensorService imports ThingsBoardClient from
    # ``src.thingsboard_client``; this method uses ``src.data.thingsboard_client``
    # — confirm both paths resolve.
    existing = self._tb
    if existing is not None:
        return existing
    from src.data.thingsboard_client import ThingsBoardClient
    self._tb = ThingsBoardClient()
    return self._tb
|
|
| |
| |
| |
|
|
def get_current(self) -> Dict[str, Any]:
    """Latest power reading from the Plant asset (5-min TTL cache).

    Returns
    -------
    dict
        ``{"power_kw", "source"}`` where ``power_kw`` is the ``power``
        telemetry value divided by 1000 and rounded to 1 decimal
        (``None`` only when no power value was returned; a reading of 0
        is reported as 0.0), or a dict with an ``"error"`` key when the
        breaker is open or the request fails.
    """
    cached = self._current_cache.get("current")
    if cached is not None:
        return cached
    if self._breaker.is_open:
        return {"error": "ThingsBoard circuit breaker open — retrying in 5 min"}
    try:
        vals = self._client().get_asset_latest("Plant", ["power", "production"])
        power_w = vals.get("power")
        # Compare against None: 0 W is a valid reading, not missing data.
        # float() also accepts telemetry values delivered as strings.
        power_kw = None if power_w is None else round(float(power_w) / 1000, 1)
        self._breaker.record_success()
        result = {
            "power_kw": power_kw,
            "source": "ThingsBoard Plant asset",
        }
        self._current_cache.set("current", result)
        return result
    except Exception as exc:
        self._breaker.record_failure()
        return {"error": f"Energy current failed: {exc}"}
|
|
| def get_daily_production(self, target_date: Optional[str] = None) -> Dict[str, Any]: |
| """Accumulated energy production for a single day (real TB data, 15-min TTL cache). |
| |
| Returns dict with daily_kwh, peak_hour, hourly_profile. |
| """ |
| try: |
| target = target_date or str(date.today()) |
| cached = self._daily_cache.get(f"daily:{target}") |
| if cached is not None: |
| return cached |
| day_start = pd.Timestamp(target, tz="UTC") |
| day_end = day_start + pd.Timedelta(days=1) |
|
|
| df = self._client().get_asset_timeseries( |
| "Plant", ["production"], |
| start=day_start.to_pydatetime(), |
| end=day_end.to_pydatetime(), |
| limit=500, |
| interval_ms=3_600_000, |
| agg="SUM", |
| ) |
| if df.empty or "production" not in df.columns: |
| return {"date": target, "daily_kwh": None, "error": "No production data"} |
|
|
| |
| df["kwh"] = df["production"].fillna(0) / 1000 |
| total_kwh = df["kwh"].sum() |
|
|
| |
| try: |
| import zoneinfo |
| tz_il = zoneinfo.ZoneInfo("Asia/Jerusalem") |
| except Exception: |
| tz_il = None |
|
|
| hourly_profile = [] |
| peak_hour = 12 |
| peak_kwh = 0.0 |
| for ts, row in df.iterrows(): |
| local_ts = ts.astimezone(tz_il) if tz_il else ts |
| h = local_ts.hour if hasattr(local_ts, "hour") else 0 |
| kwh = row["kwh"] |
| hourly_profile.append({"hour": h, "energy_kwh": round(kwh, 2)}) |
| if kwh > peak_kwh: |
| peak_kwh = kwh |
| peak_hour = h |
|
|
| result = { |
| "date": target, |
| "daily_kwh": round(total_kwh, 1), |
| "peak_hour": peak_hour, |
| "peak_hour_kwh": round(peak_kwh, 2), |
| "hourly_profile": hourly_profile, |
| "source": "ThingsBoard Plant asset", |
| } |
| self._daily_cache.set(f"daily:{target}", result) |
| return result |
| except Exception as exc: |
| return {"date": target_date, "daily_kwh": None, "error": f"Energy fetch failed: {exc}"} |
|
|
| def get_history(self, hours_back: int = 24) -> Dict[str, Any]: |
| """Hourly power time-series from TB Plant asset.""" |
| try: |
| end = datetime.now(tz=timezone.utc) |
| start = end - timedelta(hours=hours_back) |
| df = self._client().get_asset_timeseries( |
| "Plant", ["power", "production"], |
| start=start, end=end, |
| limit=500, |
| interval_ms=3_600_000, |
| agg="AVG", |
| ) |
| if df.empty: |
| return {"error": f"No energy data in last {hours_back} hours."} |
| df["power_kw"] = df["power"].fillna(0) / 1000 |
| return summarise_dataframe(df[["power_kw"]]) |
| except Exception as exc: |
| return {"error": f"Energy history failed: {exc}"} |
|
|
| def predict(self, target_date: Optional[str] = None, |
| *, ims_df: Optional[pd.DataFrame] = None) -> Dict[str, Any]: |
| """For future dates: analytical estimate. For past/today: real TB data.""" |
| target = target_date or str(date.today()) |
| target_d = date.fromisoformat(target) |
| today = date.today() |
|
|
| |
| if target_d <= today: |
| return self.get_daily_production(target) |
|
|
| |
| return self._predict_analytical(target, ims_df=ims_df) |
|
|
| def _predict_analytical(self, target_date: str, |
| *, ims_df: Optional[pd.DataFrame] = None) -> Dict[str, Any]: |
| """Energy estimate for future dates. |
| |
| Strategy (in priority order): |
| 1. ML predictor (XGBoost) with ThingsBoard Air1 weather persistence |
| 2. ML predictor with IMS weather persistence |
| 3. Analytical fallback (GHI Γ system capacity) |
| """ |
| |
| try: |
| result = self._predict_ml(target_date) |
| if result and result.get("daily_kwh") is not None: |
| return result |
| except Exception: |
| pass |
|
|
| |
| try: |
| if ims_df is not None: |
| df = ims_df |
| else: |
| from src.ims_client import IMSClient |
| df = IMSClient().load_cached() |
| if df.empty: |
| return {"date": target_date, "daily_kwh": None, "error": "No weather data"} |
|
|
| if "timestamp_utc" in df.columns: |
| df = df.copy() |
| df["timestamp_utc"] = pd.to_datetime(df["timestamp_utc"], utc=True) |
| df = df.set_index("timestamp_utc") |
|
|
| |
| try: |
| from src.energy_predictor import EnergyPredictor |
| ep = EnergyPredictor() |
| return ep.predict_day_from_weather_df(target_date, df.tail(96)) |
| except Exception as exc: |
| log.warning("ML energy prediction unavailable, falling back to analytical: %s", exc) |
|
|
| |
| day_df = df.tail(96).copy() |
| if day_df.empty: |
| return {"date": target_date, "daily_kwh": None, "error": "Not enough IMS data"} |
|
|
| ghi_col = next( |
| (c for c in day_df.columns if "ghi" in c.lower() or "rad" in c.lower()), None) |
| if ghi_col is None: |
| return {"date": target_date, "daily_kwh": None, "error": "No GHI column"} |
|
|
| from config.settings import SYSTEM_CAPACITY_KW, STC_IRRADIANCE_W_M2 |
| system_kw = SYSTEM_CAPACITY_KW |
| stc_ghi = STC_IRRADIANCE_W_M2 |
| slot_hours = 0.25 |
|
|
| total_kwh = 0.0 |
| hourly_kwh: Dict[int, float] = {} |
| for idx, row in day_df.iterrows(): |
| ghi = float(row[ghi_col]) if pd.notna(row.get(ghi_col)) else 0.0 |
| if ghi <= 0: |
| continue |
| energy = system_kw * (ghi / stc_ghi) * slot_hours |
| total_kwh += energy |
| h = idx.hour if hasattr(idx, "hour") else 0 |
| hourly_kwh[h] = hourly_kwh.get(h, 0) + energy |
|
|
| peak_hour = max(hourly_kwh, key=hourly_kwh.get) if hourly_kwh else 12 |
| hourly_profile = [ |
| {"hour": h, "energy_kwh": round(e, 2)} |
| for h, e in sorted(hourly_kwh.items()) |
| ] |
| return { |
| "date": target_date, |
| "daily_kwh": round(total_kwh, 1), |
| "peak_hour": peak_hour, |
| "peak_hour_kwh": round(hourly_kwh.get(peak_hour, 0), 2), |
| "hourly_profile": hourly_profile, |
| "source": f"Analytical estimate (persistence forecast Γ {system_kw:.0f} kW system)", |
| } |
| except Exception as exc: |
| return {"date": target_date, "daily_kwh": None, "error": f"Prediction failed: {exc}"} |
|
|
| def _predict_ml(self, target_date: str) -> Optional[Dict[str, Any]]: |
| """ML energy prediction using latest ThingsBoard Air1 weather as persistence forecast.""" |
| from src.energy_predictor import EnergyPredictor |
|
|
| ep = EnergyPredictor() |
|
|
| |
| end = datetime.now(tz=timezone.utc) |
| start = end - timedelta(hours=24) |
|
|
| df = self._client().get_timeseries( |
| "Air1", |
| keys=["GSR", "airTemperature", "windSpeed"], |
| start=start, end=end, |
| limit=500, |
| interval_ms=3_600_000, |
| agg="AVG", |
| ) |
| if df.empty or len(df) < 8: |
| return None |
|
|
| return ep.predict_day_from_weather_df(target_date, df) |
|
|
|
|
| |
| |
| |
|
|
class AdvisoryService(BaseService):
    """Gemini-powered day-ahead stress advisory."""

    service_name = "advisory"

    def __init__(self, vine_sensor_svc: Optional[VineSensorService] = None, verbose: bool = False):
        """Keep a handle on the vine-sensor service (optional) and the verbosity flag."""
        self._vine_svc = vine_sensor_svc
        self._verbose = verbose

    def run_advisory(self, target_date: Optional[str] = None) -> Dict[str, Any]:
        """Full DayAheadAdvisor report, enriched with vine snapshot if available.

        Args:
            target_date: date string passed to the advisor; defaults to today.

        Returns:
            ``DayAheadAdvisor.report_to_dict(report)`` on success, otherwise
            ``{"error": str}`` (no cached weather, or any failure while
            building the report).
        """
        try:
            from src.day_ahead_advisor import DayAheadAdvisor
            from src.ims_client import IMSClient

            advisor = DayAheadAdvisor(verbose=self._verbose)
            weather_df = IMSClient().load_cached()
            if weather_df.empty:
                return {"error": "No IMS weather data cached. Cannot run advisory."}

            report = advisor.advise(
                date=target_date or str(date.today()),
                weather_forecast=weather_df,
                phenological_stage="vegetative",
                vine_snapshot=self._fetch_vine_snapshot(),
            )
            return DayAheadAdvisor.report_to_dict(report)
        except Exception as exc:
            return {"error": f"Advisory failed: {exc}"}

    def _fetch_vine_snapshot(self) -> Any:
        """Return the raw vine snapshot object for the advisor, or ``None``.

        The snapshot is optional enrichment: it is skipped when no vine
        service was supplied or when the service's dict snapshot reports an
        error, and a failure while fetching the raw object is logged rather
        than aborting the advisory.
        """
        if not self._vine_svc:
            return None
        snap_dict = self._vine_svc.get_snapshot()
        if "error" in snap_dict:
            return None
        try:
            # The advisor takes the client's snapshot object, not the
            # serialised dict, so go through the service's own client.
            return self._vine_svc._client().get_vine_snapshot()
        except Exception as exc:
            log.warning("Vine snapshot unavailable, advisory runs without it: %s", exc)
            return None
|
|
|
|
| |
| |
| |
|
|
class BiologyService(BaseService):
    """Biology rules lookup + chill unit computation."""

    service_name = "biology"

    def __init__(self, rules: Optional[Dict[str, str]] = None, tb_client: Any = None):
        """Set up the rule table, the (optional) ThingsBoard client and the chill cache.

        Args:
            rules: mapping of rule name to explanation; when ``None`` the
                ``BIOLOGY_RULES`` table from ``src.vineyard_chatbot`` is used.
            tb_client: pre-built client (e.g. a mock); when ``None`` a real
                ``ThingsBoardClient`` is created lazily by ``_client()``.
        """
        if rules is None:
            from src.vineyard_chatbot import BIOLOGY_RULES
            rules = BIOLOGY_RULES
        self._rules = rules
        self._tb = tb_client
        self._chill_cache = TTLCache(ttl_seconds=21600, redis_prefix="biology:")

    def _client(self):
        """Return the ThingsBoard client, creating it on first use."""
        if self._tb is None:
            from src.data.thingsboard_client import ThingsBoardClient
            self._tb = ThingsBoardClient()
        return self._tb

    def explain_rule(self, rule_name: str) -> Dict[str, Any]:
        """Look up one rule by name (case-insensitive, surrounding whitespace ignored).

        Returns ``{"rule", "explanation"}`` when found, otherwise an
        ``error`` dict that also lists the available rule names.
        """
        key = rule_name.lower().strip()
        if key in self._rules:
            return {"rule": key, "explanation": self._rules[key]}
        return {"error": f"Unknown rule '{key}'", "available_rules": list(self._rules.keys())}

    def list_rules(self) -> Dict[str, Any]:
        """Return the names of all known rules under the ``rules`` key."""
        return {"rules": list(self._rules.keys())}

    def get_chill_units(self, season_start: str = "2025-11-01") -> Dict[str, Any]:
        """Accumulated chill units from ThingsBoard Air1 temperature (Utah model, 6h TTL).

        Richardson et al. 1974:
            T <= 7 °C       -> +1.0 CU/hour
            7 < T <= 10     -> +0.5
            10 < T <= 18    ->  0.0
            T > 18          -> -1.0

        Args:
            season_start: date string marking the start of accumulation.

        Returns:
            Dict with ``season_start``, ``latest_under_panels``,
            ``latest_open_field``, ``days_counted`` and a per-day ``daily``
            list. When some 7-day fetches failed, ``failed_chunks`` holds
            their count, meaning the totals may be undercounted. On failure
            the dict only has an ``error`` key.
        """
        cache_key = f"chill:{season_start}"
        cached = self._chill_cache.get(cache_key)
        if cached is not None:
            return cached

        try:
            from zoneinfo import ZoneInfo

            tz = ZoneInfo("Asia/Jerusalem")
            client = self._client()
            start = pd.Timestamp(season_start, tz="UTC")
            end = pd.Timestamp.now(tz="UTC")

            # Fetch raw readings in 7-day windows; one failed window must not
            # abort the whole season, but it is recorded instead of hidden.
            chunks = []
            failed_chunks = 0
            cursor = start
            while cursor < end:
                chunk_end = min(cursor + pd.Timedelta(days=7), end)
                try:
                    df = client.get_timeseries(
                        "Air1", ["airTemperature"],
                        start=cursor.to_pydatetime(), end=chunk_end.to_pydatetime(),
                        interval_ms=0, agg="NONE", limit=10000,
                    )
                    if not df.empty:
                        chunks.append(df)
                except Exception as exc:
                    failed_chunks += 1
                    log.warning(
                        "Chill units: Air1 fetch failed for %s to %s: %s",
                        cursor, chunk_end, exc,
                    )
                cursor = chunk_end

            if not chunks:
                return {"error": "No Air1 temperature data available from ThingsBoard"}

            full = pd.concat(chunks).sort_index()
            full = full[~full.index.duplicated(keep="first")]
            # Local time, so that the daily sums follow local calendar days.
            full = full.tz_convert(tz)

            hourly = full["airTemperature"].resample("1h").mean().dropna()
            if hourly.empty:
                return {"error": "No hourly temperature after resampling"}

            # Under-panel accumulation is modelled as open-field chill x 1.1.
            PANEL_MULTIPLIER = 1.1
            temps = hourly.values
            chill_hourly = np.select(
                [temps <= 7.0, (temps > 7.0) & (temps <= 10.0),
                 (temps > 10.0) & (temps <= 18.0), temps > 18.0],
                [1.0, 0.5, 0.0, -1.0],
            )

            # A net-negative day contributes zero, never a deduction.
            daily_chill = pd.Series(chill_hourly, index=hourly.index).resample("D").sum().clip(lower=0)
            cu_open = daily_chill.cumsum()
            cu_panels = (daily_chill * PANEL_MULTIPLIER).cumsum()

            daily = [
                {
                    "date": ts.strftime("%Y-%m-%d"),
                    "under_panels": round(float(panels), 1),
                    "open_field": round(float(open_field), 1),
                }
                for ts, panels, open_field in zip(daily_chill.index, cu_panels, cu_open)
            ]

            result = {
                "season_start": season_start,
                "latest_under_panels": round(float(cu_panels.iloc[-1]), 1) if len(cu_panels) else 0,
                "latest_open_field": round(float(cu_open.iloc[-1]), 1) if len(cu_open) else 0,
                "days_counted": len(daily_chill),
                "daily": daily,
            }
            if failed_chunks:
                result["failed_chunks"] = failed_chunks
            self._chill_cache.set(cache_key, result)
            return result
        except Exception as exc:
            log.error("Chill units failed: %s", exc)
            return {"error": f"Chill units failed: {exc}"}
|
|
|
|
| |
| |
| |
|
|
class DataHub:
    """Lightweight registry of data-provider services.

    Usage
    -----
        hub = DataHub.default()
        hub.weather.get_current()
        hub.vine_sensors.get_snapshot()
        hub.photosynthesis.predict_fvcb(PAR=1500, ...)
        hub.energy.get_current()

    The chatbot receives a hub at init and delegates all data access
    through it — never importing data clients directly.
    """

    def __init__(self) -> None:
        # Registered services, keyed by each service's ``service_name``.
        self._services: Dict[str, BaseService] = {}

    # -- registration -----------------------------------------------------

    def register(self, service: BaseService) -> None:
        """Register ``service`` under its ``service_name``, replacing any previous one."""
        self._services[service.service_name] = service

    def get(self, name: str) -> BaseService:
        """Return the service registered as ``name``.

        Raises:
            KeyError: if nothing is registered under ``name``; the message
                lists the names that are registered.
        """
        if name not in self._services:
            raise KeyError(f"No service registered as '{name}'. "
                           f"Available: {list(self._services)}")
        return self._services[name]

    # -- typed accessors --------------------------------------------------
    # Each accessor goes through ``get`` so a missing service raises the same
    # descriptive KeyError as an explicit lookup.

    @property
    def weather(self) -> WeatherService:
        return self.get("weather")

    @property
    def vine_sensors(self) -> VineSensorService:
        return self.get("vine_sensors")

    @property
    def photosynthesis(self) -> PhotosynthesisService:
        return self.get("photosynthesis")

    @property
    def energy(self) -> EnergyService:
        return self.get("energy")

    @property
    def advisory(self) -> AdvisoryService:
        return self.get("advisory")

    @property
    def biology(self) -> BiologyService:
        return self.get("biology")

    # -- factory ----------------------------------------------------------

    @classmethod
    def default(cls, verbose: bool = False) -> "DataHub":
        """Create a hub with all default services (lazy clients).

        The same ``VineSensorService`` instance is registered on the hub and
        handed to ``AdvisoryService``; ``verbose`` is forwarded to the
        advisory service only.
        """
        hub = cls()
        vine_svc = VineSensorService()
        hub.register(WeatherService())
        hub.register(vine_svc)
        hub.register(PhotosynthesisService())
        hub.register(EnergyService())
        hub.register(AdvisoryService(vine_sensor_svc=vine_svc, verbose=verbose))
        hub.register(BiologyService())
        return hub
|
|