""" SensorDataLoader: load and filter sensors_wide.csv for Stage 1 (Farquhar model). Uses only on-site sensor data from the sensor data directory. """ from pathlib import Path from typing import Optional import pandas as pd # Stage 1 columns (Farquhar + CWSI) per context/2_plan.md STAGE1_COLUMNS = [ "Air1_PAR_ref", "Air1_leafTemperature_ref", "Air1_airTemperature_ref", "Air1_CO2_ref", "Air1_VPD_ref", "Air1_airHumidity_ref", ] # Optional spectral indices (Crop sensors); include if present STAGE1_OPTIONAL = ["Air1_NDVI_ref", "Air1_PRI_ref", "Air1_rNDVI_ref", "Air1_RENDVI_ref"] # Default timestamp column name in wide CSV DEFAULT_TIMESTAMP_COL = "time" class SensorDataLoader: """Load sensors_wide.csv and provide Stage 1 columns and daytime filter.""" def __init__( self, data_path: Optional[Path] = None, metadata_path: Optional[Path] = None, ): from config import settings _default = settings.SENSORS_WIDE_PATH if not _default.exists() and settings.SENSORS_WIDE_SAMPLE_PATH.exists(): _default = settings.SENSORS_WIDE_SAMPLE_PATH self.data_path = data_path or _default self.metadata_path = metadata_path or settings.SENSORS_WIDE_METADATA_PATH def get_stage1_columns(self) -> list[str]: """Return list of column names required for Stage 1 (Farquhar + CWSI).""" return list(STAGE1_COLUMNS) def load( self, columns: Optional[list[str]] = None, timestamp_col: Optional[str] = None, ) -> pd.DataFrame: """ Load sensors_wide.csv. If columns is None, load all Stage 1 columns plus timestamp. Columns not present are dropped from the request. """ ts_col = timestamp_col or DEFAULT_TIMESTAMP_COL use_cols = columns if columns is not None else self.get_stage1_columns() use_cols = [c for c in use_cols if c != ts_col] if ts_col not in use_cols: use_cols = [ts_col] + use_cols df = pd.read_csv(self.data_path, usecols=lambda c: c in use_cols) missing = [c for c in use_cols if c not in df.columns] if missing: raise ValueError( f"Sensor data missing required columns: {missing}. " f"Available: {list(df.columns)[:20]}{'...' if len(df.columns) > 20 else ''}" ) if ts_col in df.columns: df[ts_col] = pd.to_datetime(df[ts_col], utc=True) df = df.sort_values(ts_col).reset_index(drop=True) # Correct Air1_CO2_ref — raw sensor reads ≈ 30% too high if "Air1_CO2_ref" in df.columns: df["Air1_CO2_ref"] = df["Air1_CO2_ref"] * 0.7 return df def filter_daytime( self, df: pd.DataFrame, par_threshold: float = 50.0, par_column: str = "Air1_PAR_ref", ) -> pd.DataFrame: """Keep only rows where PAR > par_threshold (daytime, umol m-2 s-1).""" if par_column not in df.columns: return df return df.loc[df[par_column] > par_threshold].copy()