| | import torch |
| | from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score |
| | from sklearn.model_selection import KFold |
| | from sklearn.preprocessing import StandardScaler |
| | from tqdm import tqdm |
| | import numpy as np |
| | import os |
| | from datetime import datetime |
| | from pathlib import Path |
| |
|
| | |
| | from torch.amp.autocast_mode import autocast |
| |
|
| | ROOT = Path(__file__).parent.parent.resolve().__str__() |
| | LOG_ROOT = Path(ROOT + "/" + "logs_hyperparameter") |
| | if not os.path.exists(LOG_ROOT): |
| | os.makedirs(LOG_ROOT, exist_ok=False) |
| |
|
| |
|
| | def setup_log_file(model_name, rep_name, dataset_name): |
| | from pathlib import Path |
| | from datetime import datetime |
| |
|
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | fname = f"{model_name}_{rep_name}_{dataset_name}_{timestamp}.txt" |
| | parent = Path(__file__).parent.parent.resolve().__str__() |
| | log_dir = Path(parent + "/" + "logs_hyperparameter") |
| | if not os.path.exists(log_dir): |
| | os.makedirs(LOG_ROOT, exist_ok=False) |
| |
|
| | log_path = log_dir / fname |
| | print(f"[Logging] Writing to: {log_path}") |
| | return open(log_path, "w") |
| |
|
| |
|
| | def write_log(log_file, text): |
| | print(text) |
| | log_file.write(text + "\n") |
| | log_file.flush() |
| |
|
| |
|
| | def train_gnn_model(model, loader, optimizer, log_file, loss_fn=torch.nn.MSELoss()): |
| | total_loss = 0 |
| | for _ in range(20): |
| | model.train() |
| | total_loss = 0 |
| | for batch in loader: |
| | batch = batch.to(next(model.parameters()).device) |
| | optimizer.zero_grad() |
| | out = model(batch).squeeze() |
| | loss = loss_fn(out, batch.y) |
| | loss.backward() |
| | optimizer.step() |
| | total_loss += loss.item() |
| | avg_loss = total_loss / len(loader) |
| | write_log(log_file, f"GNN Train Loss: {avg_loss:.4f}") |
| | return avg_loss |
| |
|
| |
|
| | def eval_gnn_model(model, loader, log_file, scaler, return_preds=False): |
| | model.eval() |
| | y_true, y_pred = [], [] |
| | with torch.no_grad(): |
| | for batch in tqdm(loader, desc="Evaluating GNN"): |
| | batch = batch.to(next(model.parameters()).device) |
| | out = model(batch).view(-1) |
| | y_true.append(batch.y.cpu()) |
| | y_pred.append(out.cpu()) |
| | y_true = scaler.inverse_transform(torch.cat(y_true).numpy().reshape(-1, 1)).ravel() |
| | y_pred = scaler.inverse_transform(torch.cat(y_pred).numpy().reshape(-1, 1)).ravel() |
| | metrics = report_metrics(y_true, y_pred, log_file) |
| | if return_preds: |
| | return metrics, y_true, y_pred |
| | return metrics |
| |
|
| |
|
| | def train_gp_model(gp_model, X_train, y_train, log_file): |
| | write_log(log_file, "Training GP model...") |
| | gp_model.fit(X_train, y_train) |
| | return gp_model |
| |
|
| |
|
| | def eval_gp_model(gp_model, X_test, y_test, log_file, scaler, return_preds=False): |
| | write_log(log_file, "Evaluating GP model...") |
| | y_pred, _ = gp_model.predict(X_test) |
| | y_test = scaler.inverse_transform(y_test.reshape(-1, 1)).ravel() |
| | y_pred = scaler.inverse_transform(y_pred.reshape(-1, 1)).ravel() |
| | metrics = report_metrics(y_test, y_pred, log_file) |
| | if return_preds: |
| | return metrics, y_test, y_pred |
| | return metrics |
| |
|
| |
|
| | def report_metrics(y_true, y_pred, log_file): |
| | rmse = np.sqrt(mean_squared_error(y_true, y_pred)) |
| | mae = mean_absolute_error(y_true, y_pred) |
| | r2 = r2_score(y_true, y_pred) |
| | write_log(log_file, f"RMSE: {rmse:.4f}, MAE: {mae:.4f}, R2: {r2:.4f}") |
| | return {"rmse": rmse, "mae": mae, "r2": r2} |
| |
|
| |
|
| | def bootstrap_ci(arr, n_boot=1000, ci=95): |
| | boot_means = [ |
| | np.mean(np.random.choice(arr, size=len(arr), replace=True)) |
| | for _ in range(n_boot) |
| | ] |
| | lower = np.percentile(boot_means, (100 - ci) / 2) |
| | upper = np.percentile(boot_means, 100 - (100 - ci) / 2) |
| | return np.mean(boot_means), (lower, upper) |
| |
|
| |
|
| | def bootstrap_metric_ci(metric_fn, y_true, y_pred, n_boot=1000, ci=95, rng=None): |
| | rng = np.random.default_rng(rng) |
| | y_true = np.asarray(y_true) |
| | y_pred = np.asarray(y_pred) |
| | n = len(y_true) |
| | boot_vals = [] |
| | for _ in range(n_boot): |
| | idx = rng.choice(n, n, replace=True) |
| | boot_vals.append(metric_fn(y_true[idx], y_pred[idx])) |
| | mean_val = np.mean(boot_vals) |
| | lo, hi = np.percentile(boot_vals, [(100 - ci) / 2, 100 - (100 - ci) / 2]) |
| | return mean_val, (lo, hi) |
| |
|
| |
|
| | def train_polyatomic( |
| | model, loader, optimizer, loss_fn, scaler_grad, device, scheduler, accum_steps=1 |
| | ): |
| | """ |
| | custom training loop for polyatomic GNNs |
| | uses mixed precision training with autocast |
| | accum_steps allows gradient accumulation for larger effective batch size |
| | this was designed for GPU training, but here is in CPU mode |
| | """ |
| | use_amp = torch.cuda.is_available() |
| | for _ in range(20): |
| | model.train() |
| | total_loss = 0.0 |
| | optimizer.zero_grad() |
| |
|
| | for i, batch in enumerate(loader): |
| | batch = batch.to(device) |
| | batch.x = batch.x.float() |
| | batch.edge_attr = batch.edge_attr.float() |
| | batch.graph_feats = batch.graph_feats.float() |
| | batch.y = batch.y.float() |
| |
|
| | if use_amp: |
| | with autocast(device_type="cuda", dtype=torch.float16): |
| | output = model(batch) |
| | loss = loss_fn(output, batch.y.view(-1)) / accum_steps |
| | else: |
| | output = model(batch) |
| | loss = loss_fn(output, batch.y.view(-1)) / accum_steps |
| |
|
| | if use_amp: |
| | scaler_grad.scale(loss).backward() |
| | else: |
| | loss.backward() |
| |
|
| | torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) |
| |
|
| | if (i + 1) % accum_steps == 0 or (i + 1 == len(loader)): |
| | if use_amp: |
| | scaler_grad.step(optimizer) |
| | scaler_grad.update() |
| | else: |
| | optimizer.step() |
| |
|
| | optimizer.zero_grad() |
| |
|
| | total_loss += loss.item() * batch.num_graphs * accum_steps |
| |
|
| | avg_loss = total_loss / len(loader.dataset) |
| | if scheduler is not None: |
| | scheduler.step(avg_loss) |
| |
|
| | return model |
| |
|
| |
|
| | def evaluate_polyatomic(model, loader, device, log_file, scaler, return_preds=False): |
| | """ |
| | uses autocast for mixed precision evaluation |
| | this was designed for GPU training, but here is in CPU mode |
| | """ |
| | model.eval() |
| | preds, trues = [], [] |
| | with torch.no_grad(), autocast( |
| | device_type="cpu", dtype=torch.float16 |
| | ): |
| | for batch in loader: |
| | batch = batch.to(device) |
| | out = model(batch) |
| | preds.append(out.view(-1)) |
| | trues.append(batch.y.view(-1)) |
| | y_pred = torch.cat(preds) |
| | y_test = torch.cat(trues) |
| | y_pred = scaler.inverse_transform(y_pred.numpy().reshape(-1, 1)).ravel() |
| | y_test = scaler.inverse_transform(y_test.numpy().reshape(-1, 1)).ravel() |
| | metrics = report_metrics(y_test, y_pred, log_file) |
| | if return_preds: |
| | return metrics, y_test, y_pred |
| | return metrics |
| |
|
| |
|
| | def k_fold_eval( |
| | train_fn, |
| | eval_fn, |
| | X_train, |
| | y_train, |
| | model_name, |
| | rep_name, |
| | dataset_name, |
| | X_test, |
| | y_test, |
| | k=5, |
| | seed=42, |
| | log_file=None, |
| | ): |
| | log_file = setup_log_file(model_name, rep_name, dataset_name) |
| | write_log(log_file, f"Experiment: {model_name}+{rep_name} on {dataset_name}") |
| |
|
| | kf = KFold(n_splits=k, shuffle=True, random_state=seed) |
| | fold_metrics, fold_models = [], [] |
| |
|
| | for fold, (tr_idx, val_idx) in enumerate(kf.split(X_train)): |
| | write_log(log_file, f"\nFOLD {fold+1}/{k}") |
| | X_tr, X_val = X_train[tr_idx], X_train[val_idx] |
| | y_tr, y_val = y_train[tr_idx], y_train[val_idx] |
| |
|
| | scaler = StandardScaler() |
| | y_tr_s = scaler.fit_transform(y_tr.reshape(-1, 1)).ravel() |
| | y_val_s = scaler.transform(y_val.reshape(-1, 1)).ravel() |
| |
|
| | model = train_fn(X_tr, y_tr_s, log_file) |
| | m = eval_fn(model, X_val, y_val_s, log_file, scaler) |
| | fold_metrics.append(m) |
| | fold_models.append((model, m["rmse"])) |
| |
|
| | write_log(log_file, "\n====== K-FOLD SUMMARY ======") |
| | for key in fold_metrics[0]: |
| | vals = [m[key] for m in fold_metrics] |
| | mean, (lo, hi) = bootstrap_ci(vals) |
| | write_log(log_file, f"{key.upper()}: {mean:.4f} [{lo:.4f}, {hi:.4f}]") |
| |
|
| | best_idx = np.argmin([rm for (_, rm) in fold_models]) |
| | best_model = fold_models[best_idx][0] |
| | write_log(log_file, f"\n★ Using fold {best_idx+1} model for test inference") |
| |
|
| | test_scaler = StandardScaler().fit(y_train.reshape(-1, 1)) |
| | y_test_s = test_scaler.transform(y_test.reshape(-1, 1)).ravel() |
| | test_metrics, y_true_test, y_pred_test = eval_fn( |
| | best_model, X_test, y_test_s, log_file, test_scaler, return_preds=True |
| | ) |
| |
|
| | write_log(log_file, f"\n====== HELD-OUT TEST METRICS ======\n{test_metrics}") |
| | rmse_mean, (rmse_lo, rmse_hi) = bootstrap_metric_ci( |
| | lambda a, b: np.sqrt(mean_squared_error(a, b)), y_true_test, y_pred_test |
| | ) |
| | mae_mean, (mae_lo, mae_hi) = bootstrap_metric_ci( |
| | mean_absolute_error, y_true_test, y_pred_test |
| | ) |
| | r2_mean, (r2_lo, r2_hi) = bootstrap_metric_ci(r2_score, y_true_test, y_pred_test) |
| | write_log( |
| | log_file, |
| | f"Test RMSE: {rmse_mean:.4f} (95 % CI: {rmse_lo:.4f}–{rmse_hi:.4f})\n", |
| | ) |
| | write_log( |
| | log_file, |
| | f"Test MAE : {mae_mean :.4f} (95 % CI: {mae_lo :.4f}–{mae_hi :.4f})\n", |
| | ) |
| | write_log( |
| | log_file, |
| | f"Test R² : {r2_mean :.4f} (95 % CI: {r2_lo :.4f}–{r2_hi :.4f})\n", |
| | ) |
| | log_file.close() |
| | return fold_metrics, test_metrics |
| |
|