api / src /forecasting /preprocessor.py
Eli Safra
Deploy SolarWine API (FastAPI + Docker, port 7860)
938949f
"""
Preprocessor: merge IMS with Stage 1 labels, time features, temporal split,
and scaling (fitted on train only). No sensor data in features.
"""
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from src.time_features import add_cyclical_time_features
class Preprocessor:
    """
    Merge IMS weather with labels (A), create time features, perform a
    temporal train/test split, and scale features using the training set only.

    Typical call order:
        merge_ims_with_labels -> create_time_features -> temporal_split
        -> fit_transform_train -> transform_test
    """

    def __init__(self, train_ratio: Optional[float] = None):
        """
        Args:
            train_ratio: Fraction of rows (after NaN removal) assigned to the
                training set. When None, ``settings.TRAIN_RATIO`` from the
                project config is used.
        """
        # Imported inside the constructor rather than at module level.
        from config import settings

        self.train_ratio = settings.TRAIN_RATIO if train_ratio is None else train_ratio
        self.scaler = StandardScaler()
        # Set to True once fit_transform_train has fitted the scaler.
        self._fitted = False
        # Feature columns chosen by the most recent temporal_split call.
        self._feature_cols: Optional[list[str]] = None

    def merge_ims_with_labels(
        self,
        ims_df: pd.DataFrame,
        labels: pd.Series,
        timestamp_col_ims: str = "timestamp_utc",
        timestamp_index_labels: bool = True,
    ) -> pd.DataFrame:
        """
        Inner join IMS rows and labels on timestamp.

        The labels' index (first level, if it is a MultiIndex) is matched
        against ``ims_df[timestamp_col_ims]``; label values are added as
        column ``"A"``.

        Args:
            ims_df: IMS weather frame containing ``timestamp_col_ims``.
            labels: Label values indexed by timestamp.
            timestamp_col_ims: Name of the timestamp column in ``ims_df``.
            timestamp_index_labels: Must be True; position-based alignment
                is not supported.

        Returns:
            The IMS columns plus ``"A"`` for timestamps present on both
            sides, or an empty DataFrame when either input is empty or
            ``timestamp_col_ims`` is missing from ``ims_df``.

        Raises:
            ValueError: If ``timestamp_index_labels`` is False (and the
                inputs were not already rejected as empty/missing column).
        """
        if ims_df.empty or labels.empty:
            return pd.DataFrame()
        if timestamp_col_ims not in ims_df.columns:
            return pd.DataFrame()
        if not timestamp_index_labels:
            raise ValueError(
                "merge_ims_with_labels: labels must have a datetime index "
                "(use timestamp_index_labels=True). Position-based alignment "
                "is not supported because it silently produces incorrect joins "
                "when IMS and label row counts differ."
            )

        # Join on a private key name that cannot collide with any IMS column.
        # Relying on the index's own name breaks when that name (e.g. "time"
        # or the default "index") also exists in ims_df: pandas then suffixes
        # both columns (_x/_y) and the helper key is never removed.
        join_key = "__label_timestamp__"
        while join_key in ims_df.columns:
            join_key += "_"

        label_frame = labels.rename("A").reset_index(drop=True).to_frame()
        # An Index is assigned positionally, so values stay aligned with "A".
        label_frame[join_key] = labels.index.get_level_values(0)

        merged = ims_df.merge(
            label_frame,
            left_on=timestamp_col_ims,
            right_on=join_key,
            how="inner",
        )
        return merged.drop(columns=[join_key])

    def create_time_features(
        self,
        df: pd.DataFrame,
        timestamp_col: str = "timestamp_utc",
    ) -> pd.DataFrame:
        """
        Add cyclical + raw time features for ML models.

        Returns ``df`` unchanged when ``timestamp_col`` is not a column.
        Otherwise returns the result of ``add_cyclical_time_features`` with
        integer ``month`` and ``day_of_year`` columns added.
        """
        if timestamp_col not in df.columns:
            return df
        # NOTE(review): whether the caller's frame is mutated depends on
        # add_cyclical_time_features returning a copy -- confirm.
        out = add_cyclical_time_features(df, timestamp_col=timestamp_col)
        ts = pd.to_datetime(out[timestamp_col], utc=True)
        # Raw integers (good for tree-based models that can split on thresholds)
        out["month"] = ts.dt.month
        out["day_of_year"] = ts.dt.dayofyear
        return out

    def temporal_split(
        self,
        df: pd.DataFrame,
        target_col: str = "A",
        feature_cols: Optional[list[str]] = None,
    ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
        """
        Split by time: first train_ratio of the rows for train, rest for test.

        Rows with NaN in any feature or in the target are dropped first. If
        ``df`` has a datetime-typed ``timestamp_utc`` column the remaining
        rows are ordered by it (stable sort), so the training set always
        precedes the test set in time even when the input is unordered.

        Args:
            df: Merged frame containing features and ``target_col``.
            target_col: Name of the target column.
            feature_cols: Columns to use as features. If None, all numeric
                columns except the target, ``timestamp_utc``, ``time`` and
                ``source`` are used.

        Returns:
            ``(X_train, y_train, X_test, y_test)``. All four are empty when
            ``df`` is empty or lacks ``target_col``. When the split point
            falls at either end of the data, every row is returned as
            training data and the test parts are empty.
        """
        if df.empty or target_col not in df.columns:
            return (
                pd.DataFrame(), pd.Series(dtype=float),
                pd.DataFrame(), pd.Series(dtype=float),
            )

        if feature_cols is None:
            exclude = {target_col, "timestamp_utc", "time", "source"}
            feature_cols = [
                c for c in df.select_dtypes(include=[np.number]).columns
                if c not in exclude
            ]
        else:
            # Copy so any sequence type works and the stored list is not
            # aliased to the caller's object.
            feature_cols = list(feature_cols)
        self._feature_cols = feature_cols

        # Drop rows with NaN in features or target
        df = df.dropna(subset=feature_cols + [target_col])

        # The split below is positional, so rows must be in chronological
        # order. Only sort genuine datetime columns: ordering string
        # timestamps lexicographically could scramble a correct order.
        if "timestamp_utc" in df.columns and pd.api.types.is_datetime64_any_dtype(
            df["timestamp_utc"]
        ):
            df = df.sort_values("timestamp_utc", kind="mergesort")
        df = df.reset_index(drop=True)

        X = df[feature_cols].copy()
        y = df[target_col]
        n = int(len(df) * self.train_ratio)
        if n <= 0 or n >= len(df):
            return X, y, pd.DataFrame(), pd.Series(dtype=float)
        return X.iloc[:n], y.iloc[:n], X.iloc[n:], y.iloc[n:]

    def fit_transform_train(self, X_train: pd.DataFrame) -> pd.DataFrame:
        """
        Fit scaler on X_train and return scaled X_train.

        An empty ``X_train`` (as produced by ``temporal_split`` for empty
        input) is returned as an unscaled copy and the scaler is left
        unfitted, instead of raising inside the scaler.
        """
        if X_train.empty:
            return X_train.copy()
        self.scaler.fit(X_train)
        self._fitted = True
        return pd.DataFrame(
            self.scaler.transform(X_train),
            index=X_train.index,
            columns=X_train.columns,
        )

    def transform_test(self, X_test: pd.DataFrame) -> pd.DataFrame:
        """
        Transform X_test with fitted scaler.

        ``X_test`` is returned unchanged when the scaler has not been fitted
        or when ``X_test`` is empty (``temporal_split`` returns an empty test
        frame when all rows go to training).
        """
        if not self._fitted or X_test.empty:
            return X_test
        return pd.DataFrame(
            self.scaler.transform(X_test),
            index=X_test.index,
            columns=X_test.columns,
        )