from tokenizers import Tokenizer
import torch
import numpy as np
import time
import os
from datetime import datetime


def process_string_into_pairs(input_str: str) -> list[str]:
    """Split *input_str* into a list of 1- or 2-character chunks.

    Consecutive lowercase ASCII letters are grouped into pairs. A lone
    lowercase letter immediately followed by a space consumes the space
    (the space is dropped). Every other character becomes its own chunk.
    """
    result: list[str] = []
    i = 0
    n = len(input_str)
    while i < n:
        char = input_str[i]
        # Is the current character a lowercase ASCII letter?
        if "a" <= char <= "z":
            # Pair case: the next character exists and is also lowercase.
            if i + 1 < n and "a" <= input_str[i + 1] <= "z":
                result.append(char + input_str[i + 1])
                i += 2  # consume both letters
            # Special case: lone lowercase letter followed by a space —
            # keep the letter, silently drop the space.
            elif i + 1 < n and input_str[i + 1] == " ":
                result.append(char)
                i += 2  # consume the letter and the trailing space
            # Lone lowercase letter followed by anything else (or at end).
            else:
                result.append(char)
                i += 1
        # Non-lowercase character: emit as-is.
        else:
            result.append(char)
            i += 1
    return result


def get_mask_from_string(input_str: str, tokenizer) -> torch.Tensor:
    """Convert *input_str* into a 1-D LongTensor of token ids.

    Each all-ASCII chunk produced by ``process_string_into_pairs`` is
    wrapped as a ``<|mask_XX|>`` token; non-ASCII chunks (e.g. CJK
    characters) are looked up verbatim.

    Raises:
        ValueError: if any resulting token is absent from the tokenizer
            vocabulary (``token_to_id`` returns ``None`` for OOV tokens,
            which would otherwise surface as an opaque TypeError from
            ``torch.tensor``).
    """
    pairs = process_string_into_pairs(input_str)
    masks = [
        f"<|mask_{pair}|>" if all(ord(c) < 128 for c in pair) else pair
        for pair in pairs
    ]
    ids = []
    for mask in masks:
        token_id = tokenizer.token_to_id(mask)
        if token_id is None:
            raise ValueError(f"Token not in tokenizer vocabulary: {mask!r}")
        ids.append(token_id)
    return torch.tensor(ids, dtype=torch.long)


def inference(model, input_str: str, tokenizer, device, threshold=0.9):
    """Iteratively unmask *input_str* with a diffusion-style LM.

    Every round, each still-masked position whose top-1 probability
    exceeds *threshold* is replaced with its prediction; if no position
    clears the threshold, the single most confident one is unmasked so
    the loop always terminates. Prints the partially-decoded string
    after each round (masked positions are shown as the ``XX`` payload
    of their ``<|mask_XX|>`` token).
    """
    model.eval()

    # Initialize NgramHashMapping when the model carries an engram config.
    # NOTE(review): hash_mapping is constructed but never referenced below —
    # presumably NgramHashMapping.__init__ has a needed side effect, or this
    # is leftover scaffolding; confirm before removing.
    engram_cfg = model.config.engram_config
    hash_mapping = None
    if engram_cfg is not None:
        from modeling_llada_engram import ModelConfig, EngramConfig, NgramHashMapping
        from dataclasses import fields

        # Prepare ModelConfig for NgramHashMapping.
        backbone_config_dict = model.config.to_dict()
        # Keep only the keys ModelConfig actually declares (it usually
        # matches the LLaDA config fields, but filter defensively).
        model_config_fields = {f.name for f in fields(ModelConfig)}
        backbone_config = ModelConfig(
            **{k: v for k, v in backbone_config_dict.items() if k in model_config_fields}
        )
        hash_mapping = NgramHashMapping(
            engram_vocab_size=engram_cfg.get('engram_vocab_size', [129280 * 5, 129280 * 5]),
            max_ngram_size=engram_cfg.get('max_ngram_size', 3),
            n_embed_per_ngram=engram_cfg.get('n_embed_per_ngram', 512),
            n_head_per_ngram=engram_cfg.get('n_head_per_ngram', 8),
            layer_ids=engram_cfg.get('layer_ids', [1, 15]),
            pad_id=engram_cfg.get('pad_id', 2),
            seed=engram_cfg.get('seed', 0),
            config=backbone_config,
        )

    with torch.no_grad():
        mask_tensor = get_mask_from_string(input_str, tokenizer).unsqueeze(0).to(device)
        # is_masked = torch.ones(mask_tensor.shape, dtype=torch.bool, device=device)
        # Mask token ids are assumed to lie at or above the base <|mask|> id.
        is_masked = mask_tensor >= tokenizer.token_to_id("<|mask|>")
        rounds = 0
        while is_masked.any():
            rounds += 1
            output = model(input_ids=mask_tensor)[0]
            # Logits -> probabilities.
            output = torch.softmax(output, dim=-1)

            unmasked_any = False
            prob_info = []
            most_certain_token = (0, 0, 0)  # (probability, index, token_id)

            # Check each position that is still masked.
            for i in range(mask_tensor.shape[1]):
                if is_masked[0, i]:
                    # Greedy prediction for this position.
                    predicted_token = output[0, i].argmax().item()
                    prob_info.append(
                        f"{output[0, i, predicted_token].item():.2f} {tokenizer.id_to_token(predicted_token)}"
                    )
                    # Tuple comparison: probability first, so max() tracks
                    # the single most confident masked position.
                    most_certain_token = max(
                        most_certain_token,
                        (output[0, i, predicted_token].item(), i, predicted_token),
                    )
                    # Above threshold: commit the prediction.
                    if output[0, i, predicted_token].item() > threshold:
                        mask_tensor[0, i] = predicted_token
                        is_masked[0, i] = False
                        unmasked_any = True
                else:
                    prob_info.append("")

            if not unmasked_any:
                # Nothing cleared the threshold: force-unmask the most
                # certain position so the loop makes progress.
                mask_tensor[0, most_certain_token[1]] = most_certain_token[2]
                is_masked[0, most_certain_token[1]] = False

            # Render decoded tokens verbatim; still-masked tokens show the
            # payload of "<|mask_XX|>" (strip 7 leading / 2 trailing chars).
            masked_str = "".join(
                (
                    tokenizer.id_to_token(mask_tensor[0, i].item())
                    if not is_masked[0, i]
                    else tokenizer.id_to_token(mask_tensor[0, i].item())[7:-2]
                )
                for i in range(mask_tensor.shape[1])
            )
            print(masked_str)


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = Tokenizer.from_file("tokenizer.json")

    # Load from local directory using AutoModel.
    # Note: Ensure you have transformers installed and trust_remote_code=True.
    try:
        from transformers import AutoModelForCausalLM
        model = AutoModelForCausalLM.from_pretrained(".", trust_remote_code=True).to(device)
    except Exception as e:
        print(f"Failed to load with AutoModel: {e}")
        print("Falling back to manual loading (if needed, but prefer AutoModel for validation)")
        # Fallback code removed for clarity as we want to enforce AutoModel structure.
        # Bare raise preserves the original traceback (vs `raise e`).
        raise

    # To bfloat16 on GPU; plain float32 on CPU.
    model = model.to(torch.bfloat16) if device.type == "cuda" else model.float()
    print("Loaded model. Parameters:", sum(p.numel() for p in model.parameters()))

    threshold = 0.9
    while True:
        input_str = input("Enter a string to process: ")
        inference(model, input_str, tokenizer, device, threshold=threshold)
        print("")  # blank line as a separator

# Input example: nhkzotdgjvdmleunkmiekz。
# Output: 黄河是中华民族的母亲河。

# Input example: mdflswsyelfl,eyxxmdswsyelfl,raxxmdelfl,otfixdzhfnjrugfoirmbisunswsyelfl。zhldxxdgun“mdfl”uvelflqhnvxtmdunkmpbofvjcjnnmdunsoirpbucheel。
# Output: 大型语言模型,也称大语言模型,简称大模型,是一种基于人工神经网络的语言模型。其名称中的“大型”指模型具有庞大的参数量以及巨大的训练数据规模。

# Input example: hgzz(Go o g l e )otfiwjpmrnxjuchkaf,hdidjifngmrnsdoovsoggn.
# Output:
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城.
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。

# Input example: jxvuygvbotghtusvwtvbdt。auwvvbotcbghwhtkshdl?
# Output:
# 天对地,雨对风。大陆对长空。山lj对ke树,赤日对ljeb。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨tq晚霞红。
# 天对地,雨对风。大陆对长空。山lj对杂树,赤日对苍eb。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# 天对地,雨对风。大陆对长空。山lj对杂树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# 天对地,雨对风。大陆对长空。山苍对杂树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# (Expected Output: 天对地,雨对风。大陆对长空。山花对海树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨霁晚霞红。)