File size: 5,065 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
Preprocessor: merge IMS with Stage 1 labels, time features, temporal split,
and scaling (fitted on train only). No sensor data in features.
"""

import warnings
from typing import Optional

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

from src.time_features import add_cyclical_time_features


class Preprocessor:
    """
    Prepare IMS weather data and labels (column "A") for model training.

    Provides four steps: merging IMS rows with labels on timestamp, adding
    time features, a chronological train/test split, and standard scaling
    whose statistics are fitted on the training set only.
    """

    def __init__(self, train_ratio: Optional[float] = None):
        """
        Create a preprocessor with an unfitted ``StandardScaler``.

        Args:
            train_ratio: Fraction of rows that ``temporal_split`` assigns to
                the training set. When None, ``settings.TRAIN_RATIO`` from
                the project's ``config`` module is used.
        """
        if train_ratio is None:
            # Only touch the project config when the default is actually
            # needed, so an explicit ratio works without it.
            from config import settings
            train_ratio = settings.TRAIN_RATIO
        self.train_ratio = train_ratio
        self.scaler = StandardScaler()
        self._fitted = False
        # Feature column names chosen by the most recent temporal_split call.
        self._feature_cols: Optional[list[str]] = None

    def merge_ims_with_labels(
        self,
        ims_df: pd.DataFrame,
        labels: pd.Series,
        timestamp_col_ims: str = "timestamp_utc",
        timestamp_index_labels: bool = True,
    ) -> pd.DataFrame:
        """
        Inner join IMS rows and labels on timestamp.

        The labels' index supplies the timestamps and their values end up in
        a column named "A". The result keeps the columns of ``ims_df`` (in
        their original order) followed by "A".

        Args:
            ims_df: IMS frame containing ``timestamp_col_ims``.
            labels: Series whose index holds the timestamps to join on.
            timestamp_col_ims: Name of the timestamp column in ``ims_df``.
            timestamp_index_labels: Must be True; position-based alignment
                is not supported.

        Returns:
            The merged frame, or an empty ``pd.DataFrame()`` when either
            input is empty or ``timestamp_col_ims`` is missing.

        Raises:
            ValueError: If ``timestamp_index_labels`` is False (and the
                inputs are non-empty with the timestamp column present).

        Note:
            If ``ims_df`` already has a column named "A", pandas' default
            merge suffixes apply and the result has "A_x"/"A_y" instead.
        """
        if ims_df.empty or labels.empty:
            return pd.DataFrame()
        if timestamp_col_ims not in ims_df.columns:
            return pd.DataFrame()
        if not timestamp_index_labels:
            raise ValueError(
                "merge_ims_with_labels: labels must have a datetime index "
                "(use timestamp_index_labels=True). Position-based alignment "
                "is not supported because it silently produces incorrect joins "
                "when IMS and label row counts differ."
            )

        # Join through a temporary key that cannot collide with any IMS
        # column. Relying on the labels' own index name breaks when that
        # name is "A" (reset_index raises) or matches a non-key IMS column
        # (merge would suffix both columns with _x/_y).
        ts_key = "__label_timestamp__"
        while ts_key in ims_df.columns:
            ts_key += "_"

        lab = labels.copy()
        lab.name = "A"
        # get_level_values(0) is the index itself for a flat index and the
        # first level for a MultiIndex.
        lab.index = lab.index.get_level_values(0).rename(ts_key)
        lab = lab.reset_index()

        merged = ims_df.merge(
            lab, left_on=timestamp_col_ims, right_on=ts_key, how="inner"
        )
        return merged.drop(columns=[ts_key])

    def create_time_features(
        self,
        df: pd.DataFrame,
        timestamp_col: str = "timestamp_utc",
    ) -> pd.DataFrame:
        """
        Add cyclical + raw time features for ML models.

        Args:
            df: Frame expected to contain ``timestamp_col``.
            timestamp_col: Column parsed as UTC timestamps.

        Returns:
            ``df`` itself, unchanged, when ``timestamp_col`` is missing.
            Otherwise the frame returned by ``add_cyclical_time_features``
            with integer "month" and "day_of_year" columns added.
        """
        if timestamp_col not in df.columns:
            return df
        # NOTE(review): whether the helper copies df or modifies it in place
        # is not visible from this file - confirm in src.time_features.
        out = add_cyclical_time_features(df, timestamp_col=timestamp_col)
        ts = pd.to_datetime(out[timestamp_col], utc=True)
        # Raw integers (good for tree-based models that can split on thresholds)
        out["month"] = ts.dt.month
        out["day_of_year"] = ts.dt.dayofyear
        return out

    @staticmethod
    def _sort_chronologically(
        df: pd.DataFrame,
        timestamp_col: str = "timestamp_utc",
    ) -> pd.DataFrame:
        """Return df ordered by timestamp_col (stable); unchanged if impossible."""
        if timestamp_col not in df.columns:
            return df
        try:
            # Stable sort: rows that are already in order, and rows sharing a
            # timestamp, keep their relative positions.
            return df.sort_values(
                timestamp_col,
                kind="mergesort",
                key=lambda col: pd.to_datetime(col, utc=True),
            )
        except (ValueError, TypeError) as err:
            # Best effort: fall back to the caller's row order rather than
            # failing a split that worked before, but say so.
            warnings.warn(
                f"temporal_split: could not order rows by {timestamp_col!r} "
                f"({err}); using the existing row order.",
                RuntimeWarning,
                stacklevel=3,
            )
            return df

    def temporal_split(
        self,
        df: pd.DataFrame,
        target_col: str = "A",
        feature_cols: Optional[list[str]] = None,
    ) -> tuple[pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
        """
        Split by time: the earliest train_ratio of rows for train, rest for test.

        Rows with NaN in any feature or in the target are dropped first. When
        a "timestamp_utc" column is present the remaining rows are ordered by
        it before splitting, so the test set never precedes the training set
        even if the input was not sorted. Without that column the incoming
        row order is used.

        Args:
            df: Frame holding features and ``target_col``.
            target_col: Name of the target column.
            feature_cols: Feature columns to use. If None, every numeric
                column except the target, "timestamp_utc", "time" and
                "source" is used.

        Returns:
            ``(X_train, y_train, X_test, y_test)``. All four are empty when
            ``df`` is empty or lacks ``target_col``. When the ratio leaves no
            rows for one side, all rows are returned as the training set and
            the test set is empty (keeping the feature columns).
        """
        if df.empty or target_col not in df.columns:
            return (
                pd.DataFrame(), pd.Series(dtype=float),
                pd.DataFrame(), pd.Series(dtype=float),
            )
        if feature_cols is None:
            exclude = {target_col, "timestamp_utc", "time", "source"}
            feature_cols = [
                c for c in df.select_dtypes(include=[np.number]).columns
                if c not in exclude
            ]
        else:
            # Copy so any sequence type works and later changes to the
            # caller's list do not alter the stored feature names.
            feature_cols = list(feature_cols)
        self._feature_cols = feature_cols

        # Drop rows with NaN in features or target
        df = df.dropna(subset=feature_cols + [target_col])
        df = self._sort_chronologically(df).reset_index(drop=True)

        X = df[feature_cols].copy()
        y = df[target_col]
        n_train = int(len(df) * self.train_ratio)
        if n_train <= 0 or n_train >= len(df):
            # Degenerate split: everything is training data. Empty slices
            # keep the column names and dtypes for downstream code.
            return X, y, X.iloc[0:0], y.iloc[0:0]
        return (
            X.iloc[:n_train], y.iloc[:n_train],
            X.iloc[n_train:], y.iloc[n_train:],
        )

    def fit_transform_train(self, X_train: pd.DataFrame) -> pd.DataFrame:
        """
        Fit scaler on X_train and return scaled X_train.

        Args:
            X_train: Training features.

        Returns:
            A new frame with the same index and columns as ``X_train``
            holding the scaled values.
        """
        self.scaler.fit(X_train)
        # Only flagged as fitted once fit() has returned without raising.
        self._fitted = True
        return pd.DataFrame(
            self.scaler.transform(X_train),
            index=X_train.index,
            columns=X_train.columns,
        )

    def transform_test(self, X_test: pd.DataFrame) -> pd.DataFrame:
        """
        Transform X_test with fitted scaler.

        Args:
            X_test: Test features.

        Returns:
            A new scaled frame with the same index and columns. ``X_test``
            itself is returned when it is empty (nothing to scale), or -
            with a ``RuntimeWarning`` - when ``fit_transform_train`` has not
            been called yet, in which case the data is NOT scaled.
        """
        if X_test.empty:
            # An empty test set is a valid outcome of temporal_split.
            return X_test
        if not self._fitted:
            warnings.warn(
                "transform_test called before fit_transform_train; "
                "returning X_test unscaled.",
                RuntimeWarning,
                stacklevel=2,
            )
            return X_test
        return pd.DataFrame(
            self.scaler.transform(X_test),
            index=X_test.index,
            columns=X_test.columns,
        )