# api/src/chatbot/llm_data_engineer.py
# Author: Eli Safra — commit 938949f ("Deploy SolarWine API (FastAPI + Docker, port 7860)")
"""
LLMDataEngineer: Gemini-assisted sensor data cleaning and feature engineering
for the SolarWine agrivoltaic pipeline.
Phase 8B tasks:
- llm-data-cleaning : Gemini analyzes sensor stats, returns Z-score/IQR
filter thresholds for automated anomaly detection.
- llm-feature-eng : Gemini confirms feature formulae; module generates
cyclical time features and a Stress Risk Score.
"""
from __future__ import annotations
from typing import Optional
import hashlib
import numpy as np
import pandas as pd
from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key
from src.time_features import add_cyclical_time_features
# ---------------------------------------------------------------------------
# Domain knowledge injected into Gemini prompts
# ---------------------------------------------------------------------------
# Per-column physical metadata for the Sde Boker vineyard sensors.
# Each entry records what the column measures, its unit, a plausible physical
# range, and free-text domain notes. analyze_anomalies() injects all of this
# into the Gemini prompt so proposed thresholds are physically grounded, and
# _fallback_thresholds() reuses "physical_range" when the API is unavailable.
SENSOR_CONTEXT = {
    "Air1_PAR_ref": {
        "description": "Photosynthetically Active Radiation (PAR)",
        "unit": "μmol photons m⁻² s⁻¹",
        "physical_range": [0, 2500],
        "notes": "Solar PAR at surface cannot exceed ~2200–2500 under any realistic sky. "
        "Values above 3000 are sensor artefacts.",
    },
    "Air1_leafTemperature_ref": {
        "description": "Leaf (canopy) temperature",
        "unit": "°C",
        "physical_range": [-5, 55],
        "notes": "Grape leaf temperature in the Negev can reach ~45°C on extreme days, "
        "but values above 55°C are physiologically impossible for a living leaf.",
    },
    "Air1_airTemperature_ref": {
        "description": "Air temperature near canopy",
        "unit": "°C",
        "physical_range": [0, 50],
        "notes": "Sde Boker record high is ~47°C. Values above 50°C or below 0°C "
        "during the growing season (May–Sep) are sensor faults.",
    },
    "Air1_VPD_ref": {
        "description": "Vapour Pressure Deficit",
        "unit": "kPa",
        "physical_range": [0, 7],
        "notes": "Desert VPD rarely exceeds 6–7 kPa even in extreme heat. "
        "Negative values and values above 8 kPa are sensor errors.",
    },
    "Air1_airHumidity_ref": {
        "description": "Relative Humidity",
        "unit": "%",
        "physical_range": [0, 100],
        "notes": "Must be in [0, 100]. Values outside this range are invalid.",
    },
    "Air1_CO2_ref": {
        "description": "CO₂ concentration (raw sensor, corrected ×0.7 by SensorDataLoader)",
        "unit": "ppm (raw)",
        "physical_range": [400, 4000],
        "notes": "Raw sensor reads ~30% too high (corrected ×0.7 in the data pipeline). "
        "Raw values above 4000 ppm or below 400 ppm are sensor artefacts. "
        "Post-correction (~280–2800 ppm) values above 2000 ppm indicate sensor drift.",
    },
}
# System prompt for the anomaly-threshold task (used by analyze_anomalies):
# instructs Gemini to answer with a strict JSON mapping of
# column name → {lower_bound, upper_bound, zscore_threshold, iqr_multiplier,
# rationale}, with no markdown fencing so _extract_json can parse it.
_SYSTEM_PROMPT_CLEANING = (
    "You are a precision-agriculture sensor data quality engineer. "
    "You are given descriptive statistics for sensor columns from a vineyard "
    "in the Negev desert, Israel (Sde Boker region, Semillon grapevine, May–September). "
    "Your task: for each column, propose anomaly filter thresholds to flag "
    "or remove invalid readings. "
    "Return ONLY a JSON object (no markdown, no explanation) with the following schema:\n"
    "{\n"
    ' "<column_name>": {\n'
    ' "lower_bound": <float or null>,\n'
    ' "upper_bound": <float or null>,\n'
    ' "zscore_threshold": <float>,\n'
    ' "iqr_multiplier": <float>,\n'
    ' "rationale": "<one sentence>"\n'
    " },\n"
    " ...\n"
    "}"
)

# System prompt for the feature-engineering task (used by get_feature_spec):
# asks Gemini for Stress Risk Score weights and clip bounds as strict JSON
# under the single key "stress_risk_score".
_SYSTEM_PROMPT_FEATURES = (
    "You are a precision-agriculture feature engineering expert specialising in "
    "grapevine physiology and agrivoltaic systems. "
    "Given the available sensor columns, propose the exact mathematical formulae "
    "for a Stress Risk Score that combines VPD and (optionally) CWSI. "
    "Return ONLY a JSON object (no markdown, no explanation) with schema:\n"
    "{\n"
    ' "stress_risk_score": {\n'
    ' "formula_description": "<one sentence>",\n'
    ' "vpd_weight": <float>,\n'
    ' "cwsi_weight": <float>,\n'
    ' "vpd_clip_max": <float>,\n'
    ' "cwsi_clip_max": <float>,\n'
    ' "rationale": "<one or two sentences on biological justification>"\n'
    " }\n"
    "}"
)
# ---------------------------------------------------------------------------
# Helper: robust JSON extraction from LLM response
# ---------------------------------------------------------------------------
def _extract_json(text: str) -> dict:
    """Parse a JSON object out of a raw LLM reply via the shared genai_utils helper."""
    parsed = extract_json_object(text)
    return parsed
# ---------------------------------------------------------------------------
# Main class
# ---------------------------------------------------------------------------
class LLMDataEngineer:
    """
    Gemini-assisted cleaning and feature engineering for sensor data.

    Typical usage::

        engineer = LLMDataEngineer()
        df_clean, thresholds, features_meta = engineer.run_pipeline(df)
    """

    def __init__(
        self,
        model_name: str = "gemini-2.5-flash",
        api_key: Optional[str] = None,
        verbose: bool = True,
    ):
        # Caller-supplied configuration.
        self.model_name = model_name
        self.verbose = verbose
        # Credentials and the lazily-created SDK client (built on first use).
        self._api_key = api_key
        self._client = None
        # Per-instance memoization keyed by content hash: identical inputs
        # reuse an earlier Gemini answer instead of re-querying the API.
        self._threshold_cache: dict[str, dict] = {}
        self._feature_spec_cache: dict[str, dict] = {}
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
    @property
    def api_key(self) -> str:
        """Return the effective Google API key.

        Resolution is delegated to get_google_api_key; presumably an explicit
        key passed to __init__ takes precedence over environment lookup —
        verify against src/genai_utils.
        """
        return get_google_api_key(self._api_key)
@property
def client(self):
if self._client is None:
self._client = get_genai_client(self._api_key)
return self._client
def _call_gemini(self, system_prompt: str, user_prompt: str) -> str:
"""Send a prompt to Gemini and return the raw text response."""
response = self.client.models.generate_content(
model=self.model_name,
contents=user_prompt,
config={"system_instruction": system_prompt},
)
return response.text
@staticmethod
def _hash_key(*parts: str) -> str:
"""Create a short hash from string parts for cache keying."""
return hashlib.md5("|".join(parts).encode()).hexdigest()[:12]
def _log(self, msg: str) -> None:
if self.verbose:
print(f"[LLMDataEngineer] {msg}")
# ------------------------------------------------------------------
# Step 1: Anomaly detection — ask Gemini for filter thresholds
# ------------------------------------------------------------------
    def analyze_anomalies(
        self,
        df: pd.DataFrame,
        columns: Optional[list[str]] = None,
    ) -> dict:
        """
        Send descriptive statistics to Gemini and receive per-column
        anomaly filter thresholds.

        Parameters
        ----------
        df : DataFrame with sensor measurements
        columns : subset of columns to analyze; defaults to SENSOR_CONTEXT keys

        Returns
        -------
        dict mapping column_name → {lower_bound, upper_bound,
        zscore_threshold, iqr_multiplier, rationale}

        Raises
        ------
        ValueError : if none of the requested/known columns exist in *df*.
        """
        # Keep only columns that are both requested (or known) and present.
        target_cols = [
            c for c in (columns or list(SENSOR_CONTEXT.keys())) if c in df.columns
        ]
        if not target_cols:
            raise ValueError("No recognized sensor columns found in DataFrame.")
        # Extended percentiles (1%/5%/95%/99%) expose tail behaviour to the LLM.
        stats = df[target_cols].describe(percentiles=[0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99])
        # Build prompt with stats + domain context
        lines = [
            "Analyze the following sensor columns from a vineyard dataset.",
            "For each column, the physical context and expected range are provided.",
            "",
        ]
        for col in target_cols:
            # Domain block (description/range/notes) only for known sensors.
            ctx = SENSOR_CONTEXT.get(col, {})
            lines.append(f"Column: {col}")
            if ctx:
                lines.append(f" Description : {ctx['description']} ({ctx['unit']})")
                lines.append(f" Expected range : {ctx['physical_range']}")
                lines.append(f" Domain notes : {ctx['notes']}")
            lines.append(" Observed statistics:")
            for stat_name, val in stats[col].items():
                # assumes all target columns are numeric so describe() yields
                # float stats — TODO confirm upstream loader guarantees this.
                lines.append(f" {stat_name:10s}: {val:.4f}")
            lines.append("")
        user_prompt = "\n".join(lines)
        # Check cache (same stats → same thresholds): the key is a hash of the
        # full prompt text, so identical data produces a cache hit.
        cache_key = self._hash_key(user_prompt)
        if cache_key in self._threshold_cache:
            self._log("Using cached anomaly thresholds (same data fingerprint).")
            return self._threshold_cache[cache_key]
        self._log("Querying Gemini for anomaly thresholds …")
        try:
            raw = self._call_gemini(_SYSTEM_PROMPT_CLEANING, user_prompt)
            thresholds = _extract_json(raw)
        except Exception as exc:
            # Any API/parsing failure degrades to physical-range defaults.
            self._log(f"Gemini API error: {exc}. Using statistical fallback.")
            thresholds = self._fallback_thresholds(df, target_cols)
        # NOTE(review): fallback thresholds are cached under the same key, so a
        # retry with identical data will not re-query Gemini.
        self._threshold_cache[cache_key] = thresholds
        self._log(f"Received thresholds for {len(thresholds)} columns.")
        return thresholds
@staticmethod
def _fallback_thresholds(df: pd.DataFrame, cols: list[str]) -> dict:
"""Conservative statistical fallback used when API is unavailable."""
result = {}
for col in cols:
ctx = SENSOR_CONTEXT.get(col, {})
phys = ctx.get("physical_range", [None, None])
result[col] = {
"lower_bound": phys[0],
"upper_bound": phys[1],
"zscore_threshold": 3.5,
"iqr_multiplier": 3.0,
"rationale": "Statistical fallback (Gemini unavailable).",
}
return result
# ------------------------------------------------------------------
# Step 2: Apply cleaning
# ------------------------------------------------------------------
def apply_cleaning(
self,
df: pd.DataFrame,
thresholds: dict,
strategy: str = "clip",
) -> pd.DataFrame:
"""
Apply Gemini-generated thresholds to clean the sensor DataFrame.
Parameters
----------
df : raw sensor DataFrame
thresholds : dict from analyze_anomalies()
strategy : 'clip' — clamp values to [lower_bound, upper_bound]
'drop' — drop rows where any column is out of bounds
'nan' — replace out-of-bounds values with NaN
Returns
-------
Cleaned DataFrame (copy).
"""
result = df.copy()
report_lines = ["Anomaly cleaning report:"]
for col, thresh in thresholds.items():
if col not in result.columns:
continue
series = result[col]
lower = thresh.get("lower_bound")
upper = thresh.get("upper_bound")
# Count violations before cleaning
mask_low = (series < lower) if lower is not None else pd.Series(False, index=series.index)
mask_high = (series > upper) if upper is not None else pd.Series(False, index=series.index)
# Z-score based detection (secondary flag)
z_thresh = thresh.get("zscore_threshold", 3.5)
z_scores = (series - series.mean()) / (series.std() + 1e-9)
mask_zscore = z_scores.abs() > z_thresh
# IQR-based detection (tertiary flag)
iqr_mult = thresh.get("iqr_multiplier", 3.0)
q1, q3 = series.quantile(0.25), series.quantile(0.75)
iqr = q3 - q1
mask_iqr = (series < q1 - iqr_mult * iqr) | (series > q3 + iqr_mult * iqr)
# Union of all anomaly flags
mask_anomaly = mask_low | mask_high | (mask_zscore & mask_iqr)
n_anomalies = int(mask_anomaly.sum())
if n_anomalies > 0:
report_lines.append(
f" {col}: {n_anomalies} anomalies ({n_anomalies / len(series) * 100:.2f}%)"
)
if strategy == "clip":
result[col] = series.clip(
lower=lower if lower is not None else -np.inf,
upper=upper if upper is not None else np.inf,
)
elif strategy == "nan":
result.loc[mask_anomaly, col] = np.nan
elif strategy == "drop":
result = result.loc[~mask_anomaly].copy()
else:
raise ValueError(f"Unknown strategy '{strategy}'. Use 'clip', 'nan', or 'drop'.")
self._log("\n".join(report_lines))
return result
# ------------------------------------------------------------------
# Step 3: Feature engineering
# ------------------------------------------------------------------
def get_feature_spec(
self,
available_cols: list[str],
) -> dict:
"""
Ask Gemini to confirm the Stress Risk Score formula given available columns.
Returns a feature spec dict with vpd_weight, cwsi_weight, etc.
Falls back to a biologically motivated default if API is unavailable.
"""
has_cwsi = any("cwsi" in c.lower() or "CWSI" in c for c in available_cols)
# Cache key: just depends on whether CWSI is available
cache_key = f"cwsi={has_cwsi}"
if cache_key in self._feature_spec_cache:
self._log("Using cached feature spec.")
return self._feature_spec_cache[cache_key]
user_prompt = (
f"Available sensor columns: {available_cols}.\n"
f"CWSI column available: {has_cwsi}.\n"
"Propose weights and clip bounds for a Stress Risk Score that linearly "
"combines normalised VPD and (if available) normalised CWSI. "
"The score should be in [0, 1] and reflect acute heat/drought stress "
"for Semillon grapevine in a desert agrivoltaic system."
)
self._log("Querying Gemini for Stress Risk Score formula …")
try:
raw = self._call_gemini(_SYSTEM_PROMPT_FEATURES, user_prompt)
spec = _extract_json(raw).get("stress_risk_score", {})
except Exception as exc:
self._log(f"Gemini API error: {exc}. Using default feature spec.")
spec = {}
# Merge with defaults so the dict is always complete
defaults = {
"formula_description": "Normalised weighted sum of VPD and CWSI stress signals",
"vpd_weight": 0.6,
"cwsi_weight": 0.4,
"vpd_clip_max": 6.0,
"cwsi_clip_max": 1.0,
"rationale": (
"VPD dominates stomatal response (weight 0.6); "
"CWSI captures cumulative water status (weight 0.4)."
),
}
for k, v in defaults.items():
spec.setdefault(k, v)
self._feature_spec_cache[cache_key] = spec
return spec
def engineer_features(
self,
df: pd.DataFrame,
timestamp_col: str = "time",
cwsi_col: Optional[str] = None,
vpd_col: str = "Air1_VPD_ref",
feature_spec: Optional[dict] = None,
) -> pd.DataFrame:
"""
Add engineered features to the sensor DataFrame.
New columns added
-----------------
hour_sin, hour_cos – cyclical encoding of hour-of-day
doy_sin, doy_cos – cyclical encoding of day-of-year
stress_risk_score – weighted VPD (+ CWSI) stress index in [0, 1]
Parameters
----------
df : sensor DataFrame (original unmodified)
timestamp_col : name of the datetime column (or index if not a column)
cwsi_col : optional CWSI column name; if None, stress score uses VPD only
vpd_col : VPD column name
feature_spec : pre-fetched spec from get_feature_spec(); fetched if None
Returns
-------
DataFrame copy with additional feature columns.
"""
result = df.copy()
# --- Cyclical time features (via shared utility) ---
ts_col = timestamp_col if timestamp_col in result.columns else None
use_index = ts_col is None and isinstance(result.index, pd.DatetimeIndex)
if ts_col is not None or use_index:
result = add_cyclical_time_features(
result,
timestamp_col=ts_col,
index_is_timestamp=use_index,
)
self._log("Added cyclical time features: hour_sin, hour_cos, doy_sin, doy_cos")
else:
self._log("Warning: no timestamp found; skipping cyclical features.")
# --- Stress Risk Score ---
if vpd_col in result.columns:
if feature_spec is None:
feature_spec = self.get_feature_spec(list(result.columns))
vpd_w = float(feature_spec.get("vpd_weight", 0.6))
cwsi_w = float(feature_spec.get("cwsi_weight", 0.4))
vpd_max = float(feature_spec.get("vpd_clip_max", 6.0))
cwsi_max = float(feature_spec.get("cwsi_clip_max", 1.0))
vpd_norm = (result[vpd_col].clip(0, vpd_max) / vpd_max).fillna(0.0)
if cwsi_col and cwsi_col in result.columns:
cwsi_norm = (result[cwsi_col].clip(0, cwsi_max) / cwsi_max).fillna(0.0)
effective_cwsi_w = cwsi_w
effective_vpd_w = vpd_w
else:
# No CWSI — redistribute weight entirely to VPD
cwsi_norm = pd.Series(0.0, index=result.index)
effective_cwsi_w = 0.0
effective_vpd_w = 1.0
score = (effective_vpd_w * vpd_norm + effective_cwsi_w * cwsi_norm).clip(0, 1)
result["stress_risk_score"] = score.round(4)
self._log(
f"Added stress_risk_score (vpd_weight={effective_vpd_w:.2f}, "
f"cwsi_weight={effective_cwsi_w:.2f})"
)
else:
self._log(f"Warning: VPD column '{vpd_col}' not found; skipping stress_risk_score.")
return result
# ------------------------------------------------------------------
# Full pipeline
# ------------------------------------------------------------------
def run_pipeline(
self,
df: pd.DataFrame,
cleaning_strategy: str = "clip",
timestamp_col: str = "time",
cwsi_col: Optional[str] = None,
vpd_col: str = "Air1_VPD_ref",
) -> tuple[pd.DataFrame, dict, dict]:
"""
Execute the full LLM data engineering pipeline.
Steps
-----
1. Gemini analyzes column stats → anomaly thresholds
2. Apply cleaning (clip / nan / drop)
3. Gemini confirms feature spec → engineer features
Returns
-------
(df_engineered, thresholds, feature_spec)
"""
self._log("=== LLM Data Engineering Pipeline ===")
# Step 1: anomaly thresholds
thresholds = self.analyze_anomalies(df)
# Step 2: clean
df_clean = self.apply_cleaning(df, thresholds, strategy=cleaning_strategy)
# Step 3: feature spec + engineering
feature_spec = self.get_feature_spec(list(df_clean.columns))
df_engineered = self.engineer_features(
df_clean,
timestamp_col=timestamp_col,
cwsi_col=cwsi_col,
vpd_col=vpd_col,
feature_spec=feature_spec,
)
new_cols = [c for c in df_engineered.columns if c not in df.columns]
self._log(f"Pipeline complete. New columns: {new_cols}")
return df_engineered, thresholds, feature_spec
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from pathlib import Path

    # Prefer the small sample CSV when present; otherwise use the full dataset.
    DATA_DIR = Path(__file__).resolve().parent.parent / "Data"
    sample_path = DATA_DIR / "Seymour" / "sensors_wide_sample.csv"
    sensors_path = DATA_DIR / "Seymour" / "sensors_wide.csv"
    if sample_path.exists():
        csv_path = sample_path
    else:
        csv_path = sensors_path
    print(f"Loading sensor data from: {csv_path.name}")
    df_raw = pd.read_csv(csv_path)
    print(f"Shape: {df_raw.shape} | Columns: {list(df_raw.columns)}\n")

    # Run the full clean → feature-engineer pipeline (verbose progress logs).
    engineer = LLMDataEngineer(verbose=True)
    df_out, thresh, feat_spec = engineer.run_pipeline(df_raw)

    print("\n--- Anomaly Thresholds (from Gemini) ---")
    for name, cfg in thresh.items():
        print(
            f" {name:35s} lower={cfg.get('lower_bound')} "
            f"upper={cfg.get('upper_bound')} "
            f"z={cfg.get('zscore_threshold')} "
            f"IQR×{cfg.get('iqr_multiplier')}"
        )
        print(f" → {cfg.get('rationale', '')}")

    print("\n--- Stress Risk Score Spec (from Gemini) ---")
    for key, value in feat_spec.items():
        print(f" {key}: {value}")

    print("\n--- Engineered DataFrame Head ---")
    eng_cols = ["time", "Air1_PAR_ref", "Air1_VPD_ref",
                "hour_sin", "hour_cos", "doy_sin", "doy_cos", "stress_risk_score"]
    show = [c for c in eng_cols if c in df_out.columns]
    print(df_out[show].head(6).to_string(index=False))