|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import json |
|
|
import random |
|
|
import tarfile |
|
|
import requests |
|
|
import datetime |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from ..utils import make_parent_folder |
|
|
from ...core.logging import logger |
|
|
from ...core.module_utils import load_json, save_json |
|
|
|
|
|
|
|
|
# Maps each supported AFlow benchmark dataset to the split filenames contained
# in the downloaded archive. A value of `None` means that split is not shipped
# (no train split is provided for any dataset). The extra "test_cases" entry
# (humaneval/mbpp only) names the public-test-case file for code evaluation.
AFLOW_DATASET_FILES_MAP = {
    "hotpotqa": {"train": None, "dev": "hotpotqa_validate.jsonl", "test": "hotpotqa_test.jsonl"},
    "humaneval": {"train": None, "dev": "humaneval_validate.jsonl", "test": "humaneval_test.jsonl", "test_cases": "humaneval_public_test.jsonl"},
    "mbpp": {"train": None, "dev": "mbpp_validate.jsonl", "test": "mbpp_test.jsonl", "test_cases": "mbpp_public_test.jsonl"},
    "gsm8k": {"train": None, "dev": "gsm8k_validate.jsonl", "test": "gsm8k_test.jsonl"},
    "math": {"train": None, "dev": "math_validate.jsonl", "test": "math_test.jsonl"},
}
|
|
|
|
|
def extract_tar_gz(filename: str, extract_path: str) -> None:
    """Extract a tar.gz file to the specified path.

    Args:
        filename: Path to the ``.tar.gz`` archive to extract.
        extract_path: Directory the archive contents are extracted into.
    """
    with tarfile.open(filename, "r:gz") as tar:
        try:
            # Use the "data" extraction filter to block path traversal and
            # other unsafe members (CVE-2007-4559); archives here come from a
            # remote download, i.e. untrusted input.
            tar.extractall(path=extract_path, filter="data")
        except TypeError:
            # Python < 3.12 does not accept the `filter` argument; fall back
            # to the original (unfiltered) behavior.
            tar.extractall(path=extract_path)
|
|
|
|
|
|
|
|
def download_aflow_benchmark_data(dataset: str, save_folder: str):
    """Download the AFlow benchmark archive and extract one (or all) dataset(s).

    Args:
        dataset: Name of a dataset in ``AFLOW_DATASET_FILES_MAP``
            (case-insensitive), or ``"all"`` to keep every dataset's files.
        save_folder: Destination folder for the extracted ``.jsonl`` files.

    Raises:
        ValueError: If ``dataset`` is not a supported dataset name.
        requests.HTTPError: If the download request fails.
    """
    candidate_datasets = list(AFLOW_DATASET_FILES_MAP.keys()) + ["all"]
    lower_candidate_datasets = [name.lower() for name in candidate_datasets]
    if dataset.lower() not in lower_candidate_datasets:
        raise ValueError(f"Invalid value for dataset: {dataset}. Available choices: {candidate_datasets}")
    # Normalize case once so the AFLOW_DATASET_FILES_MAP lookup and the
    # `dataset != "all"` check below cannot fail for inputs like "GSM8K"
    # that pass the case-insensitive validation above.
    dataset = dataset.lower()

    url = "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e"
    logger.info(f"Downloading AFlow benchmark data from {url} ...")
    aflow_data_save_file = os.path.join(save_folder, "aflow_data.tar.gz")

    make_parent_folder(aflow_data_save_file)
    # Stream the archive to disk in chunks so it never has to fit in memory;
    # the context manager guarantees the connection is released.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(aflow_data_save_file, "wb") as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    file.write(chunk)

    logger.info(f"Extracting data for {dataset} dataset(s) from {aflow_data_save_file} ...")
    extract_tar_gz(aflow_data_save_file, extract_path=save_folder)

    if dataset != "all":
        # Keep only the files that belong to the requested dataset.
        dataset_files = [file for file in AFLOW_DATASET_FILES_MAP[dataset].values() if file is not None]
        for file in os.listdir(save_folder):
            if file not in dataset_files:
                os.remove(os.path.join(save_folder, file))

    # The archive itself may already have been deleted by the cleanup loop.
    if os.path.exists(aflow_data_save_file):
        logger.info(f"Remove {aflow_data_save_file}")
        os.remove(aflow_data_save_file)
|
|
|
|
|
|
|
|
class DataUtils:
    """Utilities for loading, scoring, and sampling AFlow optimization rounds.

    Attributes:
        root_path: Base directory that holds per-round result folders.
        top_scores: Cache of ``{"round": int, "score": float}`` dicts sorted by
            score descending; (re)populated by :meth:`_load_scores`.
    """

    def __init__(self, root_path: str):
        self.root_path = root_path
        self.top_scores = []

    def load_results(self, path: str) -> list:
        """Load ``results.json`` from ``path``; return ``[]`` if it is missing or malformed."""
        result_path = os.path.join(path, "results.json")
        if not os.path.exists(result_path):
            return []
        with open(result_path, "r", encoding="utf-8") as json_file:
            try:
                return json.load(json_file)
            except json.JSONDecodeError:
                # Treat a corrupt/partial results file as "no results yet".
                return []

    def get_top_rounds(self, sample: int, path=None, mode="Graph"):
        """Return up to ``sample`` unique rounds, best score first.

        Round 0 (the baseline) is always included first when present, even if
        it is not among the top scorers.

        Args:
            sample: Maximum number of unique rounds to return.
            path: Results directory when ``mode != "Graph"``; ignored otherwise.
            mode: ``"Graph"`` reads from ``self.root_path``, anything else from ``path``.
        """
        self._load_scores(path, mode)
        unique_rounds = set()
        unique_top_scores = []

        # Seed with the baseline round (round 0) if it exists.
        first_round = next((item for item in self.top_scores if item["round"] == 0), None)
        if first_round:
            unique_top_scores.append(first_round)
            unique_rounds.add(0)

        # top_scores is sorted best-first, so this picks the highest scorers.
        for item in self.top_scores:
            if item["round"] not in unique_rounds:
                unique_top_scores.append(item)
                unique_rounds.add(item["round"])

            if len(unique_top_scores) >= sample:
                break

        return unique_top_scores

    def select_round(self, items):
        """Randomly select one round dict from ``items``, biased toward higher scores.

        Args:
            items: Non-empty list of dicts each containing a ``"score"`` key.

        Returns:
            The selected item (from the score-descending sort of ``items``).

        Raises:
            ValueError: If ``items`` is empty.
        """
        if not items:
            raise ValueError("Item list is empty.")

        sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
        # Scale scores to percentages before computing selection probabilities.
        scores = [item["score"] * 100 for item in sorted_items]

        probabilities = self._compute_probabilities(scores)
        logger.info(f"\nMixed probability distribution: {probabilities}")
        logger.info(f"\nSorted rounds: {sorted_items}")

        selected_index = np.random.choice(len(sorted_items), p=probabilities)
        logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")

        return sorted_items[selected_index]

    def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
        """Mix a uniform distribution with a softmax over ``scores``.

        Args:
            scores: Sequence of numeric scores.
            alpha: Softmax temperature factor (larger = sharper preference).
            lambda_: Weight of the uniform component (exploration share).

        Returns:
            A NumPy array of probabilities summing to 1.

        Raises:
            ValueError: If ``scores`` is empty or the softmax weights all underflow to 0.
        """
        scores = np.array(scores, dtype=np.float64)
        n = len(scores)

        if n == 0:
            raise ValueError("Score list is empty.")

        uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)

        # Shift by the max score for numerical stability before exponentiating.
        max_score = np.max(scores)
        shifted_scores = scores - max_score
        exp_weights = np.exp(alpha * shifted_scores)

        sum_exp_weights = np.sum(exp_weights)
        if sum_exp_weights == 0:
            raise ValueError("Sum of exponential weights is 0, cannot normalize.")

        score_prob = exp_weights / sum_exp_weights

        # Blend exploration (uniform) with exploitation (softmax of scores).
        mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob

        # Renormalize to guard against floating-point drift.
        total_prob = np.sum(mixed_prob)
        if not np.isclose(total_prob, 1.0):
            mixed_prob = mixed_prob / total_prob

        return mixed_prob

    def load_log(self, cur_round, path=None, mode: str = "Graph"):
        """Return up to 3 randomly sampled log entries for a round as pretty-printed JSON.

        Args:
            cur_round: Round number whose ``log.json`` is read (``mode == "Graph"``).
            path: Explicit log-file path when ``mode != "Graph"``.
            mode: Source selection; see above.

        Returns:
            A string of JSON-dumped samples separated by blank lines, or ``""``
            when the log file is missing or empty.
        """
        if mode == "Graph":
            log_dir = os.path.join(self.root_path, f"round_{cur_round}", "log.json")
        else:
            log_dir = path

        if not os.path.exists(log_dir):
            return ""
        logger.info(log_dir)

        data = load_json(log_dir, type="json")

        # Normalize to a list so a single dict entry is sampled the same way.
        if isinstance(data, dict):
            data = [data]
        elif not isinstance(data, list):
            data = list(data)

        if not data:
            return ""

        sample_size = min(3, len(data))
        random_samples = random.sample(data, sample_size)

        log = ""
        for sample in random_samples:
            log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"

        return log

    def get_results_file_path(self, graph_path: str) -> str:
        """Return the path of ``results.json`` inside ``graph_path``."""
        return os.path.join(graph_path, "results.json")

    def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
        """Build a result record for one round, timestamped with the current local time."""
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}

    def save_results(self, json_file_path: str, data: list):
        """Persist ``data`` to ``json_file_path`` as indented JSON."""
        save_json(data, json_file_path, type="json", use_indent=True)

    def _load_scores(self, path=None, mode="Graph"):
        """(Re)build ``self.top_scores`` from a ``results.json`` file.

        Computes the mean score per round and sorts rounds best-first.

        Args:
            path: Results directory when ``mode != "Graph"``.
            mode: ``"Graph"`` reads from ``self.root_path``, anything else from ``path``.

        Returns:
            The freshly built ``self.top_scores`` list (empty when no results exist).
        """
        if mode == "Graph":
            rounds_dir = self.root_path
        else:
            rounds_dir = path

        result_file = os.path.join(rounds_dir, "results.json")
        self.top_scores = []

        # Guard: with no results file yet there is nothing to score.
        if not os.path.exists(result_file):
            return self.top_scores

        data = load_json(result_file, type="json")
        # Guard: an empty result list would make the groupby below raise.
        if not data:
            return self.top_scores

        df = pd.DataFrame(data)

        # Average the score of every evaluation run within each round.
        scores_per_round = df.groupby("round")["score"].mean().to_dict()

        for round_number, average_score in scores_per_round.items():
            self.top_scores.append({"round": round_number, "score": average_score})

        self.top_scores.sort(key=lambda x: x["score"], reverse=True)

        return self.top_scores
|
|
|
|
|
|
|
|
def test_case_2_test_function(solution: str, test_case: str, entry_point: str):
    """Assemble executable Python source that checks `solution` with `test_case`.

    The returned string concatenates the candidate `solution`, a `check`
    function whose body is `test_case`, and a `test_check()` driver that calls
    `check(entry_point)` and runs it immediately when the string is executed.

    NOTE(review): the template's whitespace is significant because the result
    is meant to be executed as Python — `solution` must be top-level code and
    `test_case` must already carry the indentation of `check`'s body
    (presumably assert statements indented by 4 spaces); confirm with callers.
    """
    tester_function = f"""
{solution}


def check(candidate):
{test_case}

def test_check():
    check({entry_point})

test_check()
"""
    return tester_function
|
|
|