import os
import re
from textwrap import wrap

import torch
import yaml
from torch import package
from transformers import AutoTokenizer, AutoModelForCausalLM


def get_length_param(text: str, tokenizer) -> str:
    """Map the token count of `text` to the length bucket used in the prompt prefix."""
    tokens_count = len(tokenizer.encode(text))
    if tokens_count <= 15:
        len_param = '1'
    elif tokens_count <= 50:
        len_param = '2'
    elif tokens_count <= 256:
        len_param = '3'
    else:
        len_param = '-'
    return len_param


def remove_duplicates(S):
    """Strip Latin-script words and drop repeated tokens from the generated text."""
    S = re.sub(r'[a-zA-Z]+', '', S)
    S = S.split()
    result = ""
    for subst in S:
        # substring check: also skips words already contained in a longer kept word
        if subst not in result:
            result += subst + " "
    return result.rstrip()


def removeSigns(S):
    """Truncate the text after the last sentence-ending '.' or '!'."""
    last_index = max(S.rfind("."), S.rfind("!"))
    if last_index >= 0:
        S = S[:last_index + 1]
    return S


def prepare_punct():
    """Download and load the Silero text-enhancement (punctuation/case) model."""
    torch.hub.download_url_to_file(
        'https://raw.githubusercontent.com/snakers4/silero-models/master/models.yml',
        'latest_silero_models.yml',
        progress=False)

    with open('latest_silero_models.yml', 'r') as yaml_file:
        models = yaml.load(yaml_file, Loader=yaml.SafeLoader)
        model_conf = models.get('te_models').get('latest')

    model_url = model_conf.get('package')

    model_dir = "downloaded_model"
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, os.path.basename(model_url))

    # Download the packaged model once and cache it on disk
    if not os.path.isfile(model_path):
        torch.hub.download_url_to_file(model_url, model_path, progress=True)

    imp = package.PackageImporter(model_path)
    model_punct = imp.load_pickle("te_model", "model")
    return model_punct


def initialize():
    """Load the chatbot model, its tokenizer, and the punctuation model."""
    torch.backends.quantized.engine = 'qnnpack'
    fit_checkpoint = "WarBot"
    tokenizer = AutoTokenizer.from_pretrained(fit_checkpoint)
    model = AutoModelForCausalLM.from_pretrained(fit_checkpoint)
    model_punct = prepare_punct()
    return (model, tokenizer, model_punct)


def split_string(string, n=256):
    """Split `string` into chunks of at most `n` characters."""
    return [string[i:i + n] for i in range(0, len(string), n)]


def get_response(quote: str, model, tokenizer, model_punct):
    """Generate a reply to `quote` and clean it up before returning it."""
    # Prompt format: |speaker_id|length_bucket| + text + EOS
    user_input_ids = tokenizer.encode(
        f"|0|{get_length_param(quote, tokenizer)}|" + quote + tokenizer.eos_token,
        return_tensors="pt")

    chat_history_ids = user_input_ids

    # no_repeat_ngram_size=1 forbids any repeated token; apply it only to longer prompts
    tokens_count = len(tokenizer.encode(quote))
    if tokens_count < 15:
        no_repeat_ngram_size = 2
    else:
        no_repeat_ngram_size = 1

    output_id = model.generate(
        chat_history_ids,
        num_return_sequences=1,
        max_length=200,
        no_repeat_ngram_size=no_repeat_ngram_size,
        do_sample=True,
        top_k=50,
        top_p=0.9,
        temperature=0.4,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
    )

    response = tokenizer.decode(output_id[0], skip_special_tokens=True)
    response = removeSigns(response)
    response = response.split(quote)[-1]  # keep only the text generated after the prompt
    # Keep digits, Cyrillic, Latin and basic punctuation; drop everything else
    response = re.sub(r'[^0-9А-Яа-яЁёa-zA-Z;., !()/\-+:?]', '', response)
    response = remove_duplicates(re.sub(r"\d{4,}", "", response))  # drop long digit runs
    response = re.sub(r'\.\.+', '', response)  # drop ellipses

    # Restore punctuation/capitalization; long responses are enhanced chunk by chunk
    maxLen = 170
    try:
        if len(response) > maxLen:
            resps = wrap(response, maxLen)
            for i in range(len(resps)):
                resps[i] = model_punct.enhance_text(resps[i], lan='ru')
            response = ''.join(resps)
        else:
            response = model_punct.enhance_text(response, lan='ru')
    except Exception:
        pass

    response = response.replace('[UNK]', '')  # drop literal unknown-token markers
    return response
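

# A minimal usage sketch, assuming the local "WarBot" checkpoint directory that
# initialize() expects is present; the sample quote below is only an illustration.
if __name__ == "__main__":
    model, tokenizer, model_punct = initialize()
    sample_quote = "Привет! Как дела?"  # hypothetical example input
    print(get_response(sample_quote, model, tokenizer, model_punct))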