|
import os |
|
import json |
|
import datetime |
|
import gradio as gr |
|
import torch |
|
from transformers import AutoTokenizer, AutoModel |
|
|
|
|
|
if torch.cuda.is_available(): |
|
device = 'cuda' |
|
else: |
|
device = 'cpu' |
|
|
|
tokenizer = AutoTokenizer.from_pretrained('THUDM/chatglm-6b-int4', trust_remote_code=True) |
|
|
|
if device == 'cuda': |
|
model = AutoModel.from_pretrained('THUDM/chatglm-6b-int4', trust_remote_code=True).half().cuda() |
|
else: |
|
model = AutoModel.from_pretrained('THUDM/chatglm-6b-int4', trust_remote_code=True).float() |
|
|
|
model = model.eval() |
|
|
|
|
|
def parse_text(text): |
|
lines = text.split('\n') |
|
for i, line in enumerate(lines): |
|
if '```' in line: |
|
item = line.split('`')[-1] |
|
if item: |
|
lines[i] = f'<pre><code class="{item}">' |
|
else: |
|
lines[i] = '</code></pre>' |
|
else: |
|
if i > 0: |
|
line = line.replace('<', '<').replace('>', '>') |
|
lines[i] = f'<br/>{line}' |
|
return ''.join(lines) |
|
|
|
|
|
def chat_wrapper(query, styled_history, history, max_length, top_p, temperature, memory_limit): |
|
if query == '': |
|
return [], [], '', *gr_hide() |
|
if memory_limit == 0: |
|
history = [] |
|
styled_history = [] |
|
elif memory_limit > 0: |
|
history = history[-memory_limit:] |
|
styled_history = styled_history[-memory_limit:] |
|
flag = True |
|
styled_history_pos = 0 |
|
for message, history in model.stream_chat(tokenizer, query, history=history, |
|
max_length=max_length, top_p=top_p, temperature=temperature): |
|
if flag: |
|
styled_history.append((parse_text(query), parse_text(message))) |
|
styled_history_pos = len(styled_history) - 1 |
|
flag = False |
|
else: |
|
styled_history[styled_history_pos] = (parse_text(query), parse_text(message)) |
|
yield styled_history, history, '', *gr_hide() |
|
|
|
|
|
def regenerate_wrapper(styled_history, history, max_length, top_p, temperature, memory_limit): |
|
if not history: |
|
return [], [], '', *gr_hide() |
|
|
|
styled_history, history, query, _, _, _ = edit_wrapper(styled_history, history) |
|
for ret in chat_wrapper(query, styled_history, history, max_length, top_p, temperature, memory_limit): |
|
yield ret |
|
|
|
|
|
def edit_wrapper(styled_history, history): |
|
if len(history) == 0: |
|
return [], [], '' |
|
query = history[-1][0] |
|
history = history[:-1] |
|
styled_history = styled_history[:-1] |
|
return styled_history, history, query, *gr_hide() |
|
|
|
|
|
def reset_history(): |
|
return [], [], '', *gr_hide() |
|
|
|
|
|
def save_history(history): |
|
os.makedirs('log', exist_ok=True) |
|
dict_list = [{'input': q, 'output': a} for q, a in history] |
|
with open(f'log/{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}.json', 'w', encoding='utf-8') as f: |
|
json.dump(dict_list, f, ensure_ascii=False, indent=2) |
|
|
|
|
|
def save_config(max_length=2048, top_p=0.7, temperature=0.7, memory_limit=0.0): |
|
with open('config.json', 'w') as f: |
|
json.dump({'max_length': max_length, 'top_p': top_p, 'temperature': temperature, 'memory_limit': memory_limit}, f, indent=2) |
|
|
|
|
|
def load_history(file, styled_history, history): |
|
current_styled_history, current_history = styled_history.copy(), history.copy() |
|
try: |
|
with open(file.name, 'r', encoding='utf-8') as f: |
|
dict_list = json.load(f) |
|
history = [(item['input'], item['output']) for item in dict_list] |
|
styled_history = [(parse_text(item['input']), parse_text(item['output'])) for item in dict_list] |
|
except BaseException: |
|
return current_styled_history, current_history, '' |
|
return styled_history, history, '', *gr_hide() |
|
|
|
|
|
def gr_show_and_load(history, evt: gr.SelectData): |
|
if evt.index[1] == 0: |
|
label = f'Editing Q{evt.index[0]}:' |
|
else: |
|
label = f'Editing Q{evt.index[0]}:' |
|
return {'visible': True, '__type__': 'update'}, {'value': history[evt.index[0]][evt.index[1]], 'label': label, '__type__': 'update'}, evt.index |
|
|
|
|
|
def update_history(styled_history, history, log, idx): |
|
if log == '': |
|
return styled_history, history, {'visible': True, '__type__': 'update'}, {'value': history[idx[0]][idx[1]], '__type__': 'update'}, idx |
|
|
|
def swap_value(lst, idx, value): |
|
lst[idx[0]] = tuple(value if j == idx[1] else elem for j, elem in enumerate(lst[idx[0]])) |
|
return lst |
|
styled_history = swap_value(styled_history, idx, parse_text(log)) |
|
history = swap_value(history, idx, log) |
|
return styled_history, history, *gr_hide() |
|
|
|
|
|
def gr_hide(): |
|
return {'visible': False, '__type__': 'update'}, {'value': '', 'label': '', '__type__': 'update'}, [] |
|
|
|
|
|
with gr.Blocks() as demo: |
|
if not os.path.isfile('config.json'): |
|
save_config() |
|
|
|
with open('config.json', 'r', encoding='utf-8') as f: |
|
configs = json.loads(f.read()) |
|
|
|
gr.Markdown('''<h1><center>ChatGLM Demo</center></h1>''') |
|
|
|
with gr.Row(): |
|
max_length = gr.Slider(minimum=4.0, maximum=4096.0, step=4.0, label='Max Length', value=configs['max_length']) |
|
top_p = gr.Slider(minimum=0.01, maximum=1.0, step=0.01, label='Top P', value=configs['top_p']) |
|
temperature = gr.Slider(minimum=0.01, maximum=2.0, step=0.01, label='Temperature', value=configs['temperature']) |
|
memory_limit = gr.Slider(minimum=0.0, maximum=20.0, step=1.0, label='Memory Limit', value=configs['memory_limit']) |
|
save_conf = gr.Button('保存设置', visible=False) |
|
|
|
gr.Markdown('''<h2>Hint: click on a chat bubble to edit chat history</h2>''') |
|
|
|
state = gr.State([]) |
|
chatbot = gr.Chatbot(elem_id='chatbot', show_label=False) |
|
with gr.Row(visible=False) as edit_log: |
|
with gr.Column(): |
|
log = gr.Textbox() |
|
with gr.Row(): |
|
submit_log = gr.Button('保存') |
|
cancel_log = gr.Button('取消') |
|
log_idx = gr.State([]) |
|
|
|
message = gr.Textbox(placeholder='Input your message', label='Q:') |
|
|
|
with gr.Row(): |
|
submit = gr.Button('Submit') |
|
edit = gr.Button('Edit last question') |
|
regen = gr.Button('Re-generate') |
|
|
|
delete = gr.Button('Reset chat') |
|
|
|
with gr.Row(visible=False): |
|
save = gr.Button('保存对话(在 `log` 文件夹下)') |
|
load = gr.UploadButton('读取对话', file_types=['file'], file_count='single') |
|
|
|
input_list = [message, chatbot, state, max_length, top_p, temperature, memory_limit] |
|
output_list = [chatbot, state, message] |
|
edit_list = [edit_log, log, log_idx] |
|
|
|
save_conf.click(save_config, inputs=input_list[3:]) |
|
load.upload(load_history, inputs=[load, chatbot, state], outputs=output_list + edit_list) |
|
save.click(save_history, inputs=[state]) |
|
message.submit(chat_wrapper, inputs=input_list, outputs=output_list + edit_list) |
|
submit.click(chat_wrapper, inputs=input_list, outputs=output_list + edit_list) |
|
edit.click(edit_wrapper, inputs=input_list[1:3], outputs=output_list + edit_list) |
|
regen.click(regenerate_wrapper, inputs=input_list[1:], outputs=output_list + edit_list) |
|
delete.click(reset_history, outputs=output_list + edit_list) |
|
chatbot.select(gr_show_and_load, inputs=[state], outputs=edit_list) |
|
edit_kwargs = {'inputs': [chatbot, state, log, log_idx], 'outputs': [chatbot, state] + edit_list} |
|
log.submit(update_history, **edit_kwargs) |
|
submit_log.click(update_history, **edit_kwargs) |
|
cancel_log.click(gr_hide, outputs=edit_list) |
|
|
|
|
|
if __name__ == '__main__': |
|
demo.queue(concurrency_count=5, max_size=20).launch(debug=True) |
|
|