Spaces:
Runtime error
Runtime error
# import gradio as gr | |
import gradio | |
# import lmdb | |
# import base64 | |
# import io | |
import random | |
import time | |
import os | |
import re | |
import sys | |
import json | |
import copy | |
# import sqlite3 | |
import hashlib | |
import uuid | |
from urllib.parse import urljoin | |
import openai | |
def get_random_sleep(base_time, random_range): | |
return (base_time + random.randint(-random_range, random_range))*0.001 | |
def js_load(txt): | |
try: | |
return json.loads(txt) | |
except Exception as error: | |
print('') | |
print('js_load:') | |
print(str(error)) | |
print('') | |
return None | |
def js_dump(thing): | |
try: | |
return json.dumps(thing) | |
except Exception as error: | |
print('') | |
print('js_dump:') | |
print(str(error)) | |
print('') | |
return None | |
def filtered_history(history, num=0): | |
if num > 0: | |
filtered = list(filter(lambda it:(it['type'] in ['request', 'response']), history)) | |
return filtered[-num:] | |
return [] | |
def filtered_history_messages(history, num=0): | |
filtered = filtered_history(history, num) | |
return list(map(lambda it:{'role': it.get('role'), 'content': it.get('content')}, filtered)) | |
def make_md_line(role, content): | |
return f"""\n##### `{role}`\n\n{content}\n""" | |
def make_md_by_history(history): | |
md = "" | |
for item in history: | |
md += make_md_line(item.get('role'), item.get('content')) | |
return md | |
def make_history_file_fn(history): | |
uuid4 = str(uuid.uuid4()) | |
json_file_path = None | |
md_file_path = None | |
try: | |
# 如果目录不存在,则创建目录 | |
os.makedirs('temp_files', exist_ok=True) | |
json_file_content = json.dumps(history) | |
json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') | |
with open(json_file_path, 'w') as f: | |
f.write(json_file_content) | |
md_file_content = make_md_by_history(history) | |
md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') | |
with open(md_file_path, 'w') as f: | |
f.write(md_file_content) | |
return json_file_path, md_file_path, gradio.update(visible=True) | |
except Exception as error: | |
print(f"\n{error}\n") | |
return json_file_path, md_file_path, gradio.update(visible=True) | |
def make_history_file_fn__(history): | |
uuid4 = str(uuid.uuid4()) | |
try: | |
json_file_content = json.dumps(history) | |
json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') | |
with open(json_file_path, 'w') as f: | |
f.write(json_file_content) | |
except Exception as error: | |
print(f"\n{error}\n") | |
json_file_path = None | |
try: | |
md_file_content = make_md_by_history(history) | |
md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') | |
with open(md_file_path, 'w') as f: | |
f.write(md_file_content) | |
except Exception as error: | |
print(f"\n{error}\n") | |
md_file_path = None | |
return json_file_path, md_file_path, gradio.update(visible=True) | |
def make_user_message_list_fn__( | |
user_message_template, # 模板,套用到每一条消息上 | |
user_message_template_mask, # 模板中要被替换的部分 | |
user_message_template_mask_is_regex, # 决定如何构造用于替换的正则表达式 | |
user_message_list_text, # 一段文本,包含了每一条用户消息 | |
user_message_list_text_splitter, # 描述了应该以什么为线索来切分 user_message_list_text | |
user_message_list_text_splitter_is_regex, # 决定如何进行切分 | |
) -> list: | |
# 返回套用了模板的用户信息列表 | |
# 这个实现首先根据是否使用正则表达式来切分用户消息列表文本,并将切分后的消息存储在一个列表中。 | |
# 然后,针对每个消息,根据user_message_template_mask及user_message_template_mask_is_regex替换模板中的部分内容, | |
# 并将替换后的结果添加到结果列表中。 | |
# 最后,返回结果列表。 | |
# 切分用户消息列表文本 | |
if user_message_list_text_splitter_is_regex: | |
user_messages = re.split(user_message_list_text_splitter, user_message_list_text) | |
else: | |
user_messages = user_message_list_text.split(user_message_list_text_splitter) | |
# 生成套用模板的用户信息列表 | |
user_message_result_list = [] | |
for message in user_messages: | |
# 替换模板内容 | |
if user_message_template_mask_is_regex: | |
transformed_message = re.sub(user_message_template_mask, message, user_message_template) | |
else: | |
transformed_message = user_message_template.replace(user_message_template_mask, message) | |
user_message_result_list.append(transformed_message) | |
return user_message_result_list | |
def make_user_message_list_fn( | |
user_message_template, | |
user_message_template_mask, | |
user_message_template_mask_is_regex, | |
user_message_list_text, | |
user_message_list_text_splitter, | |
user_message_list_text_splitter_is_regex, | |
) -> list: | |
# 实际上,只要保证在使用正则表达式进行替换或切分操作之前,已经将其编译为正则表达式对象即可。 | |
# 在我的修改中,针对 xxx_is_regex 参数为 True 的情况,将这些参数编译成正则表达式。 | |
# 对于替换操作和切分操作,只需检查是否已经编译为正则表达式,并使用相应的方法即可。 | |
# 编译正则表达式 | |
if user_message_template_mask_is_regex: | |
user_message_template_mask = re.compile(user_message_template_mask) | |
if user_message_list_text_splitter_is_regex: | |
user_message_list_text_splitter = re.compile(user_message_list_text_splitter) | |
# 切分用户消息列表文本 | |
if user_message_list_text_splitter_is_regex: | |
user_messages = user_message_list_text_splitter.split(user_message_list_text) | |
else: | |
user_messages = user_message_list_text.split(user_message_list_text_splitter) | |
# 生成套用模板的用户信息列表 | |
user_message_result_list = [] | |
for message in user_messages: | |
# 替换模板内容 | |
if user_message_template_mask_is_regex: | |
transformed_message = user_message_template_mask.sub(message, user_message_template) | |
else: | |
transformed_message = user_message_template.replace(user_message_template_mask, message) | |
user_message_result_list.append(transformed_message) | |
return user_message_result_list | |
def sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): | |
# print("\n\n") | |
assistant_message = "" | |
tips = "" | |
try: | |
openai.api_key = api_key_text | |
completion = openai.ChatCompletion.create(**payload) | |
if payload.get('stream'): | |
print('assistant:') | |
# print('->>>') | |
is_first=True | |
for chunk in completion: | |
if is_first: | |
is_first = False | |
continue | |
if chunk.choices[0].finish_reason is None: | |
# sys.stdout.write("\r") | |
print(chunk.choices[0].delta.content or '', end="") | |
assistant_message += chunk.choices[0].delta.content or '' | |
# print(f"\033[2K{assistant_message}", end="") | |
history_md_stream = make_md_line('assistant', assistant_message) | |
tips = 'streaming' | |
yield assistant_message, history_md_stream, tips, history | |
else: | |
pass | |
pass | |
# print('=>>>') | |
print('') | |
pass | |
else: | |
assistant_message = completion.choices[0].message.content | |
history_md_stream = make_md_line('assistant', assistant_message) | |
tips = 'got' | |
print('assistant:') | |
print(assistant_message) | |
yield assistant_message, history_md_stream, tips, history | |
pass | |
except Exception as error: | |
tips = str(error) | |
history.append({"role": "app", "content": tips}) | |
print(f"\n{tips}\n") | |
yield assistant_message, history_md_stream, tips, history | |
pass | |
# print("\n\n") | |
def sequential_chat_fn( | |
history, | |
system_prompt_enabled, | |
system_prompt, | |
user_message_template, | |
user_message_template_mask, | |
user_message_template_mask_is_regex, | |
user_message_list_text, | |
user_message_list_text_splitter, | |
user_message_list_text_splitter_is_regex, | |
history_prompt_num, | |
api_key_text, token_text, | |
sleep_base, sleep_rand, | |
prop_stream, prop_model, prop_temperature, prop_top_p, prop_choices_num, prop_max_tokens, prop_presence_penalty, prop_frequency_penalty, prop_logit_bias, | |
): | |
# outputs=[ | |
# history, | |
# history_md_stable, | |
# history_md_stream, | |
# tips, | |
# file_row, | |
# ], | |
history_md_stable = "" | |
history_md_stream = "" | |
tips = "" | |
try: | |
user_message_list = make_user_message_list_fn( | |
user_message_template, | |
user_message_template_mask, | |
user_message_template_mask_is_regex, | |
user_message_list_text, | |
user_message_list_text_splitter, | |
user_message_list_text_splitter_is_regex, | |
) | |
payload = { | |
'model': prop_model, | |
'temperature': prop_temperature, | |
'top_p': prop_top_p, | |
'n': prop_choices_num, | |
'stream': prop_stream, | |
'presence_penalty': prop_presence_penalty, | |
'frequency_penalty': prop_frequency_penalty, | |
'user': token_text, | |
} | |
if prop_max_tokens>0: | |
payload['max_tokens'] = prop_max_tokens | |
# if prop_logit_bias is not None: | |
# payload['logit_bias'] = prop_logit_bias | |
# headers = { | |
# "Content-Type": "application/json", | |
# "Authorization": f"Bearer {api_key_text}" | |
# } | |
for user_message in user_message_list: | |
print('') | |
print(f'user({token_text}):') | |
print(user_message) | |
print('') | |
# make the_messages to sent | |
the_messages = [] | |
if system_prompt_enabled: | |
the_messages.append({"role": "system", "content": system_prompt}) | |
for msg in filtered_history_messages(history, num=history_prompt_num): | |
the_messages.append(msg) | |
the_messages.append({"role": "user", "content": user_message}) | |
payload['messages'] = the_messages | |
history.append({"role": "user", "content": user_message, "type": "request", "payload": payload}) | |
history_md_stable = make_md_by_history(history) | |
history_md_stream = "" | |
tips = "" | |
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
try: | |
for (assistant_message, history_md_stream, tips, history) in sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): | |
yield history, history_md_stable, history_md_stream, tips, gradio.update() | |
history.append({"role": "assistant", "content": assistant_message, "type": "request"}) | |
history_md_stable += history_md_stream | |
history_md_stream = "" | |
tips = "fine" | |
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
except Exception as error: | |
tips = f'error: {str(error)}' | |
history.append({"role": "app", "content": tips}) | |
print(f"\n{tips}\n") | |
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
time.sleep(get_random_sleep(sleep_base, sleep_rand)) | |
pass | |
except Exception as error: | |
tips = str(error) | |
history.append({"role": "app", "content": tips}) | |
print(f"\n{tips}\n") | |
yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) | |
pass | |
def on_click_send_btn( | |
global_state_json, api_key_text, chat_input_role, chat_input, prompt_table, chat_use_prompt, chat_use_history, chat_log, | |
chat_model, temperature, top_p, choices_num, stream, max_tokens, presence_penalty, frequency_penalty, logit_bias, | |
): | |
old_state = json.loads(global_state_json or "{}") | |
print('\n\n\n\n\n') | |
print(prompt_table) | |
prompt_table = prompt_table or [] | |
chat_log = chat_log or [] | |
chat_log_md = '' | |
if chat_use_prompt: | |
chat_log_md += '<center>(prompt)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
chat_log_md += '\n---\n' | |
if True: | |
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
chat_log_md += '\n---\n' | |
# if chat_input=='': | |
# return json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input | |
print('\n') | |
print(chat_input) | |
print('') | |
try: | |
logit_bias_json = json.dumps(logit_bias) if logit_bias else None | |
except: | |
return json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input | |
new_state = copy.deepcopy(old_state) or {} | |
req_hist = copy.deepcopy(prompt_table) if chat_use_prompt else [] | |
if chat_use_history: | |
for hh in (chat_log or []): | |
req_hist.append(hh) | |
if chat_input and chat_input!="": | |
req_hist.append([(chat_input_role or 'user'), chat_input]) | |
openai.api_key = api_key_text | |
props = { | |
'model': chat_model, | |
'messages': [xx for xx in map(lambda it: {'role':it[0], 'content':it[1]}, req_hist)], | |
'temperature': temperature, | |
'top_p': top_p, | |
'n': choices_num, | |
'stream': stream, | |
'presence_penalty': presence_penalty, | |
'frequency_penalty': frequency_penalty, | |
} | |
if max_tokens>0: | |
props['max_tokens'] = max_tokens | |
if logit_bias_json is not None: | |
props['logit_bias'] = logit_bias_json | |
props_json = json.dumps(props) | |
try: | |
completion = openai.ChatCompletion.create(**props) | |
print('') | |
# print(completion.choices) | |
# the_response_role = completion.choices[0].message.role | |
# the_response = completion.choices[0].message.content | |
# print(the_response) | |
# print('') | |
# chat_last_resp = json.dumps(completion.__dict__) | |
# chat_last_resp_dict = json.loads(chat_last_resp) | |
# chat_last_resp_dict['api_key'] = "hidden by UI" | |
# chat_last_resp_dict['organization'] = "hidden by UI" | |
# chat_last_resp = json.dumps(chat_last_resp_dict) | |
chat_log_md = '' | |
if chat_use_prompt: | |
chat_log_md += '<center>(prompt)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
chat_log_md += '\n---\n' | |
if True: | |
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
chat_log_md += '\n---\n' | |
if chat_input and chat_input!="": | |
chat_log.append([(chat_input_role or 'user'), chat_input]) | |
chat_log_md += f"##### `{(chat_input_role or 'user')}`\n\n{chat_input}\n\n" | |
partial_words = "" | |
counter=0 | |
if stream: | |
the_response = '' | |
the_response_role = '' | |
for chunk in completion: | |
#Skipping first chunk | |
if counter == 0: | |
the_response_role = chunk.choices[0].delta.role | |
chat_log_md += f"##### `{the_response_role}`\n\n" | |
counter += 1 | |
continue | |
# print(('chunk', chunk)) | |
if chunk.choices[0].finish_reason is None: | |
the_response_chunk = chunk.choices[0].delta.content | |
the_response += the_response_chunk | |
chat_log_md += f"{the_response_chunk}" | |
yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, "{}", props_json, '' | |
else: | |
chat_log.append([the_response_role, the_response]) | |
chat_log_md += f"\n\n" | |
yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, '{"msg": "stream模式不支持显示"}', props_json, '' | |
# chat_last_resp = json.dumps(completion.__dict__) | |
# chat_last_resp_dict = json.loads(chat_last_resp) | |
# chat_last_resp_dict['api_key'] = "hidden by UI" | |
# chat_last_resp_dict['organization'] = "hidden by UI" | |
# chat_last_resp = json.dumps(chat_last_resp_dict) | |
else: | |
the_response_role = completion.choices[0].message.role | |
the_response = completion.choices[0].message.content | |
print(the_response) | |
print('') | |
chat_log.append([the_response_role, the_response]) | |
chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n" | |
chat_last_resp = json.dumps(completion.__dict__) | |
chat_last_resp_dict = json.loads(chat_last_resp) | |
chat_last_resp_dict['api_key'] = "hidden by UI" | |
chat_last_resp_dict['organization'] = "hidden by UI" | |
chat_last_resp = json.dumps(chat_last_resp_dict) | |
return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, '' | |
# chat_log.append([the_response_role, the_response]) | |
# chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n" | |
# return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, '' | |
except Exception as error: | |
print(error) | |
print('error!!!!!!') | |
chat_log_md = '' | |
if chat_use_prompt: | |
chat_log_md += '<center>(prompt)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) | |
chat_log_md += '\n---\n' | |
if True: | |
chat_log_md += '<center>(history)</center>\n\n' if chat_use_history else '<center>(not used history)</center>\n\n' | |
chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", chat_log)]) | |
chat_log_md += '\n---\n' | |
# chat_log_md = '' | |
# chat_log_md = "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", prompt_table)]) if chat_use_prompt else '' | |
# chat_log_md += "\n".join([xx for xx in map(lambda it: f"##### `{it[0]}`\n\n{it[1]}\n\n", hist)]) | |
chat_log_md += "\n" | |
chat_log_md += str(error) | |
return json.dumps(new_state), chat_log, chat_log_md, chat_log_md, None, props_json, chat_input | |
def clear_history(): | |
return [], "" | |
def copy_history(txt): | |
# print('\n\n copying') | |
# print(txt) | |
# print('\n\n') | |
pass | |
def update_saved_prompt_titles(global_state_json, selected_saved_prompt_title): | |
print('') | |
global_state = json.loads(global_state_json or "{}") | |
print(global_state) | |
print(selected_saved_prompt_title) | |
saved_prompts = global_state.get('saved_prompts') or [] | |
print(saved_prompts) | |
the_choices = [(it.get('title') or '[untitled]') for it in saved_prompts] | |
print(the_choices) | |
print('') | |
return gradio.Dropdown.update(choices=the_choices) | |
def save_prompt(global_state_json, saved_prompts, prompt_title, prompt_table): | |
the_choices = [] | |
global_state = json.loads(global_state_json or "{}") | |
saved_prompts = global_state.get('saved_prompts') or [] | |
if len(saved_prompts): | |
the_choices = [it.get('title') or '[untitled]' for it in saved_prompts] | |
pass | |
return global_state_json, gradio.Dropdown.update(choices=the_choices, value=prompt_title), prompt_title, prompt_table | |
def load_saved_prompt(title): | |
pass | |