| """ |
| Preprocessor: merge IMS with Stage 1 labels, time features, temporal split, |
| and scaling (fitted on train only). No sensor data in features. |
| """ |
|
|
| from typing import Optional |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.preprocessing import StandardScaler |
|
|
| from src.time_features import add_cyclical_time_features |
|
|
|
|
class Preprocessor:
    """
    Prepare IMS weather data and Stage 1 labels ("A") for model training.

    Workflow: inner-join IMS rows with the label series on timestamp, add
    time features, split chronologically into train/test, and standardize
    the features with a scaler that is fitted on the training part only.
    """

    def __init__(self, train_ratio: Optional[float] = None):
        """
        Args:
            train_ratio: Fraction of rows (earliest first) assigned to the
                training set. When None, ``config.settings.TRAIN_RATIO``
                is used.
        """
        if train_ratio is None:
            # Imported lazily so that an explicit ratio does not require the
            # project configuration module to be importable.
            from config import settings

            train_ratio = settings.TRAIN_RATIO
        self.train_ratio = train_ratio
        self.scaler = StandardScaler()
        # True once fit_transform_train has fitted the scaler.
        self._fitted = False
        # Feature columns used by the most recent temporal_split call.
        self._feature_cols: Optional[list[str]] = None

    def merge_ims_with_labels(
        self,
        ims_df: pd.DataFrame,
        labels: pd.Series,
        timestamp_col_ims: str = "timestamp_utc",
        timestamp_index_labels: bool = True,
    ) -> pd.DataFrame:
        """
        Inner-join IMS rows with labels on timestamp.

        The label values are added as column "A"; the join key on the label
        side is the (first level of the) index of ``labels``.

        Args:
            ims_df: IMS data containing ``timestamp_col_ims``.
            labels: Label series indexed by timestamp.
            timestamp_col_ims: Name of the timestamp column in ``ims_df``.
            timestamp_index_labels: Must be True; position-based alignment
                is not supported.

        Returns:
            The columns of ``ims_df`` plus "A", restricted to timestamps
            present on both sides. An empty DataFrame if either input is
            empty or ``timestamp_col_ims`` is missing from ``ims_df``.

        Raises:
            ValueError: If ``timestamp_index_labels`` is False.

        Note:
            If ``ims_df`` already has a column named "A", pandas applies its
            default merge suffixes and the result has "A_x"/"A_y" instead.
        """
        if ims_df.empty or labels.empty:
            return pd.DataFrame()
        if timestamp_col_ims not in ims_df.columns:
            return pd.DataFrame()
        if not timestamp_index_labels:
            raise ValueError(
                "merge_ims_with_labels: labels must have a datetime index "
                "(use timestamp_index_labels=True). Position-based alignment "
                "is not supported because it silently produces incorrect joins "
                "when IMS and label row counts differ."
            )

        # Build the label frame with the key already named like the IMS
        # timestamp column. Going through reset_index() would expose the
        # index under its own name ("index" when unnamed), which can collide
        # with an IMS column and make merge() add _x/_y suffixes.
        label_frame = labels.rename("A").reset_index(drop=True).to_frame()
        label_frame.insert(0, timestamp_col_ims, labels.index.get_level_values(0))
        return ims_df.merge(label_frame, on=timestamp_col_ims, how="inner")

    def create_time_features(
        self,
        df: pd.DataFrame,
        timestamp_col: str = "timestamp_utc",
    ) -> pd.DataFrame:
        """
        Add time features for ML models.

        Delegates to ``add_cyclical_time_features`` and then adds the raw
        integer columns "month" and "day_of_year", both derived from
        ``timestamp_col`` parsed as UTC.

        Args:
            df: Input frame.
            timestamp_col: Name of the timestamp column.

        Returns:
            The frame returned by ``add_cyclical_time_features`` with the two
            extra columns, or ``df`` itself (unchanged) when ``timestamp_col``
            is not one of its columns.
        """
        if timestamp_col not in df.columns:
            return df
        out = add_cyclical_time_features(df, timestamp_col=timestamp_col)
        ts = pd.to_datetime(out[timestamp_col], utc=True)
        out["month"] = ts.dt.month
        out["day_of_year"] = ts.dt.dayofyear
        return out

    def temporal_split(
        self,
        df: pd.DataFrame,
        target_col: str = "A",
        feature_cols: Optional[list[str]] = None,
        timestamp_col: str = "timestamp_utc",
    ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
        """
        Split chronologically: the first ``train_ratio`` of rows go to train,
        the remainder to test.

        Rows are ordered by ``timestamp_col`` (when present) before
        splitting, then rows with a missing feature or target value are
        dropped and the index is reset to 0..n-1.

        Args:
            df: Merged frame containing features and ``target_col``.
            target_col: Name of the target column.
            feature_cols: Feature columns to use. When None, every numeric
                column except the target, the timestamp column, "time" and
                "source" is used.
            timestamp_col: Column used to order rows in time.

        Returns:
            ``(X_train, y_train, X_test, y_test)``. All four are empty when
            ``df`` is empty or lacks ``target_col``. When the computed train
            size is 0 or covers every row, all rows are returned as the
            training set and the test parts are empty.
        """
        if df.empty or target_col not in df.columns:
            return (
                pd.DataFrame(), pd.Series(dtype=float),
                pd.DataFrame(), pd.Series(dtype=float),
            )

        if feature_cols is None:
            excluded = {target_col, timestamp_col, "timestamp_utc", "time", "source"}
            feature_cols = [
                col for col in df.select_dtypes(include=[np.number]).columns
                if col not in excluded
            ]
        else:
            # Accept any sequence of names, not only lists.
            feature_cols = list(feature_cols)
        self._feature_cols = feature_cols

        if timestamp_col in df.columns:
            # A positional split is only a temporal split if rows are in
            # time order; a stable sort leaves already-ordered input as is.
            df = df.sort_values(timestamp_col, kind="mergesort")

        df = df.dropna(subset=feature_cols + [target_col]).reset_index(drop=True)
        X = df[feature_cols].copy()
        y = df[target_col]

        n_train = int(len(df) * self.train_ratio)
        if n_train <= 0 or n_train >= len(df):
            # Degenerate ratio / too few rows: no test partition.
            return X, y, pd.DataFrame(), pd.Series(dtype=float)
        return X.iloc[:n_train], y.iloc[:n_train], X.iloc[n_train:], y.iloc[n_train:]

    def fit_transform_train(self, X_train: pd.DataFrame) -> pd.DataFrame:
        """
        Fit the scaler on ``X_train`` and return the scaled training frame.

        An empty frame (as produced by ``temporal_split`` on empty input) is
        returned as a copy without fitting, so the scaler stays unfitted.

        Args:
            X_train: Training features.

        Returns:
            Scaled features with the same index and columns as ``X_train``.
        """
        if X_train.empty:
            return X_train.copy()
        scaled = self.scaler.fit_transform(X_train)
        self._fitted = True
        return pd.DataFrame(scaled, index=X_train.index, columns=X_train.columns)

    def transform_test(self, X_test: pd.DataFrame) -> pd.DataFrame:
        """
        Scale ``X_test`` with the scaler fitted on the training set.

        Args:
            X_test: Test features.

        Returns:
            Scaled features with the same index and columns as ``X_test``.
            ``X_test`` itself is returned unscaled when the scaler has not
            been fitted; an empty frame is returned as a copy.
        """
        if not self._fitted:
            return X_test
        if X_test.empty:
            # temporal_split yields an empty, column-less test frame when the
            # split is degenerate; the scaler cannot transform that.
            return X_test.copy()
        return pd.DataFrame(
            self.scaler.transform(X_test),
            index=X_test.index,
            columns=X_test.columns,
        )
|
|