api / src /data /sensor_data_loader.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
SensorDataLoader: load and filter sensors_wide.csv for Stage 1 (Farquhar model).
Uses only on-site sensor data from the sensor data directory.
"""
from pathlib import Path
from typing import Optional
import pandas as pd
# Columns required by Stage 1 (Farquhar + CWSI), as listed in context/2_plan.md.
STAGE1_COLUMNS = [
    "Air1_PAR_ref",
    "Air1_leafTemperature_ref",
    "Air1_airTemperature_ref",
    "Air1_CO2_ref",
    "Air1_VPD_ref",
    "Air1_airHumidity_ref",
]

# Spectral indices from the Crop sensors; optional, to be included when present.
STAGE1_OPTIONAL = [
    "Air1_NDVI_ref",
    "Air1_PRI_ref",
    "Air1_rNDVI_ref",
    "Air1_RENDVI_ref",
]

# Timestamp column name assumed for the wide CSV when none is specified.
DEFAULT_TIMESTAMP_COL = "time"
class SensorDataLoader:
    """Load sensors_wide.csv and provide Stage 1 columns and a daytime filter.

    Paths default to values from ``config.settings``. When no ``data_path`` is
    given and the full wide CSV does not exist but the sample CSV does, the
    sample CSV is used instead.
    """

    # Column that receives the calibration correction in ``load``.
    CO2_COLUMN = "Air1_CO2_ref"
    # Multiplier applied to the raw CO2 reading (raw sensor reads ≈ 30% too high).
    # NOTE(review): a reading that is 30% high would be corrected by dividing
    # by 1.3 (≈ 0.77), not multiplying by 0.7 — confirm the intended calibration.
    CO2_CORRECTION_FACTOR = 0.7

    def __init__(
        self,
        data_path: Optional[Path] = None,
        metadata_path: Optional[Path] = None,
    ) -> None:
        """Resolve the data and metadata paths.

        Args:
            data_path: CSV to load; falls back to the configured default
                (or the sample file, see class docstring) when not given.
            metadata_path: Metadata file; falls back to
                ``settings.SENSORS_WIDE_METADATA_PATH`` when not given.
        """
        # Imported at call time so that importing this module does not
        # require the project's ``config`` package to be importable.
        from config import settings

        _default = settings.SENSORS_WIDE_PATH
        if not _default.exists() and settings.SENSORS_WIDE_SAMPLE_PATH.exists():
            _default = settings.SENSORS_WIDE_SAMPLE_PATH
        self.data_path = data_path or _default
        self.metadata_path = metadata_path or settings.SENSORS_WIDE_METADATA_PATH

    def get_stage1_columns(self) -> list[str]:
        """Return a fresh list of the column names required for Stage 1 (Farquhar + CWSI)."""
        return list(STAGE1_COLUMNS)

    def load(
        self,
        columns: Optional[list[str]] = None,
        timestamp_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Load the requested columns of sensors_wide.csv plus the timestamp.

        Args:
            columns: Columns to load. ``None`` means all Stage 1 columns.
                The timestamp column is always loaded in addition.
            timestamp_col: Name of the timestamp column; defaults to
                ``DEFAULT_TIMESTAMP_COL``.

        Returns:
            A frame sorted by timestamp (parsed as UTC-aware datetimes) with a
            fresh 0..n-1 index. ``Air1_CO2_ref``, if loaded, is multiplied by
            ``CO2_CORRECTION_FACTOR``.

        Raises:
            ValueError: If the timestamp column or any requested column is
                absent from the CSV. The message lists the missing columns
                and up to 20 of the columns actually present in the file.
        """
        ts_col = timestamp_col or DEFAULT_TIMESTAMP_COL
        requested = columns if columns is not None else self.get_stage1_columns()
        # Timestamp goes first and appears exactly once.
        use_cols = [ts_col] + [c for c in requested if c != ts_col]

        wanted = set(use_cols)  # O(1) membership for the per-column callback
        df = pd.read_csv(self.data_path, usecols=lambda c: c in wanted)

        missing = [c for c in use_cols if c not in df.columns]
        if missing:
            # ``df`` only holds the requested columns that were found, so read
            # the bare header to report what the file really contains.
            available = list(pd.read_csv(self.data_path, nrows=0).columns)
            raise ValueError(
                f"Sensor data missing required columns: {missing}. "
                f"Available: {available[:20]}{'...' if len(available) > 20 else ''}"
            )

        # ts_col is guaranteed present here: it is in use_cols and nothing is missing.
        df[ts_col] = pd.to_datetime(df[ts_col], utc=True)
        df = df.sort_values(ts_col).reset_index(drop=True)

        # Correct Air1_CO2_ref — raw sensor reads ≈ 30% too high
        if self.CO2_COLUMN in df.columns:
            df[self.CO2_COLUMN] = df[self.CO2_COLUMN] * self.CO2_CORRECTION_FACTOR
        return df

    def filter_daytime(
        self,
        df: pd.DataFrame,
        par_threshold: float = 50.0,
        par_column: str = "Air1_PAR_ref",
    ) -> pd.DataFrame:
        """Keep only rows where PAR > par_threshold (daytime, umol m-2 s-1).

        Rows whose PAR value is NaN fail the comparison and are dropped.
        If ``par_column`` is not in ``df``, no filtering is applied.

        Returns:
            Always a new frame, so the caller may modify the result without
            affecting ``df`` — including when ``par_column`` is absent.
        """
        if par_column not in df.columns:
            # Best-effort: nothing to filter on. Copy so the result never
            # aliases the input, matching the filtered branch below.
            return df.copy()
        return df.loc[df[par_column] > par_threshold].copy()