# import gradio as gr
import gradio
# import lmdb
# import base64
# import io
import random
import time
import os
import re
import sys
import json
import copy
# import sqlite3
import hashlib
import uuid
from urllib.parse import urljoin

import openai


def get_random_sleep(base_time, random_range):
    """Return a randomly jittered delay in seconds.

    Args:
        base_time: Base delay in milliseconds.
        random_range: Maximum jitter in milliseconds; a whole number in
            ``[-random_range, +random_range]`` is added to ``base_time``.

    Returns:
        The delay converted to seconds, never negative.
    """
    # random.randint needs integer bounds, and a negative bound would invert
    # the interval, so normalise the jitter width first.
    spread = abs(int(random_range))
    jitter = random.randint(-spread, spread)
    # time.sleep() raises ValueError on negative values, so clamp at zero.
    return max(0, base_time + jitter) * 0.001


def _report_json_error(label, error):
    """Print a JSON helper failure in the console format used by this file."""
    print('')
    print(label)
    print(str(error))
    print('')


def js_load(txt):
    """Parse ``txt`` as JSON; print the error and return None on failure."""
    try:
        return json.loads(txt)
    except Exception as error:
        _report_json_error('js_load:', error)
        return None


def js_dump(thing):
    """Serialize ``thing`` to JSON; print the error and return None on failure."""
    try:
        return json.dumps(thing)
    except Exception as error:
        _report_json_error('js_dump:', error)
        return None


def filtered_history(history, num=0):
    """Return the last ``num`` history entries typed 'request' or 'response'.

    Entries without a ``type`` key (for example the ``"app"`` error entries
    appended elsewhere in this file) are skipped instead of raising KeyError.

    Args:
        history: List of history dicts.
        num: How many trailing entries to keep; values <= 0 yield ``[]``.
    """
    # Convert before the comparison: a fractional value such as 0.5 must not
    # turn into the slice ``[-0:]``, which would return everything.
    num = int(num)
    if num <= 0:
        return []
    kept = [
        entry for entry in (history or [])
        if entry.get('type') in ('request', 'response')
    ]
    return kept[-num:]


def filtered_history_messages(history, num=0):
    """Return the filtered history reduced to ``role``/``content`` dicts."""
    return [
        {'role': entry.get('role'), 'content': entry.get('content')}
        for entry in filtered_history(history, num)
    ]


def make_md_line(role, content):
    """Render one message as a markdown section headed by its role."""
    return f"""\n##### `{role}`\n\n{content}\n"""


def make_md_by_history(history):
    """Render every history entry with ``make_md_line`` and concatenate them."""
    return "".join(
        make_md_line(item.get('role'), item.get('content')) for item in history
    )


def _write_history_file(file_name, build_content):
    """Write ``build_content()`` to ``temp_files/<file_name>`` as UTF-8.

    Returns the written path, or None (after printing the error) on failure.
    """
    try:
        # Build the text first so a serialization failure leaves no empty file.
        content = build_content()
        # Create the directory if it does not exist yet.
        os.makedirs('temp_files', exist_ok=True)
        path = os.path.join('temp_files', file_name)
        # The JSON is dumped with ensure_ascii=False, so the encoding must be
        # explicit rather than the platform default.
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return path
    except Exception as error:
        print(f"\n{error}\n")
        return None


def make_history_file_fn(history):
    """Export the history as a JSON file and a markdown file.

    The markdown file is only attempted when the JSON file was written.

    Returns:
        ``(json_file_path, md_file_path, gradio.update(visible=True))``; a
        path is None when that file was not written.
    """
    uuid4 = str(uuid.uuid4())
    json_file_path = _write_history_file(
        f'history[{uuid4}].json',
        lambda: json.dumps(history, ensure_ascii=False),
    )
    md_file_path = None
    if json_file_path is not None:
        md_file_path = _write_history_file(
            f'history[{uuid4}].md',
            lambda: make_md_by_history(history),
        )
    return json_file_path, md_file_path, gradio.update(visible=True)


def make_history_file_fn__(history):
    """Variant of ``make_history_file_fn`` that writes both files independently.

    A failure writing one file does not prevent the attempt on the other.

    Returns:
        ``(json_file_path, md_file_path, gradio.update(visible=True))``; a
        path is None when that file was not written.
    """
    uuid4 = str(uuid.uuid4())
    json_file_path = _write_history_file(
        f'history[{uuid4}].json',
        lambda: json.dumps(history, ensure_ascii=False),
    )
    md_file_path = _write_history_file(
        f'history[{uuid4}].md',
        lambda: make_md_by_history(history),
    )
    return json_file_path, md_file_path, gradio.update(visible=True)


def make_user_message_list_fn__(
    user_message_template,  # template applied to every message
    user_message_template_mask,  # the part of the template to be replaced
    user_message_template_mask_is_regex,  # whether the mask is a regex
    user_message_list_text,  # one text containing all user messages
    user_message_list_text_splitter,  # what to split the text on
    user_message_list_text_splitter_is_regex,  # whether the splitter is a regex
) -> list:  # the user messages with the template applied
    """Earlier variant kept for compatibility.

    It differs from ``make_user_message_list_fn`` only in not pre-compiling
    the patterns, so it delegates to it.
    """
    return make_user_message_list_fn(
        user_message_template,
        user_message_template_mask,
        user_message_template_mask_is_regex,
        user_message_list_text,
        user_message_list_text_splitter,
        user_message_list_text_splitter_is_regex,
    )


def make_user_message_list_fn(
    user_message_template,
    user_message_template_mask,
    user_message_template_mask_is_regex,
    user_message_list_text,
    user_message_list_text_splitter,
    user_message_list_text_splitter_is_regex,
) -> list:
    """Split a text into messages and wrap each one in the template.

    Args:
        user_message_template: Template applied to every message.
        user_message_template_mask: Placeholder inside the template that is
            replaced by the message.
        user_message_template_mask_is_regex: Treat the mask as a regular
            expression instead of a literal substring.
        user_message_list_text: Text holding all messages.
        user_message_list_text_splitter: Separator between messages. An empty
            splitter means the whole text is one message.
        user_message_list_text_splitter_is_regex: Treat the splitter as a
            regular expression instead of a literal substring.

    Returns:
        One templated string per message, in order.

    Raises:
        re.error: If a regex mask or splitter does not compile.
    """
    # Compile the patterns up front so invalid ones fail before any work.
    mask_pattern = None
    if user_message_template_mask_is_regex:
        mask_pattern = re.compile(user_message_template_mask)
    splitter_pattern = None
    if user_message_list_text_splitter_is_regex and user_message_list_text_splitter:
        splitter_pattern = re.compile(user_message_list_text_splitter)

    # Split the text into individual messages.
    if not user_message_list_text_splitter:
        # str.split('') raises ValueError and an empty regex splits between
        # every character; neither is a useful message list.
        user_messages = [user_message_list_text]
    elif splitter_pattern is not None:
        user_messages = splitter_pattern.split(user_message_list_text)
    else:
        user_messages = user_message_list_text.split(user_message_list_text_splitter)

    # Apply the template to every message.
    if mask_pattern is not None:
        # A function replacement inserts the message verbatim. Passing the
        # message as a replacement string would make re.sub interpret its
        # backslashes and group references.
        return [
            mask_pattern.sub(lambda _match, text=message: text, user_message_template)
            for message in user_messages
        ]
    return [
        user_message_template.replace(user_message_template_mask, message)
        for message in user_messages
    ]


def sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips):
    """Send one chat completion request and yield its progress.

    Sets ``openai.api_key`` and calls ``openai.ChatCompletion.create`` with
    ``payload``. In stream mode one tuple is yielded per received text piece;
    otherwise a single tuple is yielded. On any exception the error text is
    appended to ``history`` as an ``"app"`` entry, printed, and yielded as
    ``tips``.

    Args:
        payload: Keyword arguments for ``openai.ChatCompletion.create``.
        api_key_text: API key to use.
        history: History list; only mutated when an error is recorded.
        history_md_stable: Unused here; kept for the existing call signature.
        history_md_stream: Markdown yielded back if nothing newer is produced.
        tips: Ignored; reset to ``""`` on entry.

    Yields:
        ``(assistant_message, history_md_stream, tips, history)``.
    """
    assistant_message = ""
    tips = ""
    try:
        openai.api_key = api_key_text
        completion = openai.ChatCompletion.create(**payload)
        if payload.get('stream'):
            print('assistant:')
            for chunk in completion:
                if not chunk.choices:
                    continue
                choice = chunk.choices[0]
                if choice.finish_reason is not None:
                    continue
                # Some chunks carry only a role or nothing at all; read the
                # text defensively instead of dropping the first chunk blindly.
                piece = getattr(getattr(choice, 'delta', None), 'content', None) or ''
                if not piece:
                    continue
                print(piece, end="")
                assistant_message += piece
                history_md_stream = make_md_line('assistant', assistant_message)
                tips = 'streaming'
                yield assistant_message, history_md_stream, tips, history
            print('')
        else:
            assistant_message = completion.choices[0].message.content
            history_md_stream = make_md_line('assistant', assistant_message)
            tips = 'got'
            print('assistant:')
            print(assistant_message)
            yield assistant_message, history_md_stream, tips, history
    except Exception as error:
        tips = str(error)
        history.append({"role": "app", "content": tips})
        print(f"\n{tips}\n")
        yield assistant_message, history_md_stream, tips, history


def sequential_chat_fn(
    history,
    system_prompt_enabled,
    system_prompt,
    user_message_template,
    user_message_template_mask,
    user_message_template_mask_is_regex,
    user_message_list_text,
    user_message_list_text_splitter,
    user_message_list_text_splitter_is_regex,
    history_prompt_num,
    api_key_text,
    token_text,
    sleep_base,
    sleep_rand,
    prop_stream,
    prop_model,
    prop_temperature,
    prop_top_p,
    prop_choices_num,
    prop_max_tokens,
    prop_presence_penalty,
    prop_frequency_penalty,
    prop_logit_bias,
):
    """Send a batch of templated user messages one after another.

    The message list is built with ``make_user_message_list_fn``. For each
    message the request is assembled from the optional system prompt, the last
    ``history_prompt_num`` request/response entries and the message itself,
    sent through ``sequential_chat_once_fn``, and recorded in ``history``.
    The function then sleeps for ``get_random_sleep(sleep_base, sleep_rand)``
    seconds before the next message.

    ``prop_logit_bias`` is currently not sent with the request.

    Yields:
        ``(history, history_md_stable, history_md_stream, tips, file_row)``
        where ``file_row`` is a ``gradio.update(...)`` value.
    """
    if history is None:
        history = []
    history_md_stable = ""
    history_md_stream = ""
    tips = ""

    try:
        user_message_list = make_user_message_list_fn(
            user_message_template,
            user_message_template_mask,
            user_message_template_mask_is_regex,
            user_message_list_text,
            user_message_list_text_splitter,
            user_message_list_text_splitter_is_regex,
        )

        payload = {
            'model': prop_model,
            'temperature': prop_temperature,
            'top_p': prop_top_p,
            'n': prop_choices_num,
            'stream': prop_stream,
            'presence_penalty': prop_presence_penalty,
            'frequency_penalty': prop_frequency_penalty,
            'user': token_text,
        }
        if prop_max_tokens and prop_max_tokens > 0:
            payload['max_tokens'] = prop_max_tokens

        for user_message in user_message_list:
            print('')
            print(f'user({token_text}):')
            print(user_message)
            print('')

            # Build the messages to send.
            the_messages = []
            if system_prompt_enabled:
                the_messages.append({"role": "system", "content": system_prompt})
            the_messages.extend(
                filtered_history_messages(history, num=history_prompt_num)
            )
            the_messages.append({"role": "user", "content": user_message})
            payload['messages'] = the_messages

            # Store a snapshot: ``payload`` is reused and mutated on every
            # iteration, so storing it directly would make all entries alias
            # the latest request.
            history.append({
                "role": "user",
                "content": user_message,
                "type": "request",
                "payload": copy.deepcopy(payload),
            })
            history_md_stable = make_md_by_history(history)
            history_md_stream = ""
            tips = ""
            yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)

            try:
                # Bound up front so an empty stream cannot leave it undefined.
                assistant_message = ""
                entries_before = len(history)
                for (assistant_message, history_md_stream, tips, history) in sequential_chat_once_fn(
                    payload, api_key_text, history, history_md_stable, history_md_stream, tips
                ):
                    yield history, history_md_stable, history_md_stream, tips, gradio.update()

                # sequential_chat_once_fn records a failure by appending an
                # "app" entry, so growth of the history signals an error.
                call_failed = len(history) > entries_before
                if assistant_message or not call_failed:
                    history.append({
                        "role": "assistant",
                        "content": assistant_message,
                        "type": "response",
                    })
                history_md_stable = make_md_by_history(history)
                history_md_stream = ""
                if not call_failed:
                    # Keep the error text visible instead of replacing it.
                    tips = "fine"
                yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)
            except Exception as error:
                tips = f'error: {str(error)}'
                history.append({"role": "app", "content": tips})
                print(f"\n{tips}\n")
                yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)

            time.sleep(get_random_sleep(sleep_base, sleep_rand))

    except Exception as error:
        tips = str(error)
        history.append({"role": "app", "content": tips})
        print(f"\n{tips}\n")
        yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False)


def _render_chat_log_md(prompt_table, chat_log, chat_use_prompt, chat_use_history):
    """Render the prompt table and chat log as the markdown shown in the UI."""
    def render_rows(rows):
        return "\n".join(f"##### `{row[0]}`\n\n{row[1]}\n\n" for row in rows)

    # NOTE(review): the three label literals below reproduce the characters
    # visible in the file; they may have lost surrounding markup - confirm.
    parts = []
    if chat_use_prompt:
        parts.append('\n(prompt)\n\n\n')
        parts.append(render_rows(prompt_table))
        parts.append('\n---\n')
    parts.append('\n(history)\n\n\n' if chat_use_history else '\n(not used history)\n\n\n')
    parts.append(render_rows(chat_log))
    parts.append('\n---\n')
    return "".join(parts)


def _parse_logit_bias(logit_bias):
    """Return ``logit_bias`` as a mapping, or None when it is empty.

    A string is parsed as JSON; any other value is used as given.

    Raises:
        ValueError: If the text is not valid JSON or the value is not a dict.
        TypeError: If the value cannot be interpreted at all.
    """
    if not logit_bias:
        return None
    parsed = json.loads(logit_bias) if isinstance(logit_bias, str) else logit_bias
    if not isinstance(parsed, dict):
        raise ValueError('logit_bias must be a JSON object')
    return parsed


def on_click_send_btn(
    global_state_json,
    api_key_text,
    chat_input_role,
    chat_input,
    prompt_table,
    chat_use_prompt,
    chat_use_history,
    chat_log,
    chat_model,
    temperature,
    top_p,
    choices_num,
    stream,
    max_tokens,
    presence_penalty,
    frequency_penalty,
    logit_bias,
):
    """Handle the send button: call the chat API and update the chat log.

    The request is built from the prompt table (when ``chat_use_prompt``), the
    chat log (when ``chat_use_history``) and the current input. This function
    is a generator, so every result - streamed or not - is delivered with
    ``yield``; a ``return value`` would only set ``StopIteration.value``, which
    a consumer iterating the generator never sees.

    Yields:
        ``(state_json, chat_log, chat_log_md, chat_log_md, last_response_json,
        request_props_json, chat_input_value)``. ``prompt_table`` and
        ``chat_log`` rows are indexed as ``[role, content]``.
    """
    old_state = json.loads(global_state_json or "{}")

    print('\n\n\n\n\n')
    print(prompt_table)

    prompt_table = prompt_table or []
    chat_log = chat_log or []
    chat_log_md = _render_chat_log_md(prompt_table, chat_log, chat_use_prompt, chat_use_history)

    print('\n')
    print(chat_input)
    print('')

    try:
        logit_bias_value = _parse_logit_bias(logit_bias)
    except (TypeError, ValueError):
        # Invalid logit_bias: leave everything as it was and send nothing.
        yield json.dumps(old_state), chat_log, chat_log_md, chat_log_md, None, None, chat_input
        return

    new_state = copy.deepcopy(old_state) or {}
    input_role = chat_input_role or 'user'

    req_hist = copy.deepcopy(prompt_table) if chat_use_prompt else []
    if chat_use_history:
        req_hist.extend(chat_log)
    if chat_input:
        req_hist.append([input_role, chat_input])

    openai.api_key = api_key_text

    props = {
        'model': chat_model,
        'messages': [{'role': row[0], 'content': row[1]} for row in req_hist],
        'temperature': temperature,
        'top_p': top_p,
        'n': choices_num,
        'stream': stream,
        'presence_penalty': presence_penalty,
        'frequency_penalty': frequency_penalty,
    }
    if max_tokens and max_tokens > 0:
        props['max_tokens'] = max_tokens
    if logit_bias_value is not None:
        # Sent as a mapping, not as a JSON-encoded string.
        props['logit_bias'] = logit_bias_value

    props_json = json.dumps(props)

    try:
        completion = openai.ChatCompletion.create(**props)
        print('')

        chat_log_md = _render_chat_log_md(prompt_table, chat_log, chat_use_prompt, chat_use_history)

        if chat_input:
            chat_log.append([input_role, chat_input])
            chat_log_md += f"##### `{input_role}`\n\n{chat_input}\n\n"

        if stream:
            the_response = ''
            the_response_role = ''
            header_written = False
            finalized = False
            for chunk in completion:
                if not chunk.choices:
                    continue
                choice = chunk.choices[0]
                delta = getattr(choice, 'delta', None)
                if not header_written:
                    # The role is read from the first usable chunk.
                    the_response_role = getattr(delta, 'role', None) or 'assistant'
                    chat_log_md += f"##### `{the_response_role}`\n\n"
                    header_written = True
                if choice.finish_reason is None:
                    piece = getattr(delta, 'content', None) or ''
                    if not piece:
                        continue
                    the_response += piece
                    chat_log_md += piece
                    yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, "{}", props_json, ''
                else:
                    chat_log.append([the_response_role, the_response])
                    chat_log_md += "\n\n"
                    finalized = True
                    yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, '{"msg": "stream模式不支持显示"}', props_json, ''
            if the_response and not finalized:
                # The stream ended without a finish chunk; keep what arrived.
                chat_log.append([the_response_role, the_response])
                chat_log_md += "\n\n"
                yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, '{"msg": "stream模式不支持显示"}', props_json, ''
        else:
            the_response_role = completion.choices[0].message.role
            the_response = completion.choices[0].message.content
            print(the_response)
            print('')
            chat_log.append([the_response_role, the_response])
            chat_log_md += f"##### `{the_response_role}`\n\n{the_response}\n\n"

            try:
                chat_last_resp_dict = json.loads(json.dumps(completion.__dict__))
                chat_last_resp_dict['api_key'] = "hidden by UI"
                chat_last_resp_dict['organization'] = "hidden by UI"
                chat_last_resp = json.dumps(chat_last_resp_dict)
            except (TypeError, ValueError):
                # The raw-response panel is informational only; failing to
                # serialize it must not discard a successful reply.
                chat_last_resp = None

            yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, chat_last_resp, props_json, ''
    except Exception as error:
        print(error)
        print('error!!!!!!')

        chat_log_md = _render_chat_log_md(prompt_table, chat_log, chat_use_prompt, chat_use_history)
        chat_log_md += "\n"
        chat_log_md += str(error)

        # Hand the input text back so the user does not lose it.
        yield json.dumps(new_state), chat_log, chat_log_md, chat_log_md, None, props_json, chat_input


def clear_history():
    """Return an empty chat log and empty markdown."""
    return [], ""


def copy_history(txt):
    """Placeholder; currently does nothing."""
    pass


def update_saved_prompt_titles(global_state_json, selected_saved_prompt_title):
    """Refresh the saved-prompt dropdown choices from the global state JSON.

    Titles are read from the ``saved_prompts`` list of the state; entries
    without a title are shown as ``[untitled]``.
    """
    print('')
    global_state = json.loads(global_state_json or "{}")
    print(global_state)
    print(selected_saved_prompt_title)
    saved_prompts = global_state.get('saved_prompts') or []
    print(saved_prompts)
    the_choices = [(it.get('title') or '[untitled]') for it in saved_prompts]
    print(the_choices)
    print('')
    return gradio.Dropdown.update(choices=the_choices)


def save_prompt(global_state_json, saved_prompts, prompt_title, prompt_table):
    """Return dropdown choices for the saved prompts with ``prompt_title`` selected.

    The ``saved_prompts`` argument is ignored; the list is re-read from
    ``global_state_json``. The state JSON is returned unchanged, i.e. nothing
    is added to it here.
    """
    global_state = json.loads(global_state_json or "{}")
    saved_prompts = global_state.get('saved_prompts') or []
    the_choices = [it.get('title') or '[untitled]' for it in saved_prompts]
    return (
        global_state_json,
        gradio.Dropdown.update(choices=the_choices, value=prompt_title),
        prompt_title,
        prompt_table,
    )


def load_saved_prompt(title):
    """Placeholder; currently does nothing."""
    pass