| """ |
| LLMDataEngineer: Gemini-assisted sensor data cleaning and feature engineering |
| for the SolarWine agrivoltaic pipeline. |
| |
| Phase 8B tasks: |
| - llm-data-cleaning : Gemini analyzes sensor stats, returns Z-score/IQR |
| filter thresholds for automated anomaly detection. |
| - llm-feature-eng : Gemini confirms feature formulae; module generates |
| cyclical time features and a Stress Risk Score. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Optional |
|
|
| import hashlib |
| import numpy as np |
| import pandas as pd |
|
|
| from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key |
| from src.time_features import add_cyclical_time_features |
|
|
|
|
| |
| |
| |
|
|
# Physical context for each reference sensor column.  These ranges and notes
# are injected verbatim into the Gemini cleaning prompt and also serve as the
# offline fallback bounds in LLMDataEngineer._fallback_thresholds.
SENSOR_CONTEXT = {
    "Air1_PAR_ref": {
        "description": "Photosynthetically Active Radiation (PAR)",
        "unit": "μmol photons m⁻² s⁻¹",
        "physical_range": [0, 2500],
        "notes": "Solar PAR at surface cannot exceed ~2200–2500 under any realistic sky. "
        "Values above 3000 are sensor artefacts.",
    },
    "Air1_leafTemperature_ref": {
        "description": "Leaf (canopy) temperature",
        "unit": "°C",
        "physical_range": [-5, 55],
        "notes": "Grape leaf temperature in the Negev can reach ~45°C on extreme days, "
        "but values above 55°C are physiologically impossible for a living leaf.",
    },
    "Air1_airTemperature_ref": {
        "description": "Air temperature near canopy",
        "unit": "°C",
        "physical_range": [0, 50],
        "notes": "Sde Boker record high is ~47°C. Values above 50°C or below 0°C "
        "during the growing season (May–Sep) are sensor faults.",
    },
    "Air1_VPD_ref": {
        "description": "Vapour Pressure Deficit",
        "unit": "kPa",
        "physical_range": [0, 7],
        "notes": "Desert VPD rarely exceeds 6–7 kPa even in extreme heat. "
        "Negative values and values above 8 kPa are sensor errors.",
    },
    "Air1_airHumidity_ref": {
        "description": "Relative Humidity",
        "unit": "%",
        "physical_range": [0, 100],
        "notes": "Must be in [0, 100]. Values outside this range are invalid.",
    },
    "Air1_CO2_ref": {
        "description": "CO₂ concentration (raw sensor, corrected ×0.7 by SensorDataLoader)",
        "unit": "ppm (raw)",
        "physical_range": [400, 4000],
        "notes": "Raw sensor reads ~30% too high (corrected ×0.7 in the data pipeline). "
        "Raw values above 4000 ppm or below 400 ppm are sensor artefacts. "
        "Post-correction (~280–2800 ppm) values above 2000 ppm indicate sensor drift.",
    },
}
|
|
# System instruction for the anomaly-threshold request.  The "JSON only, no
# markdown" wording keeps replies parseable by _extract_json without having
# to strip code fences first.
_SYSTEM_PROMPT_CLEANING = (
    "You are a precision-agriculture sensor data quality engineer. "
    "You are given descriptive statistics for sensor columns from a vineyard "
    "in the Negev desert, Israel (Sde Boker region, Semillon grapevine, May–September). "
    "Your task: for each column, propose anomaly filter thresholds to flag "
    "or remove invalid readings. "
    "Return ONLY a JSON object (no markdown, no explanation) with the following schema:\n"
    "{\n"
    '  "<column_name>": {\n'
    '    "lower_bound": <float or null>,\n'
    '    "upper_bound": <float or null>,\n'
    '    "zscore_threshold": <float>,\n'
    '    "iqr_multiplier": <float>,\n'
    '    "rationale": "<one sentence>"\n'
    "  },\n"
    "  ...\n"
    "}"
)
|
|
# System instruction for the Stress Risk Score request; the schema mirrors
# the keys consumed by LLMDataEngineer.get_feature_spec / engineer_features.
_SYSTEM_PROMPT_FEATURES = (
    "You are a precision-agriculture feature engineering expert specialising in "
    "grapevine physiology and agrivoltaic systems. "
    "Given the available sensor columns, propose the exact mathematical formulae "
    "for a Stress Risk Score that combines VPD and (optionally) CWSI. "
    "Return ONLY a JSON object (no markdown, no explanation) with schema:\n"
    "{\n"
    '  "stress_risk_score": {\n'
    '    "formula_description": "<one sentence>",\n'
    '    "vpd_weight": <float>,\n'
    '    "cwsi_weight": <float>,\n'
    '    "vpd_clip_max": <float>,\n'
    '    "cwsi_clip_max": <float>,\n'
    '    "rationale": "<one or two sentences on biological justification>"\n'
    "  }\n"
    "}"
)
|
|
|
|
| |
| |
| |
|
|
def _extract_json(text: str) -> dict:
    """Thin wrapper around the shared genai_utils implementation.

    Delegates to ``extract_json_object`` to parse a JSON object out of a raw
    Gemini text reply.  Kept as a module-level name so this module has a
    single local seam for JSON extraction (e.g. for patching in tests —
    presumed intent, confirm with module owners).
    """
    return extract_json_object(text)
|
|
|
|
| |
| |
| |
|
|
class LLMDataEngineer:
    """
    Gemini-assisted sensor data cleaning and feature engineering.

    Sends descriptive statistics of sensor columns to Gemini and receives
    per-column anomaly thresholds as JSON, applies them, then asks Gemini to
    confirm the Stress Risk Score formula used for feature engineering.
    Every Gemini call degrades gracefully to a deterministic fallback so the
    pipeline still runs when the API is unavailable.

    Usage
    -----
        engineer = LLMDataEngineer()
        df_clean, thresholds, features_meta = engineer.run_pipeline(df)
    """

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
        verbose: bool = True,
    ):
        self.model_name = model_name
        self._api_key = api_key
        self._client = None  # created lazily on first `client` access
        self.verbose = verbose

        # Response caches keyed by a fingerprint of the request, so repeated
        # runs on identical data never re-query the API within this instance.
        self._threshold_cache: dict[str, dict] = {}
        self._feature_spec_cache: dict[str, dict] = {}

    # ------------------------------------------------------------------ #
    # Gemini plumbing
    # ------------------------------------------------------------------ #

    @property
    def api_key(self) -> str:
        """Resolve the Google API key (explicit argument or environment)."""
        return get_google_api_key(self._api_key)

    @property
    def client(self):
        """Lazily constructed genai client (no client setup in __init__)."""
        if self._client is None:
            self._client = get_genai_client(self._api_key)
        return self._client

    def _call_gemini(self, system_prompt: str, user_prompt: str) -> str:
        """Send a prompt to Gemini and return the raw text response."""
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=user_prompt,
            config={"system_instruction": system_prompt},
        )
        return response.text

    @staticmethod
    def _hash_key(*parts: str) -> str:
        """Create a short hash from string parts for cache keying."""
        return hashlib.md5("|".join(parts).encode()).hexdigest()[:12]

    @staticmethod
    def _num(value, default: Optional[float] = None) -> Optional[float]:
        """
        Coerce an LLM-returned JSON value to float.

        Gemini occasionally emits null or a numeric string where the schema
        asks for a number; comparing a pandas Series against such a value
        raises TypeError deep inside apply_cleaning.  None and unparseable
        values fall back to *default*.
        """
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _log(self, msg: str) -> None:
        # Lightweight console logging, silenced when verbose=False.
        if self.verbose:
            print(f"[LLMDataEngineer] {msg}")

    # ------------------------------------------------------------------ #
    # Phase 1: anomaly analysis
    # ------------------------------------------------------------------ #

    def analyze_anomalies(
        self,
        df: pd.DataFrame,
        columns: Optional[list[str]] = None,
    ) -> dict:
        """
        Send descriptive statistics to Gemini and receive per-column
        anomaly filter thresholds.

        Parameters
        ----------
        df : DataFrame with sensor measurements
        columns : subset of columns to analyze; defaults to SENSOR_CONTEXT keys

        Returns
        -------
        dict mapping column_name → {lower_bound, upper_bound,
        zscore_threshold, iqr_multiplier, rationale}

        Raises
        ------
        ValueError if none of the requested columns exist in *df*.
        """
        target_cols = [
            c for c in (columns or list(SENSOR_CONTEXT.keys())) if c in df.columns
        ]
        if not target_cols:
            raise ValueError("No recognized sensor columns found in DataFrame.")

        # Tail percentiles (1%/99%) help Gemini separate physical extremes
        # from sensor artefacts.
        stats = df[target_cols].describe(percentiles=[0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99])

        lines = [
            "Analyze the following sensor columns from a vineyard dataset.",
            "For each column, the physical context and expected range are provided.",
            "",
        ]
        for col in target_cols:
            ctx = SENSOR_CONTEXT.get(col, {})
            lines.append(f"Column: {col}")
            if ctx:
                lines.append(f" Description : {ctx['description']} ({ctx['unit']})")
                lines.append(f" Expected range : {ctx['physical_range']}")
                lines.append(f" Domain notes : {ctx['notes']}")
            lines.append(" Observed statistics:")
            for stat_name, val in stats[col].items():
                lines.append(f" {stat_name:10s}: {val:.4f}")
            lines.append("")

        user_prompt = "\n".join(lines)

        # Cache on a fingerprint of the full prompt: identical data (and
        # context) produces an identical key, skipping the API round-trip.
        cache_key = self._hash_key(user_prompt)
        if cache_key in self._threshold_cache:
            self._log("Using cached anomaly thresholds (same data fingerprint).")
            return self._threshold_cache[cache_key]

        self._log("Querying Gemini for anomaly thresholds …")

        try:
            raw = self._call_gemini(_SYSTEM_PROMPT_CLEANING, user_prompt)
            thresholds = _extract_json(raw)
        except Exception as exc:
            # Deliberate best-effort: any API/parsing failure degrades to the
            # physical-range fallback instead of aborting the pipeline.
            self._log(f"Gemini API error: {exc}. Using statistical fallback.")
            thresholds = self._fallback_thresholds(df, target_cols)

        self._threshold_cache[cache_key] = thresholds
        self._log(f"Received thresholds for {len(thresholds)} columns.")
        return thresholds

    @staticmethod
    def _fallback_thresholds(df: pd.DataFrame, cols: list[str]) -> dict:
        """Conservative statistical fallback used when API is unavailable."""
        # NOTE: *df* is currently unused; kept for signature stability and
        # possible future data-driven fallbacks.
        result = {}
        for col in cols:
            ctx = SENSOR_CONTEXT.get(col, {})
            phys = ctx.get("physical_range", [None, None])
            result[col] = {
                "lower_bound": phys[0],
                "upper_bound": phys[1],
                "zscore_threshold": 3.5,
                "iqr_multiplier": 3.0,
                "rationale": "Statistical fallback (Gemini unavailable).",
            }
        return result

    # ------------------------------------------------------------------ #
    # Phase 2: cleaning
    # ------------------------------------------------------------------ #

    def apply_cleaning(
        self,
        df: pd.DataFrame,
        thresholds: dict,
        strategy: str = "clip",
    ) -> pd.DataFrame:
        """
        Apply Gemini-generated thresholds to clean the sensor DataFrame.

        Parameters
        ----------
        df : raw sensor DataFrame
        thresholds : dict from analyze_anomalies()
        strategy : 'clip' — clamp values to [lower_bound, upper_bound]
                   'drop' — drop rows where any column is out of bounds
                   'nan'  — replace out-of-bounds values with NaN

        Returns
        -------
        Cleaned DataFrame (copy).

        Raises
        ------
        ValueError if *strategy* is not one of 'clip', 'nan', 'drop'.
        """
        result = df.copy()
        report_lines = ["Anomaly cleaning report:"]

        for col, thresh in thresholds.items():
            if col not in result.columns:
                continue
            series = result[col]
            # Normalise LLM values first: null or numeric-string bounds would
            # otherwise raise TypeError in the Series comparisons below.
            lower = self._num(thresh.get("lower_bound"))
            upper = self._num(thresh.get("upper_bound"))

            # Hard physical bounds.
            mask_low = (series < lower) if lower is not None else pd.Series(False, index=series.index)
            mask_high = (series > upper) if upper is not None else pd.Series(False, index=series.index)

            # Statistical outliers: require BOTH z-score and IQR agreement so
            # a single skewed statistic cannot flag valid readings.
            z_thresh = self._num(thresh.get("zscore_threshold"), 3.5)
            z_scores = (series - series.mean()) / (series.std() + 1e-9)  # +eps guards zero std
            mask_zscore = z_scores.abs() > z_thresh

            iqr_mult = self._num(thresh.get("iqr_multiplier"), 3.0)
            q1, q3 = series.quantile(0.25), series.quantile(0.75)
            iqr = q3 - q1
            mask_iqr = (series < q1 - iqr_mult * iqr) | (series > q3 + iqr_mult * iqr)

            mask_anomaly = mask_low | mask_high | (mask_zscore & mask_iqr)
            n_anomalies = int(mask_anomaly.sum())

            if n_anomalies > 0:
                report_lines.append(
                    f" {col}: {n_anomalies} anomalies ({n_anomalies / len(series) * 100:.2f}%)"
                )

            if strategy == "clip":
                # 'clip' enforces only the hard bounds; statistical outliers
                # inside the physical range are kept as-is.
                result[col] = series.clip(
                    lower=lower if lower is not None else -np.inf,
                    upper=upper if upper is not None else np.inf,
                )
            elif strategy == "nan":
                result.loc[mask_anomaly, col] = np.nan
            elif strategy == "drop":
                # Row removal shrinks `result`; later columns recompute their
                # masks against the already-reduced frame.
                result = result.loc[~mask_anomaly].copy()
            else:
                raise ValueError(f"Unknown strategy '{strategy}'. Use 'clip', 'nan', or 'drop'.")

        self._log("\n".join(report_lines))
        return result

    # ------------------------------------------------------------------ #
    # Phase 3: feature engineering
    # ------------------------------------------------------------------ #

    def get_feature_spec(
        self,
        available_cols: list[str],
    ) -> dict:
        """
        Ask Gemini to confirm the Stress Risk Score formula given available columns.

        Returns a feature spec dict with vpd_weight, cwsi_weight, etc.
        Falls back to a biologically motivated default if API is unavailable.
        """
        # Case-insensitive test covers "cwsi" and "CWSI" spellings alike.
        has_cwsi = any("cwsi" in c.lower() for c in available_cols)

        # Only CWSI availability changes the request, so it is the cache key.
        cache_key = f"cwsi={has_cwsi}"
        if cache_key in self._feature_spec_cache:
            self._log("Using cached feature spec.")
            return self._feature_spec_cache[cache_key]

        user_prompt = (
            f"Available sensor columns: {available_cols}.\n"
            f"CWSI column available: {has_cwsi}.\n"
            "Propose weights and clip bounds for a Stress Risk Score that linearly "
            "combines normalised VPD and (if available) normalised CWSI. "
            "The score should be in [0, 1] and reflect acute heat/drought stress "
            "for Semillon grapevine in a desert agrivoltaic system."
        )
        self._log("Querying Gemini for Stress Risk Score formula …")
        try:
            raw = self._call_gemini(_SYSTEM_PROMPT_FEATURES, user_prompt)
            spec = _extract_json(raw).get("stress_risk_score", {})
        except Exception as exc:
            self._log(f"Gemini API error: {exc}. Using default feature spec.")
            spec = {}

        defaults = {
            "formula_description": "Normalised weighted sum of VPD and CWSI stress signals",
            "vpd_weight": 0.6,
            "cwsi_weight": 0.4,
            "vpd_clip_max": 6.0,
            "cwsi_clip_max": 1.0,
            "rationale": (
                "VPD dominates stomatal response (weight 0.6); "
                "CWSI captures cumulative water status (weight 0.4)."
            ),
        }
        # Fill missing keys AND keys Gemini returned as null — plain
        # setdefault() would keep an explicit null and crash float()
        # downstream in engineer_features.
        for k, v in defaults.items():
            if spec.get(k) is None:
                spec[k] = v

        self._feature_spec_cache[cache_key] = spec
        return spec

    def engineer_features(
        self,
        df: pd.DataFrame,
        timestamp_col: str = "time",
        cwsi_col: Optional[str] = None,
        vpd_col: str = "Air1_VPD_ref",
        feature_spec: Optional[dict] = None,
    ) -> pd.DataFrame:
        """
        Add engineered features to the sensor DataFrame.

        New columns added
        -----------------
        hour_sin, hour_cos – cyclical encoding of hour-of-day
        doy_sin, doy_cos – cyclical encoding of day-of-year
        stress_risk_score – weighted VPD (+ CWSI) stress index in [0, 1]

        Parameters
        ----------
        df : sensor DataFrame (original unmodified)
        timestamp_col : name of the datetime column (or index if not a column)
        cwsi_col : optional CWSI column name; if None, stress score uses VPD only
        vpd_col : VPD column name
        feature_spec : pre-fetched spec from get_feature_spec(); fetched if None

        Returns
        -------
        DataFrame copy with additional feature columns.
        """
        result = df.copy()

        # Cyclical time features: prefer the named column, fall back to a
        # DatetimeIndex, otherwise skip with a warning.
        ts_col = timestamp_col if timestamp_col in result.columns else None
        use_index = ts_col is None and isinstance(result.index, pd.DatetimeIndex)
        if ts_col is not None or use_index:
            result = add_cyclical_time_features(
                result,
                timestamp_col=ts_col,
                index_is_timestamp=use_index,
            )
            self._log("Added cyclical time features: hour_sin, hour_cos, doy_sin, doy_cos")
        else:
            self._log("Warning: no timestamp found; skipping cyclical features.")

        if vpd_col in result.columns:
            if feature_spec is None:
                feature_spec = self.get_feature_spec(list(result.columns))

            # Defensive coercion: spec values may be null / numeric strings.
            vpd_w = self._num(feature_spec.get("vpd_weight"), 0.6)
            cwsi_w = self._num(feature_spec.get("cwsi_weight"), 0.4)
            vpd_max = self._num(feature_spec.get("vpd_clip_max"), 6.0)
            cwsi_max = self._num(feature_spec.get("cwsi_clip_max"), 1.0)

            # Normalise VPD into [0, 1]; NaN readings contribute zero stress.
            vpd_norm = (result[vpd_col].clip(0, vpd_max) / vpd_max).fillna(0.0)

            if cwsi_col and cwsi_col in result.columns:
                cwsi_norm = (result[cwsi_col].clip(0, cwsi_max) / cwsi_max).fillna(0.0)
                effective_cwsi_w = cwsi_w
                effective_vpd_w = vpd_w
            else:
                # No CWSI signal: re-weight so VPD alone can span [0, 1].
                cwsi_norm = pd.Series(0.0, index=result.index)
                effective_cwsi_w = 0.0
                effective_vpd_w = 1.0

            score = (effective_vpd_w * vpd_norm + effective_cwsi_w * cwsi_norm).clip(0, 1)
            result["stress_risk_score"] = score.round(4)

            self._log(
                f"Added stress_risk_score (vpd_weight={effective_vpd_w:.2f}, "
                f"cwsi_weight={effective_cwsi_w:.2f})"
            )
        else:
            self._log(f"Warning: VPD column '{vpd_col}' not found; skipping stress_risk_score.")

        return result

    # ------------------------------------------------------------------ #
    # Orchestration
    # ------------------------------------------------------------------ #

    def run_pipeline(
        self,
        df: pd.DataFrame,
        cleaning_strategy: str = "clip",
        timestamp_col: str = "time",
        cwsi_col: Optional[str] = None,
        vpd_col: str = "Air1_VPD_ref",
    ) -> tuple[pd.DataFrame, dict, dict]:
        """
        Execute the full LLM data engineering pipeline.

        Steps
        -----
        1. Gemini analyzes column stats → anomaly thresholds
        2. Apply cleaning (clip / nan / drop)
        3. Gemini confirms feature spec → engineer features

        Returns
        -------
        (df_engineered, thresholds, feature_spec)
        """
        self._log("=== LLM Data Engineering Pipeline ===")

        thresholds = self.analyze_anomalies(df)

        df_clean = self.apply_cleaning(df, thresholds, strategy=cleaning_strategy)

        # Fetch the spec once on the cleaned columns and pass it through so
        # engineer_features does not trigger a second Gemini call.
        feature_spec = self.get_feature_spec(list(df_clean.columns))
        df_engineered = self.engineer_features(
            df_clean,
            timestamp_col=timestamp_col,
            cwsi_col=cwsi_col,
            vpd_col=vpd_col,
            feature_spec=feature_spec,
        )

        new_cols = [c for c in df_engineered.columns if c not in df.columns]
        self._log(f"Pipeline complete. New columns: {new_cols}")
        return df_engineered, thresholds, feature_spec
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| from pathlib import Path |
|
|
| DATA_DIR = Path(__file__).resolve().parent.parent / "Data" |
| sample_path = DATA_DIR / "Seymour" / "sensors_wide_sample.csv" |
| sensors_path = DATA_DIR / "Seymour" / "sensors_wide.csv" |
| csv_path = sample_path if sample_path.exists() else sensors_path |
|
|
| print(f"Loading sensor data from: {csv_path.name}") |
| df_raw = pd.read_csv(csv_path) |
| print(f"Shape: {df_raw.shape} | Columns: {list(df_raw.columns)}\n") |
|
|
| engineer = LLMDataEngineer(verbose=True) |
| df_out, thresh, feat_spec = engineer.run_pipeline(df_raw) |
|
|
| print("\n--- Anomaly Thresholds (from Gemini) ---") |
| for col, t in thresh.items(): |
| print( |
| f" {col:35s} lower={t.get('lower_bound')} " |
| f"upper={t.get('upper_bound')} " |
| f"z={t.get('zscore_threshold')} " |
| f"IQR×{t.get('iqr_multiplier')}" |
| ) |
| print(f" → {t.get('rationale', '')}") |
|
|
| print("\n--- Stress Risk Score Spec (from Gemini) ---") |
| for k, v in feat_spec.items(): |
| print(f" {k}: {v}") |
|
|
| print("\n--- Engineered DataFrame Head ---") |
| eng_cols = ["time", "Air1_PAR_ref", "Air1_VPD_ref", |
| "hour_sin", "hour_cos", "doy_sin", "doy_cos", "stress_risk_score"] |
| show = [c for c in eng_cols if c in df_out.columns] |
| print(df_out[show].head(6).to_string(index=False)) |
|
|