""" Preprocessor: merge IMS with Stage 1 labels, time features, temporal split, and scaling (fitted on train only). No sensor data in features. """ from typing import Optional import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler from src.time_features import add_cyclical_time_features class Preprocessor: """ Merge IMS weather with labels (A), create time features, temporal train/test split, and scale features using training set only. """ def __init__(self, train_ratio: Optional[float] = None): from config import settings self.train_ratio = settings.TRAIN_RATIO if train_ratio is None else train_ratio self.scaler = StandardScaler() self._fitted = False self._feature_cols: Optional[list[str]] = None def merge_ims_with_labels( self, ims_df: pd.DataFrame, labels: pd.Series, timestamp_col_ims: str = "timestamp_utc", timestamp_index_labels: bool = True, ) -> pd.DataFrame: """ Inner join IMS and labels on timestamp. labels can be Series with datetime index or a column; if timestamp_index_labels, labels.index is used as timestamp. """ if ims_df.empty or labels.empty: return pd.DataFrame() if timestamp_col_ims not in ims_df.columns: return pd.DataFrame() if timestamp_index_labels: lab = labels.copy() lab.name = "A" lab = lab.reset_index() ts_lab = lab.columns[0] merged = ims_df.merge(lab, left_on=timestamp_col_ims, right_on=ts_lab, how="inner") if ts_lab != timestamp_col_ims and ts_lab in merged.columns: merged = merged.drop(columns=[ts_lab]) if "timestamp_utc" not in merged.columns and ts_lab == timestamp_col_ims and ts_lab in merged.columns: merged = merged.rename(columns={ts_lab: timestamp_col_ims}) else: raise ValueError( "merge_ims_with_labels: labels must have a datetime index " "(use timestamp_index_labels=True). Position-based alignment " "is not supported because it silently produces incorrect joins " "when IMS and label row counts differ." ) return merged def create_time_features( self, df: pd.DataFrame, timestamp_col: str = "timestamp_utc", ) -> pd.DataFrame: """Add cyclical + raw time features for ML models.""" if timestamp_col not in df.columns: return df out = add_cyclical_time_features(df, timestamp_col=timestamp_col) ts = pd.to_datetime(out[timestamp_col], utc=True) # Raw integers (good for tree-based models that can split on thresholds) out["month"] = ts.dt.month out["day_of_year"] = ts.dt.dayofyear return out def temporal_split( self, df: pd.DataFrame, target_col: str = "A", feature_cols: Optional[list[str]] = None, ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]: """ Split by time: first train_ratio for train, rest for test. Returns (X_train, y_train, X_test, y_test). If feature_cols is None, use IMS numeric columns + time features (exclude timestamp and target). """ if df.empty or target_col not in df.columns: return ( pd.DataFrame(), pd.Series(dtype=float), pd.DataFrame(), pd.Series(dtype=float), ) exclude = {target_col, "timestamp_utc", "time", "source"} if feature_cols is None: feature_cols = [ c for c in df.select_dtypes(include=[np.number]).columns if c not in exclude ] self._feature_cols = feature_cols # Drop rows with NaN in features or target subset = feature_cols + [target_col] df = df.dropna(subset=subset).reset_index(drop=True) X = df[feature_cols].copy() y = df[target_col] n = int(len(df) * self.train_ratio) if n <= 0 or n >= len(df): return X, y, pd.DataFrame(), pd.Series(dtype=float) X_train, X_test = X.iloc[:n], X.iloc[n:] y_train, y_test = y.iloc[:n], y.iloc[n:] return X_train, y_train, X_test, y_test def fit_transform_train(self, X_train: pd.DataFrame) -> pd.DataFrame: """Fit scaler on X_train and return scaled X_train.""" self.scaler.fit(X_train) self._fitted = True return pd.DataFrame( self.scaler.transform(X_train), index=X_train.index, columns=X_train.columns, ) def transform_test(self, X_test: pd.DataFrame) -> pd.DataFrame: """Transform X_test with fitted scaler.""" if not self._fitted: return X_test return pd.DataFrame( self.scaler.transform(X_test), index=X_test.index, columns=X_test.columns, )