""" LLMDataEngineer: Gemini-assisted sensor data cleaning and feature engineering for the SolarWine agrivoltaic pipeline. Phase 8B tasks: - llm-data-cleaning : Gemini analyzes sensor stats, returns Z-score/IQR filter thresholds for automated anomaly detection. - llm-feature-eng : Gemini confirms feature formulae; module generates cyclical time features and a Stress Risk Score. """ from __future__ import annotations from typing import Optional import hashlib import numpy as np import pandas as pd from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key from src.time_features import add_cyclical_time_features # --------------------------------------------------------------------------- # Domain knowledge injected into Gemini prompts # --------------------------------------------------------------------------- SENSOR_CONTEXT = { "Air1_PAR_ref": { "description": "Photosynthetically Active Radiation (PAR)", "unit": "μmol photons m⁻² s⁻¹", "physical_range": [0, 2500], "notes": "Solar PAR at surface cannot exceed ~2200–2500 under any realistic sky. " "Values above 3000 are sensor artefacts.", }, "Air1_leafTemperature_ref": { "description": "Leaf (canopy) temperature", "unit": "°C", "physical_range": [-5, 55], "notes": "Grape leaf temperature in the Negev can reach ~45°C on extreme days, " "but values above 55°C are physiologically impossible for a living leaf.", }, "Air1_airTemperature_ref": { "description": "Air temperature near canopy", "unit": "°C", "physical_range": [0, 50], "notes": "Sde Boker record high is ~47°C. Values above 50°C or below 0°C " "during the growing season (May–Sep) are sensor faults.", }, "Air1_VPD_ref": { "description": "Vapour Pressure Deficit", "unit": "kPa", "physical_range": [0, 7], "notes": "Desert VPD rarely exceeds 6–7 kPa even in extreme heat. " "Negative values and values above 8 kPa are sensor errors.", }, "Air1_airHumidity_ref": { "description": "Relative Humidity", "unit": "%", "physical_range": [0, 100], "notes": "Must be in [0, 100]. Values outside this range are invalid.", }, "Air1_CO2_ref": { "description": "CO₂ concentration (raw sensor, corrected ×0.7 by SensorDataLoader)", "unit": "ppm (raw)", "physical_range": [400, 4000], "notes": "Raw sensor reads ~30% too high (corrected ×0.7 in the data pipeline). " "Raw values above 4000 ppm or below 400 ppm are sensor artefacts. " "Post-correction (~280–2800 ppm) values above 2000 ppm indicate sensor drift.", }, } _SYSTEM_PROMPT_CLEANING = ( "You are a precision-agriculture sensor data quality engineer. " "You are given descriptive statistics for sensor columns from a vineyard " "in the Negev desert, Israel (Sde Boker region, Semillon grapevine, May–September). " "Your task: for each column, propose anomaly filter thresholds to flag " "or remove invalid readings. " "Return ONLY a JSON object (no markdown, no explanation) with the following schema:\n" "{\n" ' "": {\n' ' "lower_bound": ,\n' ' "upper_bound": ,\n' ' "zscore_threshold": ,\n' ' "iqr_multiplier": ,\n' ' "rationale": ""\n' " },\n" " ...\n" "}" ) _SYSTEM_PROMPT_FEATURES = ( "You are a precision-agriculture feature engineering expert specialising in " "grapevine physiology and agrivoltaic systems. " "Given the available sensor columns, propose the exact mathematical formulae " "for a Stress Risk Score that combines VPD and (optionally) CWSI. " "Return ONLY a JSON object (no markdown, no explanation) with schema:\n" "{\n" ' "stress_risk_score": {\n' ' "formula_description": "",\n' ' "vpd_weight": ,\n' ' "cwsi_weight": ,\n' ' "vpd_clip_max": ,\n' ' "cwsi_clip_max": ,\n' ' "rationale": ""\n' " }\n" "}" ) # --------------------------------------------------------------------------- # Helper: robust JSON extraction from LLM response # --------------------------------------------------------------------------- def _extract_json(text: str) -> dict: """Thin wrapper around the shared genai_utils implementation.""" return extract_json_object(text) # --------------------------------------------------------------------------- # Main class # --------------------------------------------------------------------------- class LLMDataEngineer: """ Gemini-assisted sensor data cleaning and feature engineering. Usage ----- engineer = LLMDataEngineer() df_clean, thresholds, features_meta = engineer.run_pipeline(df) """ def __init__( self, model_name: str = "gemini-2.5-flash", api_key: Optional[str] = None, verbose: bool = True, ): self.model_name = model_name self._api_key = api_key self._client = None self.verbose = verbose # Caches keyed by content hash — avoids repeated Gemini calls self._threshold_cache: dict[str, dict] = {} self._feature_spec_cache: dict[str, dict] = {} # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @property def api_key(self) -> str: return get_google_api_key(self._api_key) @property def client(self): if self._client is None: self._client = get_genai_client(self._api_key) return self._client def _call_gemini(self, system_prompt: str, user_prompt: str) -> str: """Send a prompt to Gemini and return the raw text response.""" response = self.client.models.generate_content( model=self.model_name, contents=user_prompt, config={"system_instruction": system_prompt}, ) return response.text @staticmethod def _hash_key(*parts: str) -> str: """Create a short hash from string parts for cache keying.""" return hashlib.md5("|".join(parts).encode()).hexdigest()[:12] def _log(self, msg: str) -> None: if self.verbose: print(f"[LLMDataEngineer] {msg}") # ------------------------------------------------------------------ # Step 1: Anomaly detection — ask Gemini for filter thresholds # ------------------------------------------------------------------ def analyze_anomalies( self, df: pd.DataFrame, columns: Optional[list[str]] = None, ) -> dict: """ Send descriptive statistics to Gemini and receive per-column anomaly filter thresholds. Parameters ---------- df : DataFrame with sensor measurements columns : subset of columns to analyze; defaults to SENSOR_CONTEXT keys Returns ------- dict mapping column_name → {lower_bound, upper_bound, zscore_threshold, iqr_multiplier, rationale} """ target_cols = [ c for c in (columns or list(SENSOR_CONTEXT.keys())) if c in df.columns ] if not target_cols: raise ValueError("No recognized sensor columns found in DataFrame.") stats = df[target_cols].describe(percentiles=[0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99]) # Build prompt with stats + domain context lines = [ "Analyze the following sensor columns from a vineyard dataset.", "For each column, the physical context and expected range are provided.", "", ] for col in target_cols: ctx = SENSOR_CONTEXT.get(col, {}) lines.append(f"Column: {col}") if ctx: lines.append(f" Description : {ctx['description']} ({ctx['unit']})") lines.append(f" Expected range : {ctx['physical_range']}") lines.append(f" Domain notes : {ctx['notes']}") lines.append(" Observed statistics:") for stat_name, val in stats[col].items(): lines.append(f" {stat_name:10s}: {val:.4f}") lines.append("") user_prompt = "\n".join(lines) # Check cache (same stats → same thresholds) cache_key = self._hash_key(user_prompt) if cache_key in self._threshold_cache: self._log("Using cached anomaly thresholds (same data fingerprint).") return self._threshold_cache[cache_key] self._log("Querying Gemini for anomaly thresholds …") try: raw = self._call_gemini(_SYSTEM_PROMPT_CLEANING, user_prompt) thresholds = _extract_json(raw) except Exception as exc: self._log(f"Gemini API error: {exc}. Using statistical fallback.") thresholds = self._fallback_thresholds(df, target_cols) self._threshold_cache[cache_key] = thresholds self._log(f"Received thresholds for {len(thresholds)} columns.") return thresholds @staticmethod def _fallback_thresholds(df: pd.DataFrame, cols: list[str]) -> dict: """Conservative statistical fallback used when API is unavailable.""" result = {} for col in cols: ctx = SENSOR_CONTEXT.get(col, {}) phys = ctx.get("physical_range", [None, None]) result[col] = { "lower_bound": phys[0], "upper_bound": phys[1], "zscore_threshold": 3.5, "iqr_multiplier": 3.0, "rationale": "Statistical fallback (Gemini unavailable).", } return result # ------------------------------------------------------------------ # Step 2: Apply cleaning # ------------------------------------------------------------------ def apply_cleaning( self, df: pd.DataFrame, thresholds: dict, strategy: str = "clip", ) -> pd.DataFrame: """ Apply Gemini-generated thresholds to clean the sensor DataFrame. Parameters ---------- df : raw sensor DataFrame thresholds : dict from analyze_anomalies() strategy : 'clip' — clamp values to [lower_bound, upper_bound] 'drop' — drop rows where any column is out of bounds 'nan' — replace out-of-bounds values with NaN Returns ------- Cleaned DataFrame (copy). """ result = df.copy() report_lines = ["Anomaly cleaning report:"] for col, thresh in thresholds.items(): if col not in result.columns: continue series = result[col] lower = thresh.get("lower_bound") upper = thresh.get("upper_bound") # Count violations before cleaning mask_low = (series < lower) if lower is not None else pd.Series(False, index=series.index) mask_high = (series > upper) if upper is not None else pd.Series(False, index=series.index) # Z-score based detection (secondary flag) z_thresh = thresh.get("zscore_threshold", 3.5) z_scores = (series - series.mean()) / (series.std() + 1e-9) mask_zscore = z_scores.abs() > z_thresh # IQR-based detection (tertiary flag) iqr_mult = thresh.get("iqr_multiplier", 3.0) q1, q3 = series.quantile(0.25), series.quantile(0.75) iqr = q3 - q1 mask_iqr = (series < q1 - iqr_mult * iqr) | (series > q3 + iqr_mult * iqr) # Union of all anomaly flags mask_anomaly = mask_low | mask_high | (mask_zscore & mask_iqr) n_anomalies = int(mask_anomaly.sum()) if n_anomalies > 0: report_lines.append( f" {col}: {n_anomalies} anomalies ({n_anomalies / len(series) * 100:.2f}%)" ) if strategy == "clip": result[col] = series.clip( lower=lower if lower is not None else -np.inf, upper=upper if upper is not None else np.inf, ) elif strategy == "nan": result.loc[mask_anomaly, col] = np.nan elif strategy == "drop": result = result.loc[~mask_anomaly].copy() else: raise ValueError(f"Unknown strategy '{strategy}'. Use 'clip', 'nan', or 'drop'.") self._log("\n".join(report_lines)) return result # ------------------------------------------------------------------ # Step 3: Feature engineering # ------------------------------------------------------------------ def get_feature_spec( self, available_cols: list[str], ) -> dict: """ Ask Gemini to confirm the Stress Risk Score formula given available columns. Returns a feature spec dict with vpd_weight, cwsi_weight, etc. Falls back to a biologically motivated default if API is unavailable. """ has_cwsi = any("cwsi" in c.lower() or "CWSI" in c for c in available_cols) # Cache key: just depends on whether CWSI is available cache_key = f"cwsi={has_cwsi}" if cache_key in self._feature_spec_cache: self._log("Using cached feature spec.") return self._feature_spec_cache[cache_key] user_prompt = ( f"Available sensor columns: {available_cols}.\n" f"CWSI column available: {has_cwsi}.\n" "Propose weights and clip bounds for a Stress Risk Score that linearly " "combines normalised VPD and (if available) normalised CWSI. " "The score should be in [0, 1] and reflect acute heat/drought stress " "for Semillon grapevine in a desert agrivoltaic system." ) self._log("Querying Gemini for Stress Risk Score formula …") try: raw = self._call_gemini(_SYSTEM_PROMPT_FEATURES, user_prompt) spec = _extract_json(raw).get("stress_risk_score", {}) except Exception as exc: self._log(f"Gemini API error: {exc}. Using default feature spec.") spec = {} # Merge with defaults so the dict is always complete defaults = { "formula_description": "Normalised weighted sum of VPD and CWSI stress signals", "vpd_weight": 0.6, "cwsi_weight": 0.4, "vpd_clip_max": 6.0, "cwsi_clip_max": 1.0, "rationale": ( "VPD dominates stomatal response (weight 0.6); " "CWSI captures cumulative water status (weight 0.4)." ), } for k, v in defaults.items(): spec.setdefault(k, v) self._feature_spec_cache[cache_key] = spec return spec def engineer_features( self, df: pd.DataFrame, timestamp_col: str = "time", cwsi_col: Optional[str] = None, vpd_col: str = "Air1_VPD_ref", feature_spec: Optional[dict] = None, ) -> pd.DataFrame: """ Add engineered features to the sensor DataFrame. New columns added ----------------- hour_sin, hour_cos – cyclical encoding of hour-of-day doy_sin, doy_cos – cyclical encoding of day-of-year stress_risk_score – weighted VPD (+ CWSI) stress index in [0, 1] Parameters ---------- df : sensor DataFrame (original unmodified) timestamp_col : name of the datetime column (or index if not a column) cwsi_col : optional CWSI column name; if None, stress score uses VPD only vpd_col : VPD column name feature_spec : pre-fetched spec from get_feature_spec(); fetched if None Returns ------- DataFrame copy with additional feature columns. """ result = df.copy() # --- Cyclical time features (via shared utility) --- ts_col = timestamp_col if timestamp_col in result.columns else None use_index = ts_col is None and isinstance(result.index, pd.DatetimeIndex) if ts_col is not None or use_index: result = add_cyclical_time_features( result, timestamp_col=ts_col, index_is_timestamp=use_index, ) self._log("Added cyclical time features: hour_sin, hour_cos, doy_sin, doy_cos") else: self._log("Warning: no timestamp found; skipping cyclical features.") # --- Stress Risk Score --- if vpd_col in result.columns: if feature_spec is None: feature_spec = self.get_feature_spec(list(result.columns)) vpd_w = float(feature_spec.get("vpd_weight", 0.6)) cwsi_w = float(feature_spec.get("cwsi_weight", 0.4)) vpd_max = float(feature_spec.get("vpd_clip_max", 6.0)) cwsi_max = float(feature_spec.get("cwsi_clip_max", 1.0)) vpd_norm = (result[vpd_col].clip(0, vpd_max) / vpd_max).fillna(0.0) if cwsi_col and cwsi_col in result.columns: cwsi_norm = (result[cwsi_col].clip(0, cwsi_max) / cwsi_max).fillna(0.0) effective_cwsi_w = cwsi_w effective_vpd_w = vpd_w else: # No CWSI — redistribute weight entirely to VPD cwsi_norm = pd.Series(0.0, index=result.index) effective_cwsi_w = 0.0 effective_vpd_w = 1.0 score = (effective_vpd_w * vpd_norm + effective_cwsi_w * cwsi_norm).clip(0, 1) result["stress_risk_score"] = score.round(4) self._log( f"Added stress_risk_score (vpd_weight={effective_vpd_w:.2f}, " f"cwsi_weight={effective_cwsi_w:.2f})" ) else: self._log(f"Warning: VPD column '{vpd_col}' not found; skipping stress_risk_score.") return result # ------------------------------------------------------------------ # Full pipeline # ------------------------------------------------------------------ def run_pipeline( self, df: pd.DataFrame, cleaning_strategy: str = "clip", timestamp_col: str = "time", cwsi_col: Optional[str] = None, vpd_col: str = "Air1_VPD_ref", ) -> tuple[pd.DataFrame, dict, dict]: """ Execute the full LLM data engineering pipeline. Steps ----- 1. Gemini analyzes column stats → anomaly thresholds 2. Apply cleaning (clip / nan / drop) 3. Gemini confirms feature spec → engineer features Returns ------- (df_engineered, thresholds, feature_spec) """ self._log("=== LLM Data Engineering Pipeline ===") # Step 1: anomaly thresholds thresholds = self.analyze_anomalies(df) # Step 2: clean df_clean = self.apply_cleaning(df, thresholds, strategy=cleaning_strategy) # Step 3: feature spec + engineering feature_spec = self.get_feature_spec(list(df_clean.columns)) df_engineered = self.engineer_features( df_clean, timestamp_col=timestamp_col, cwsi_col=cwsi_col, vpd_col=vpd_col, feature_spec=feature_spec, ) new_cols = [c for c in df_engineered.columns if c not in df.columns] self._log(f"Pipeline complete. New columns: {new_cols}") return df_engineered, thresholds, feature_spec # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- if __name__ == "__main__": from pathlib import Path DATA_DIR = Path(__file__).resolve().parent.parent / "Data" sample_path = DATA_DIR / "Seymour" / "sensors_wide_sample.csv" sensors_path = DATA_DIR / "Seymour" / "sensors_wide.csv" csv_path = sample_path if sample_path.exists() else sensors_path print(f"Loading sensor data from: {csv_path.name}") df_raw = pd.read_csv(csv_path) print(f"Shape: {df_raw.shape} | Columns: {list(df_raw.columns)}\n") engineer = LLMDataEngineer(verbose=True) df_out, thresh, feat_spec = engineer.run_pipeline(df_raw) print("\n--- Anomaly Thresholds (from Gemini) ---") for col, t in thresh.items(): print( f" {col:35s} lower={t.get('lower_bound')} " f"upper={t.get('upper_bound')} " f"z={t.get('zscore_threshold')} " f"IQR×{t.get('iqr_multiplier')}" ) print(f" → {t.get('rationale', '')}") print("\n--- Stress Risk Score Spec (from Gemini) ---") for k, v in feat_spec.items(): print(f" {k}: {v}") print("\n--- Engineered DataFrame Head ---") eng_cols = ["time", "Air1_PAR_ref", "Air1_VPD_ref", "hour_sin", "hour_cos", "doy_sin", "doy_cos", "stress_risk_score"] show = [c for c in eng_cols if c in df_out.columns] print(df_out[show].head(6).to_string(index=False))