| import argparse |
| import json |
| import re |
| import os |
| import unicodedata |
| from typing import Tuple, List |
| from multiprocessing import Pool |
|
|
| import fasttext |
| import pandas as pd |
| from tqdm import tqdm |
| from transformers import LlamaTokenizerFast |
|
|
|
|
# Maps each supported --language code to the fastText classifier file used
# for that language (paths are relative to the working directory).
language_model_map = dict(
    en="classifiers/ultra_fineweb_en.bin",
    zh="classifiers/ultra_fineweb_zh.bin",
)
|
|
def parse_args():
    """Parse the command-line options of the inference script.

    Returns:
        argparse.Namespace: Parsed options. ``language``, ``data_path``,
            ``save_path`` and ``content_key`` are required strings;
            ``tokenizer_path``, ``processes_num`` and ``write_batch_size``
            have defaults; ``inplace`` is a boolean flag.
    """
    required_text = {"type": str, "required": True}

    # (flag, add_argument keyword options), registered in this order so the
    # generated --help output keeps the same layout.
    option_table = (
        ("--language", {**required_text, "help": "Inference language, support: en, zh."}),
        ("--data-path", {**required_text, "help": "Data path."}),
        ("--save-path", {**required_text, "help": "Save path root."}),
        ("--content-key", {**required_text, "help": "Content key for inference."}),
        ("--tokenizer-path", {"type": str, "default": "local_tokenizer", "help": "Tokenizer path."}),
        ("--processes-num", {"type": int, "default": 64, "help": "Number of processes."}),
        ("--write-batch-size", {"type": int, "default": 100, "help": "Write batch size."}),
        ("--inplace", {"action": "store_true", "help": "Inplace already processed data."}),
    )

    cli = argparse.ArgumentParser()
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
|
|
|
|
def fasttext_preprocess_func(content: str, tokenizer: LlamaTokenizerFast) -> str:
    """Normalize raw text into the single-line, space-separated form used for inference.

    Args:
        content (str): Raw text to normalize.
        tokenizer (LlamaTokenizerFast): Tokenizer used to split the text. Every
            token id is decoded on its own and the decoded pieces are joined
            with single spaces.

    Returns:
        str: Normalized text: lowercased, combining marks removed, tokens
            separated by spaces, and newline / carriage return / tab
            characters written as the two-character sequences ``\\n``,
            ``\\r`` and ``\\t``.
    """
    # 1. Squeeze any run of three or more newlines down to exactly two.
    squeezed = re.sub(r'\n{3,}', '\n\n', content)

    # 2. Lowercase, then decompose (NFKD) and drop non-spacing marks ("Mn"),
    #    which removes accents from the decomposed characters.
    decomposed = unicodedata.normalize('NFKD', squeezed.lower())
    unaccented = ''.join(filter(lambda ch: unicodedata.category(ch) != 'Mn', decomposed))

    # 3. Tokenize, decode each token id individually, and join with spaces.
    pieces = [
        tokenizer.decode([token_id])
        for token_id in tokenizer.encode(unaccented, add_special_tokens=False)
    ]
    normalized = ' '.join(pieces)

    # 4. Write control whitespace as literal escape sequences so the result
    #    contains no real line breaks or tabs, then collapse repeated spaces.
    for pattern, replacement in ((r'\n', '\\\\n'), (r'\r', '\\\\r'), (r'\t', '\\\\t')):
        normalized = re.sub(pattern, replacement, normalized)
    return re.sub(r' +', ' ', normalized).strip()
|
|
|
|
def fasttext_infer(norm_content: str, fasttext_model: fasttext.FastText) -> Tuple[str, float]:
    """Classify normalized text and report a positive-class score.

    Args:
        norm_content (str): Text to classify, passed unchanged to
            ``fasttext_model.predict``.
        fasttext_model (fasttext.FastText): Model whose ``predict`` method
            returns a sequence of labels and an array-like of probabilities.

    Returns:
        Tuple[str, float]: The first predicted label and its score. The score
            is the first predicted probability capped at 1; when the label is
            ``__label__neg`` it is flipped to ``1 - probability``.
    """
    labels, probabilities = fasttext_model.predict(norm_content)
    top_label = labels[0]

    # Cap at 1 so that the flipped negative score below can never go negative.
    capped = min(probabilities.tolist()[0], 1)
    score = 1 - capped if top_label == "__label__neg" else capped

    return top_label, score
|
|
|
|
def _is_usable_text(value) -> bool:
    """Return True if value should be kept; print the reason and return False otherwise."""
    if value == "":
        print("Empty text, continue")
        return False
    if value is None:
        print("None text, continue")
        return False
    return True


def load_data(file_path: str, content_key: str) -> List[str]:
    """Load the values stored under content_key from a data file.

    ``.jsonl`` and ``.json`` files are both read as JSON Lines (one JSON
    document per line); ``.parquet`` files are read with pandas. Records that
    lack ``content_key``, or whose value is an empty string or ``None``, are
    skipped.

    Args:
        file_path (str): File path.
        content_key (str): Content key.

    Returns:
        List[str]: List of content, in file order.

    Raises:
        ValueError: If the file extension is not .jsonl, .json or .parquet.
    """
    samples = []
    if file_path.endswith((".jsonl", ".json")):
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines (typically a trailing one) are not JSON
                    # documents; json.loads("") would raise.
                    continue
                data = json.loads(line)
                if content_key in data and _is_usable_text(data[content_key]):
                    samples.append(data[content_key])
    elif file_path.endswith(".parquet"):
        df = pd.read_parquet(file_path)
        # The column either exists for every row or for none, so check once
        # and scan the column directly instead of materializing each row.
        if content_key in df.columns:
            samples.extend(value for value in df[content_key] if _is_usable_text(value))
    else:
        raise ValueError(f"Unsupported file type: {file_path}")
    return samples
|
|
|
|
def _append_jsonl(output_file: str, records: list) -> None:
    """Append records to output_file, one JSON document per line."""
    with open(output_file, "a", encoding="utf-8") as f:
        f.writelines(json.dumps(record, ensure_ascii=False) + "\n" for record in records)


def process_file(
        file_path: str,
        tokenizer_path: str,
        fasttext_model_path: str,
        save_path: str,
        item: int,
        content_key: str,
        inplace: bool,
        write_batch_size: int) -> None:
    """Process a single file and save the samples classified as positive.

    Every text loaded from ``file_path`` is normalized and classified; samples
    labelled ``__label__pos`` are appended to
    ``<save_path>/<input name without extension>_fasttext_pos.jsonl`` as
    objects with the keys ``content``, ``pred_label`` and ``pred_score``.
    No output file is created when no sample is positive.

    Args:
        file_path (str): File path to process.
        tokenizer_path (str): Tokenizer path.
        fasttext_model_path (str): Fasttext model path.
        save_path (str): Save path.
        item (int): Current process item index (only used in log output).
        content_key (str): Content key.
        inplace (bool): If True and the output file already exists, skip this
            file; if False, an existing output file is removed and rebuilt.
        write_batch_size (int): Write batch size.
    """
    # Resolve the output path first: the skip decision depends only on it.
    curr_file_name = os.path.splitext(os.path.basename(file_path))[0]
    output_file = os.path.join(save_path, f"{curr_file_name}_fasttext_pos.jsonl")

    # Skip before loading the tokenizer, model and data, so that resuming a
    # mostly finished run does not pay the load cost for every finished file.
    if inplace and os.path.exists(output_file):
        print(f"File {output_file} already exists, skip")
        return

    tokenizer = LlamaTokenizerFast.from_pretrained(tokenizer_path)
    fasttext_model = fasttext.load_model(fasttext_model_path)
    all_texts = load_data(file_path, content_key)

    # Remove stale output only after loading succeeded; results are appended
    # in batches below, so leftovers would otherwise be mixed into this run.
    if os.path.exists(output_file):
        print(f"File {output_file} already exists, remove it")
        os.remove(output_file)

    results = []
    print(f"ID: {item}, Begin to process {file_path}, total {len(all_texts)} samples, results will be saved in {output_file}")
    for text in tqdm(all_texts):
        norm_content = fasttext_preprocess_func(text, tokenizer)
        label, score = fasttext_infer(norm_content, fasttext_model)

        # Only positive samples are kept.
        if label == "__label__pos":
            results.append({"content": text, "pred_label": label, "pred_score": score})

        # Flush periodically to bound the number of buffered records.
        if len(results) >= write_batch_size:
            _append_jsonl(output_file, results)
            results.clear()

    # Flush whatever is left of the final partial batch.
    if results:
        _append_jsonl(output_file, results)
|
|
|
|
def main():
    """Run fastText inference over every supported file under --data-path.

    Validates the inputs, collects the ``.jsonl`` / ``.json`` / ``.parquet``
    files to process (``--data-path`` may be a directory or a single file),
    and processes them in a worker pool, one file per task.

    Raises:
        FileNotFoundError: If the data path, tokenizer path or fastText model
            file does not exist.
        ValueError: If the requested language is not supported.
    """
    args = parse_args()
    language = args.language
    data_path = args.data_path
    save_path = args.save_path
    content_key = args.content_key
    tokenizer_path = args.tokenizer_path
    processes_num = args.processes_num
    write_batch_size = args.write_batch_size
    inplace = args.inplace

    # Explicit raises instead of assert: assertions are stripped under -O.
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data path {data_path} not exists")
    if not os.path.exists(tokenizer_path):
        raise FileNotFoundError(f"Tokenizer path {tokenizer_path} not exists")
    if language not in language_model_map:
        raise ValueError(f"Language {language} not supported")
    fasttext_model_path = language_model_map[language]
    # Fail here rather than separately inside every worker process.
    if not os.path.exists(fasttext_model_path):
        raise FileNotFoundError(f"Fasttext model path {fasttext_model_path} not exists")

    os.makedirs(save_path, exist_ok=True)

    if os.path.isfile(data_path):
        candidates = [data_path]
    else:
        candidates = [os.path.join(data_path, file_name) for file_name in sorted(os.listdir(data_path))]

    # Keep only regular files with a supported extension: one unsupported
    # entry would raise inside a worker and abort the whole starmap call.
    supported_suffixes = (".jsonl", ".json", ".parquet")
    data_path_list = []
    for candidate in candidates:
        if os.path.isfile(candidate) and candidate.endswith(supported_suffixes):
            data_path_list.append(candidate)
        else:
            print(f"Skip unsupported path: {candidate}")

    print("=" * 100)
    print(f"Begin processing\n"
          f"- data path: {data_path}\n"
          f"- save path: {save_path}\n"
          f"- content key: {content_key}\n"
          f"- tokenizer path: {tokenizer_path}\n"
          f"- processes num: {processes_num}\n"
          f"- write batch size: {write_batch_size}\n"
          f"- inplace: {inplace}")
    print("=" * 100)

    print(f"Total {len(data_path_list)} files to process")
    if not data_path_list:
        print("Finished processing all files")
        return

    # Never start more workers than there are files to hand out.
    worker_num = min(processes_num, len(data_path_list))
    with Pool(processes=worker_num) as pool:
        pool.starmap(process_file, [(
            file_path, tokenizer_path, fasttext_model_path, save_path, item, content_key, inplace, write_batch_size)
            for item, file_path in enumerate(data_path_list)])

    print("Finished processing all files")
|
|
# Entry-point guard: run the pipeline only when executed as a script. It also
# keeps multiprocessing workers that re-import this module (spawn start
# method) from running main() again.
if __name__ == "__main__":
    main()