import os import time import ftplib import threading from tqdm.notebook import tqdm import zipfile import gradio as gr import torch # from transformers import T5Tokenizer, T5ForConditionalGeneration from transformers import AutoTokenizer, AutoModel def get_model_ftp(model_path, model_name): ftp = ftplib.FTP('10.209.16.22') ftp.login('soltest', 'soltest') folder_path = '/ftp/3D/ai-model/ChatYuan/ClueAI/' ftp.cwd(folder_path) file_list = ftp.nlst(folder_path) if os.path.join(folder_path, model_name) in file_list: # 获取远程文件的大小 file_size = ftp.size(model_name) # 创建本地文件,并用二进制写模式打开 with open(os.path.join(model_path, model_name), 'wb') as f: # 下载文件并显示进度条 with tqdm.wrapattr(f, 'write', desc="Download " + model_name, total=file_size, unit='B', unit_scale=True) as pbar: ftp.retrbinary('RETR ' + model_name, pbar.write) ftp.quit() unzip(model_path, model_name) def unzip(path, file_name): try: stop_unzip = threading.Event() thread = threading.Thread(target=print_flush, args=(stop_unzip, "start decompression ")) thread.start() zip_file = zipfile.ZipFile(os.path.join(path, file_name)) for names in zip_file.namelist(): zip_file.extract(names, path) zip_file.close() stop_unzip.set() thread.join() except Exception as ex: stop_unzip.set() thread.join() os.remove(os.path.join(path, file_name)) raise Exception(f"\nunzip失败:" + str(ex)) def prepare_model(model_dir): model_path = model_dir.split('/')[0] model_name = model_dir.split('/')[1] if not os.path.exists(model_dir): os.makedirs("ClueAI", exist_ok=True) get_model_ftp(model_path, model_name + '.zip') os.remove(os.path.join(model_path, model_name + '.zip')) def print_flush(stop_event, str): loading_strings = [str + ".", str + "..", str + "...", str + ".", str + "..", str + "..."] index = 0 while not stop_event.is_set(): loading_str = loading_strings[index] print(loading_str, end="\r") index = (index + 1) % len(loading_strings) time.sleep(0.5) # Refresh the loading string every three cycles if index == 0: print(" " * len(loading_str), end="\r") time.sleep(0.2) print(loading_strings[index], end="\r") print("\n" + str.split(" ")[1] + " finish.") model_dir = 'ClueAI/ChatYuan-large-v2' prepare_model(model_dir) tokenizer = AutoTokenizer.from_pretrained(model_dir) model = AutoModel.from_pretrained(model_dir, trust_remote_code=True) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') model.to(device) # model.half() def preprocess(text): base_info = "" text = f"{base_info}{text}" text = text.replace("\n", "\\n").replace("\t", "\\t") return text def postprocess(text): return text.replace("\\n", "\n").replace("\\t", "\t").replace( '%20', ' ') # .replace(" ", " ") generate_config = { 'do_sample': True, 'top_p': 0.9, 'top_k': 50, 'temperature': 0.7, 'num_beams': 1, 'max_length': 1024, 'min_length': 3, 'no_repeat_ngram_size': 5, 'length_penalty': 0.6, 'return_dict_in_generate': True, 'output_scores': True } def answer( text, top_p, temperature, sample=True, ): ''' sample:是否抽样。生成任务,可以设置为True; top_p:0-1之间,生成的内容越多样 ''' text = preprocess(text) encoding = tokenizer(text=[text], truncation=True, padding=True, max_length=1024, return_tensors="pt").to(device) if not sample: out = model.generate(**encoding, return_dict_in_generate=True, output_scores=False, max_new_tokens=1024, num_beams=1, length_penalty=0.6) else: out = model.generate(**encoding, return_dict_in_generate=True, output_scores=False, max_new_tokens=1024, do_sample=True, top_p=top_p, temperature=temperature, no_repeat_ngram_size=12) # out=model.generate(**encoding, **generate_config) out_text = tokenizer.batch_decode(out["sequences"], skip_special_tokens=True) return postprocess(out_text[0]) def clear_session(): return '', None def chatyuan_bot(input, history, top_p, temperature, num): history = history or [] if len(history) > num: history = history[-num:] context = "\n".join([ f"用户:{input_text}\n小元:{answer_text}" for input_text, answer_text in history ]) input_text = context + "\n用户:" + input + "\n小元:" input_text = input_text.strip() output_text = answer(input_text, top_p, temperature) print("open_model".center(20, "=")) print(f"{input_text}\n{output_text}") history.append((input, output_text)) return '', history, history def chatyuan_bot_regenerate(input, history, top_p, temperature, num): history = history or [] if history: input = history[-1][0] history = history[:-1] if len(history) > num: history = history[-num:] context = "\n".join([ f"用户:{input_text}\n小元:{answer_text}" for input_text, answer_text in history ]) input_text = context + "\n用户:" + input + "\n小元:" input_text = input_text.strip() output_text = answer(input_text, top_p, temperature) print("open_model".center(20, "=")) print(f"{input_text}\n{output_text}") history.append((input, output_text)) return '', history, history block = gr.Blocks() with block as demo: gr.Markdown("""