| """ |
| IMSClient: fetch and cache IMS weather data from station 43 (Sde Boker). |
| Resamples 10min data to 15min for alignment with sensor data. |
| """ |
|
|
| import os |
| import time |
| from pathlib import Path |
| from typing import Optional |
|
|
| import pandas as pd |
| import requests |
|
|
| try: |
| from dotenv import load_dotenv |
| load_dotenv() |
| except ImportError: |
| pass |
|
|
|
|
def _parse_ims_date(d: str) -> str:
    """Return *d* with every "-" separator swapped for "/".

    Used to turn a ``YYYY-MM-DD`` date string into the ``YYYY/MM/DD`` form
    that is placed in IMS request URLs. No validation is performed: the
    string is only re-delimited.
    """
    parts = d.split("-")
    return "/".join(parts)
|
|
|
|
class IMSClient:
    """Fetch IMS API data for a station and cache the merged result as CSV.

    The client downloads one channel at a time, merges the channels on their
    UTC timestamp, resamples the merged frame to 15-minute means and writes
    it to a CSV file inside ``cache_dir``.
    """

    # File name used inside ``cache_dir`` when no explicit cache path is given.
    _DEFAULT_CACHE_NAME = "ims_merged_15min.csv"

    def __init__(
        self,
        token: Optional[str] = None,
        station_id: Optional[int] = None,
        cache_dir: Optional[Path] = None,
        channel_map: Optional[dict[int, str]] = None,
    ):
        """Create a client.

        Args:
            token: API token; falls back to the ``IMS_API_TOKEN`` environment
                variable when omitted or empty.
            station_id: Station to query; defaults to ``settings.IMS_STATION_ID``.
            cache_dir: Directory for the CSV cache; defaults to
                ``settings.IMS_CACHE_DIR``. A plain string is accepted and
                converted to ``Path``.
            channel_map: Mapping of channel id -> output column name; defaults
                to a copy of ``settings.IMS_CHANNEL_MAP``.

        Raises:
            ValueError: If no non-blank token is available.
        """
        # Imported lazily, so ``config`` is only needed once a client is built.
        from config import settings

        self.token = (token or os.environ.get("IMS_API_TOKEN", "")).strip()
        if not self.token:
            raise ValueError(
                "IMS API token is required. Set IMS_API_TOKEN in .env, "
                "in Streamlit Secrets, or pass token= to IMSClient."
            )
        self.station_id = station_id or settings.IMS_STATION_ID
        chosen_dir = cache_dir or settings.IMS_CACHE_DIR
        # Only plain strings are converted; any other path object is kept as-is.
        self.cache_dir = Path(chosen_dir) if isinstance(chosen_dir, str) else chosen_dir
        self.channel_map = channel_map or settings.IMS_CHANNEL_MAP.copy()
        self._base = f"{settings.IMS_BASE_URL}/{self.station_id}/data"
        self._stations_url = settings.IMS_BASE_URL

    def _auth_headers(self) -> dict[str, str]:
        """Return the Authorization header sent with every request."""
        return {"Authorization": f"ApiToken {self.token}"}

    def _resolve_cache_path(self, cache_path: Optional[Path]) -> Path:
        """Return *cache_path*, or the default cache file inside ``cache_dir``."""
        chosen = cache_path or (self.cache_dir / self._DEFAULT_CACHE_NAME)
        return Path(chosen) if isinstance(chosen, str) else chosen

    def get_station_metadata(self, station_id: Optional[int] = None) -> dict:
        """Fetch the metadata document of a station and return the decoded JSON.

        Args:
            station_id: Station to look up; defaults to the client's station.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        sid = station_id or self.station_id
        url = f"{self._stations_url}/{sid}"
        r = requests.get(url, headers=self._auth_headers(), timeout=30)
        r.raise_for_status()
        return r.json()

    def list_channels(self, station_id: Optional[int] = None) -> list[dict]:
        """Return channel descriptors (channelId, name, units, active) for a station.

        The descriptors are read from the ``monitors`` entry of the station
        metadata, or from ``channelGroups`` when ``monitors`` is absent.
        Entries that are not JSON objects are ignored.
        """
        meta = self.get_station_metadata(station_id)
        monitors = meta.get("monitors", meta.get("channelGroups", []))
        if not isinstance(monitors, list):
            # e.g. an explicit null in the payload: treat as "no channels".
            monitors = []
        return [
            {
                "channelId": m.get("channelId", m.get("id")),
                "name": m.get("name", m.get("channelName", "")),
                "units": m.get("units", ""),
                "active": m.get("active", True),
            }
            for m in monitors
            if isinstance(m, dict)
        ]

    @staticmethod
    def _to_utc(raw_dt) -> Optional[pd.Timestamp]:
        """Convert one raw ``datetime`` field to a UTC timestamp, or ``None``.

        Naive values are interpreted as Asia/Jerusalem wall-clock time.
        ``None`` is returned for non-strings, unparseable strings, and naive
        values that are ambiguous or nonexistent around a DST change.
        """
        if not isinstance(raw_dt, str):
            return None
        ts = pd.to_datetime(raw_dt, errors="coerce")
        if pd.isna(ts):
            return None
        if ts.tzinfo is None:
            # Default tz_localize raises at DST transitions, which would abort
            # the whole fetch; map those instants to NaT and drop them instead.
            ts = ts.tz_localize("Asia/Jerusalem", ambiguous="NaT", nonexistent="NaT")
            if pd.isna(ts):
                return None
        return ts.tz_convert("UTC")

    @staticmethod
    def _records_to_frame(records: list, channel_id: int, col_name: str) -> pd.DataFrame:
        """Build a ``timestamp_utc``-indexed frame from raw API records.

        Only readings whose channel ``id`` equals *channel_id* and whose
        ``status`` equals 1 contribute a value; records without such a
        reading are dropped.
        """
        rows = []
        for item in records:
            if not isinstance(item, dict):
                continue
            ts = IMSClient._to_utc(item.get("datetime"))
            if ts is None:
                continue
            val = None
            for ch in item.get("channels") or []:
                if not isinstance(ch, dict):
                    continue
                if ch.get("id") == channel_id and ch.get("status") == 1:
                    val = ch.get("value")
                    break
            rows.append({"timestamp_utc": ts, col_name: val})
        df = pd.DataFrame(rows)
        if not df.empty:
            df = df.dropna(subset=[col_name])
            df = df.set_index("timestamp_utc").sort_index()
        return df

    def fetch_channel(
        self,
        channel_id: int,
        from_date: str,
        to_date: str,
    ) -> pd.DataFrame:
        """Fetch one channel for a date range.

        Args:
            channel_id: Channel to download.
            from_date: Range start as ``YYYY-MM-DD``.
            to_date: Range end as ``YYYY-MM-DD``.

        Returns:
            A frame indexed by ``timestamp_utc`` with a single value column,
            named from ``channel_map`` or ``channel_<id>`` when unmapped.
            An empty frame is returned when the response body is blank, is
            not valid JSON, or contains no usable readings.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        from_f = _parse_ims_date(from_date)
        to_f = _parse_ims_date(to_date)
        url = f"{self._base}/{channel_id}?from={from_f}&to={to_f}"
        r = requests.get(url, headers=self._auth_headers(), timeout=120)
        r.raise_for_status()
        if not r.text or not r.text.strip():
            return pd.DataFrame()
        try:
            raw = r.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses.
            return pd.DataFrame()
        data = raw.get("data", raw) if isinstance(raw, dict) else raw
        if not isinstance(data, list):
            data = []
        col_name = self.channel_map.get(channel_id, f"channel_{channel_id}")
        return self._records_to_frame(data, channel_id, col_name)

    def fetch_all_channels(
        self,
        from_date: str,
        to_date: str,
        delay_seconds: float = 0.5,
    ) -> pd.DataFrame:
        """Fetch every channel in ``channel_map`` and outer-join them on time.

        Args:
            from_date: Range start as ``YYYY-MM-DD``.
            to_date: Range end as ``YYYY-MM-DD``.
            delay_seconds: Pause after each request, including requests that
                returned no data, so consecutive calls stay throttled.

        Returns:
            A frame with a ``timestamp_utc`` column plus one column per
            channel that returned data; an empty frame if none did.
        """
        merged = None
        for ch_id in self.channel_map:
            df = self.fetch_channel(ch_id, from_date, to_date)
            if not df.empty:
                merged = df if merged is None else merged.join(df, how="outer")
            if delay_seconds > 0:
                time.sleep(delay_seconds)
        if merged is None:
            return pd.DataFrame()
        return merged.reset_index()

    def resample_to_15min(self, df: pd.DataFrame) -> pd.DataFrame:
        """Resample to 15-minute means; expects a ``timestamp_utc`` column.

        Bins in which every column is missing are dropped. A frame that is
        empty or lacks ``timestamp_utc`` is returned unchanged.
        """
        if df.empty or "timestamp_utc" not in df.columns:
            return df
        d = df.set_index("timestamp_utc")
        d = d.resample("15min").mean().dropna(how="all")
        return d.reset_index()

    def load_cached(self, cache_path: Optional[Path] = None) -> pd.DataFrame:
        """Load the cached CSV, parsing ``timestamp_utc`` as UTC datetimes.

        Returns an empty frame when the file does not exist or has no content.
        """
        path = self._resolve_cache_path(cache_path)
        if not path.exists():
            return pd.DataFrame()
        try:
            df = pd.read_csv(path)
        except pd.errors.EmptyDataError:
            # Zero-byte file, e.g. left behind by an interrupted write.
            return pd.DataFrame()
        if "timestamp_utc" in df.columns:
            df["timestamp_utc"] = pd.to_datetime(df["timestamp_utc"], utc=True)
        return df

    def fetch_and_cache(
        self,
        from_date: str,
        to_date: str,
        cache_path: Optional[Path] = None,
        chunk_days: Optional[int] = 60,
    ) -> pd.DataFrame:
        """Fetch all channels, resample to 15 minutes and write the CSV cache.

        Args:
            from_date: Range start as ``YYYY-MM-DD``.
            to_date: Range end as ``YYYY-MM-DD``. If it precedes *from_date*
                the two are swapped.
            cache_path: Target CSV; defaults to the file inside ``cache_dir``.
            chunk_days: Maximum span of one request. Longer ranges are split
                into consecutive chunks; ``None`` disables splitting.

        Returns:
            The resampled frame. When nothing was fetched an empty frame is
            returned and the cache file is not written.

        Raises:
            ValueError: If a date is malformed, or if the range has to be
                split and *chunk_days* is smaller than 1.
        """
        import logging
        from datetime import datetime, timedelta

        log = logging.getLogger(__name__)

        path = self._resolve_cache_path(cache_path)
        path.parent.mkdir(parents=True, exist_ok=True)

        start = datetime.strptime(from_date, "%Y-%m-%d").date()
        end = datetime.strptime(to_date, "%Y-%m-%d").date()
        if start > end:
            start, end = end, start

        if chunk_days is None or (end - start).days <= chunk_days:
            # Use the normalised (possibly swapped) dates, not the raw inputs.
            df = self.fetch_all_channels(start.isoformat(), end.isoformat())
        else:
            if chunk_days < 1:
                # A non-positive step would never advance the cursor below.
                raise ValueError("chunk_days must be a positive integer or None")
            step = timedelta(days=chunk_days)
            chunks = []
            cursor = start
            while cursor < end:
                chunk_end = min(cursor + step, end)
                from_s = cursor.isoformat()
                to_s = chunk_end.isoformat()
                try:
                    df_chunk = self.fetch_all_channels(from_s, to_s)
                except Exception:
                    # Best effort: keep the remaining chunks, but leave a trace.
                    log.warning(
                        "IMS fetch failed for %s..%s; chunk skipped",
                        from_s,
                        to_s,
                        exc_info=True,
                    )
                else:
                    if not df_chunk.empty:
                        chunks.append(df_chunk)
                cursor = chunk_end
            df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
            if not df.empty and "timestamp_utc" in df.columns:
                # Adjacent chunks share their boundary date, so drop repeats.
                df = df.drop_duplicates(subset=["timestamp_utc"]).sort_values("timestamp_utc")

        if df.empty:
            return df
        df = self.resample_to_15min(df)
        df.to_csv(path, index=False)
        return df
|
|