# Epsilonoid's picture
# Init
# db56acd verified
from tokenizers import Tokenizer
import torch
import numpy as np
import time
import os
from datetime import datetime
def process_string_into_pairs(input_str: str) -> list[str]:
    """Split *input_str* into chunks suitable for mask-token lookup.

    Rules, applied left to right:
      * two consecutive lowercase ASCII letters form one two-letter chunk;
      * a lowercase letter followed by a space becomes a single-letter chunk
        and the space is consumed (the "lone letter + space" special case);
      * any other character (including a trailing lone letter) is its own chunk.
    """

    def _is_lower_ascii(ch: str) -> bool:
        # Exactly the range a-z; CJK and uppercase fall through.
        return "a" <= ch <= "z"

    chunks: list[str] = []
    pos = 0
    length = len(input_str)
    while pos < length:
        head = input_str[pos]
        nxt = input_str[pos + 1] if pos + 1 < length else None
        if _is_lower_ascii(head) and nxt is not None and _is_lower_ascii(nxt):
            # Paired lowercase letters: emit both, consume two characters.
            chunks.append(head + nxt)
            pos += 2
        elif _is_lower_ascii(head) and nxt == " ":
            # Lone letter followed by a space: keep the letter, drop the space.
            chunks.append(head)
            pos += 2
        else:
            # Everything else passes through one character at a time.
            chunks.append(head)
            pos += 1
    return chunks
def get_mask_from_string(input_str: str, tokenizer) -> torch.Tensor:
    """Encode *input_str* into a 1-D ``torch.long`` tensor of token ids.

    Pure-ASCII chunks (from ``process_string_into_pairs``) are wrapped as
    ``<|mask_xx|>`` placeholder tokens; chunks containing any non-ASCII
    character (e.g. CJK) are looked up as literal tokens.
    """
    token_ids = []
    for chunk in process_string_into_pairs(input_str):
        if all(ord(ch) < 128 for ch in chunk):
            token = f"<|mask_{chunk}|>"
        else:
            token = chunk
        token_ids.append(tokenizer.token_to_id(token))
    return torch.tensor(token_ids, dtype=torch.long)
def inference(model, input_str: str, tokenizer, device, threshold: float = 0.9) -> None:
    """Iteratively decode a masked input string, printing progress each round.

    The string is encoded into ``<|mask_xx|>`` tokens. Each round the model
    predicts a distribution for every still-masked position; positions whose
    top-1 probability exceeds ``threshold`` are committed. If none clears the
    threshold, the single most confident position is committed so the loop
    always makes progress. The partially decoded string is printed per round.

    Args:
        model: HF-style model; ``model(input_ids=...)[0]`` is assumed to be
            logits of shape (batch, seq, vocab) — TODO confirm with the model.
        input_str: raw user string to encode and decode.
        tokenizer: ``tokenizers.Tokenizer`` with ``token_to_id``/``id_to_token``.
        device: torch device the input tensor is moved to.
        threshold: acceptance probability for committing a prediction.
    """
    model.eval()
    # Initialize NgramHashMapping when the model config declares one.
    engram_cfg = model.config.engram_config
    hash_mapping = None
    if engram_cfg is not None:
        # NOTE(review): EngramConfig is imported but never used in this scope.
        from modeling_llada_engram import ModelConfig, EngramConfig, NgramHashMapping
        from dataclasses import fields
        # Prepare ModelConfig for NgramHashMapping.
        backbone_config_dict = model.config.to_dict()
        # Filter out keys not in ModelConfig if necessary, but ModelConfig usually matches LLaDAConfig fields
        backbone_config = ModelConfig(**{k: v for k, v in backbone_config_dict.items() if k in [f.name for f in fields(ModelConfig)]})
        hash_mapping = NgramHashMapping(
            engram_vocab_size = engram_cfg.get('engram_vocab_size', [129280*5, 129280*5]),
            max_ngram_size = engram_cfg.get('max_ngram_size', 3),
            n_embed_per_ngram = engram_cfg.get('n_embed_per_ngram', 512),
            n_head_per_ngram = engram_cfg.get('n_head_per_ngram', 8),
            layer_ids = engram_cfg.get('layer_ids', [1, 15]),
            pad_id = engram_cfg.get('pad_id', 2),
            seed = engram_cfg.get('seed', 0),
            config = backbone_config,
        )
        # NOTE(review): hash_mapping is constructed but never used below —
        # either pass it into the model call or drop this setup; confirm intent.
    with torch.no_grad():
        mask_tensor = get_mask_from_string(input_str, tokenizer).unsqueeze(0).to(device)
        # is_masked = torch.ones(mask_tensor.shape, dtype=torch.bool, device=device)
        # A position counts as masked iff its id is >= the id of "<|mask|>".
        # This assumes all mask tokens occupy the top of the vocab — TODO confirm
        # against tokenizer.json.
        is_masked = mask_tensor >= tokenizer.token_to_id("<|mask|>")
        rounds = 0
        while is_masked.any():
            rounds += 1
            output = model(input_ids=mask_tensor)[0]
            # Logits -> per-position probability distribution over the vocab.
            output = torch.softmax(output, dim=-1)
            unmasked_any = False
            prob_info = []  # NOTE(review): collected but never printed or used.
            most_certain_token = (0, 0, 0)  # (probability, index, token_id)
            # Check each token that is still masked.
            for i in range(mask_tensor.shape[1]):
                if is_masked[0, i]:
                    # Greedy pick: token with the highest probability here.
                    predicted_token = output[0, i].argmax().item()
                    prob_info.append(
                        f"{output[0, i, predicted_token].item():.2f} {tokenizer.id_to_token(predicted_token)}"
                    )
                    # Tuples compare lexicographically, so max() tracks the
                    # masked position with the highest probability.
                    most_certain_token = max(
                        most_certain_token,
                        (output[0, i, predicted_token].item(), i, predicted_token)
                    )
                    # If the probability is above the threshold, commit the
                    # prediction and clear the mask flag.
                    if output[0, i, predicted_token].item() > threshold:
                        mask_tensor[0, i] = predicted_token
                        is_masked[0, i] = False
                        unmasked_any = True
                else:
                    prob_info.append("")
            if not unmasked_any:
                # Nothing cleared the threshold: force-commit the single most
                # confident prediction so the loop cannot stall forever.
                mask_tensor[0, most_certain_token[1]] = most_certain_token[2]
                is_masked[0, most_certain_token[1]] = False
            # Render progress: decoded positions print their token text;
            # still-masked positions print the raw letters inside
            # "<|mask_..|>" by slicing off the 7-char prefix and 2-char suffix.
            masked_str = "".join(
                (
                    tokenizer.id_to_token(mask_tensor[0, i].item())
                    if not is_masked[0, i]
                    else tokenizer.id_to_token(mask_tensor[0, i].item())[7:-2]
                )
                for i in range(mask_tensor.shape[1])
            )
            print(masked_str)
if __name__ == "__main__":
    # Prefer GPU when available, otherwise run on CPU.
    run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tok = Tokenizer.from_file("tokenizer.json")
    # Load the model from the current directory with AutoModel; the custom
    # architecture requires transformers and trust_remote_code=True.
    try:
        from transformers import AutoModelForCausalLM
        net = AutoModelForCausalLM.from_pretrained(".", trust_remote_code=True).to(run_device)
    except Exception as err:
        print(f"Failed to load with AutoModel: {err}")
        print("Falling back to manual loading (if needed, but prefer AutoModel for validation)")
        # Manual-loading fallback intentionally removed; AutoModel is required.
        raise err
    # bfloat16 on GPU, full precision on CPU.
    if run_device.type == "cuda":
        net = net.to(torch.bfloat16)
    else:
        net = net.float()
    print("Loaded model. Parameters:", sum(p.numel() for p in net.parameters()))
    accept_threshold = 0.9
    # Simple REPL: read a line, run iterative unmasking, separate with a blank line.
    while True:
        user_text = input("Enter a string to process: ")
        inference(net, user_text, tok, run_device, threshold=accept_threshold)
        print("")
# Input example: nhkzotdgjvdmleunkmiekz。
# Output: 黄河是中华民族的母亲河。
# Input example: mdflswsyelfl,eyxxmdswsyelfl,raxxmdelfl,otfixdzhfnjrugfoirmbisunswsyelfl。zhldxxdgun“mdfl”uvelflqhnvxtmdunkmpbofvjcjnnmdunsoirpbucheel。
# Output: 大型语言模型,也称大语言模型,简称大模型,是一种基于人工神经网络的语言模型。其名称中的“大型”指模型具有庞大的参数量以及巨大的训练数据规模。
# Input example: hgzz(Go o g l e )otfiwjpmrnxjuchkaf,hdidjifngmrnsdoovsoggn.
# Output:
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城.
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# 谷歌(Google)是一家跨国科技公司,总部位于美国加州山景城。
# Input example: jxvuygvbotghtusvwtvbdt。auwvvbotcbghwhtkshdl?
# Output:
# 天对地,雨对风。大陆对长空。山lj对ke树,赤日对ljeb。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨tq晚霞红。
# 天对地,雨对风。大陆对长空。山lj对杂树,赤日对苍eb。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# 天对地,雨对风。大陆对长空。山lj对杂树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# 天对地,雨对风。大陆对长空。山苍对杂树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨雷晚霞红。
# (Expected Output: 天对地,雨对风。大陆对长空。山花对海树,赤日对苍穹。雷隐隐,雾蒙蒙。日下对天中。风高秋月白,雨霁晚霞红。)