# import gradio as gr import gradio # import lmdb # import base64 # import io import random import time import os import re import sys import json import copy # import sqlite3 import hashlib import uuid from urllib.parse import urljoin import openai def get_random_sleep(base_time, random_range): return (base_time + random.randint(-random_range, random_range))*0.001 def js_load(txt): try: return json.loads(txt) except Exception as error: print('') print('js_load:') print(str(error)) print('') return None def js_dump(thing): try: return json.dumps(thing) except Exception as error: print('') print('js_dump:') print(str(error)) print('') return None def filtered_history(history, num=0): if num > 0: filtered = list(filter(lambda it:(it['type'] in ['request', 'response']), history)) return filtered[-num:] return [] def filtered_history_messages(history, num=0): filtered = filtered_history(history, num) return list(map(lambda it:{'role': it.get('role'), 'content': it.get('content')}, filtered)) def make_md_line(role, content): return f"""\n##### `{role}`\n\n{content}\n""" def make_md_by_history(history): md = "" for item in history: md += make_md_line(item.get('role'), item.get('content')) return md def make_history_file_fn(history): uuid4 = str(uuid.uuid4()) json_file_path = None md_file_path = None try: # 如果目录不存在,则创建目录 os.makedirs('temp_files', exist_ok=True) json_file_content = json.dumps(history) json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') with open(json_file_path, 'w') as f: f.write(json_file_content) md_file_content = make_md_by_history(history) md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') with open(md_file_path, 'w') as f: f.write(md_file_content) return json_file_path, md_file_path, gradio.update(visible=True) except Exception as error: print(f"\n{error}\n") return json_file_path, md_file_path, gradio.update(visible=True) def make_history_file_fn__(history): uuid4 = str(uuid.uuid4()) try: json_file_content = json.dumps(history) json_file_path = os.path.join('temp_files', f'history[{uuid4}].json') with open(json_file_path, 'w') as f: f.write(json_file_content) except Exception as error: print(f"\n{error}\n") json_file_path = None try: md_file_content = make_md_by_history(history) md_file_path = os.path.join('temp_files', f'history[{uuid4}].md') with open(md_file_path, 'w') as f: f.write(md_file_content) except Exception as error: print(f"\n{error}\n") md_file_path = None return json_file_path, md_file_path, gradio.update(visible=True) def make_user_message_list_fn__( user_message_template, # 模板,套用到每一条消息上 user_message_template_mask, # 模板中要被替换的部分 user_message_template_mask_is_regex, # 决定如何构造用于替换的正则表达式 user_message_list_text, # 一段文本,包含了每一条用户消息 user_message_list_text_splitter, # 描述了应该以什么为线索来切分 user_message_list_text user_message_list_text_splitter_is_regex, # 决定如何进行切分 ) -> list: # 返回套用了模板的用户信息列表 # 这个实现首先根据是否使用正则表达式来切分用户消息列表文本,并将切分后的消息存储在一个列表中。 # 然后,针对每个消息,根据user_message_template_mask及user_message_template_mask_is_regex替换模板中的部分内容, # 并将替换后的结果添加到结果列表中。 # 最后,返回结果列表。 # 切分用户消息列表文本 if user_message_list_text_splitter_is_regex: user_messages = re.split(user_message_list_text_splitter, user_message_list_text) else: user_messages = user_message_list_text.split(user_message_list_text_splitter) # 生成套用模板的用户信息列表 user_message_result_list = [] for message in user_messages: # 替换模板内容 if user_message_template_mask_is_regex: transformed_message = re.sub(user_message_template_mask, message, user_message_template) else: transformed_message = user_message_template.replace(user_message_template_mask, message) user_message_result_list.append(transformed_message) return user_message_result_list def make_user_message_list_fn( user_message_template, user_message_template_mask, user_message_template_mask_is_regex, user_message_list_text, user_message_list_text_splitter, user_message_list_text_splitter_is_regex, ) -> list: # 实际上,只要保证在使用正则表达式进行替换或切分操作之前,已经将其编译为正则表达式对象即可。 # 在我的修改中,针对 xxx_is_regex 参数为 True 的情况,将这些参数编译成正则表达式。 # 对于替换操作和切分操作,只需检查是否已经编译为正则表达式,并使用相应的方法即可。 # 编译正则表达式 if user_message_template_mask_is_regex: user_message_template_mask = re.compile(user_message_template_mask) if user_message_list_text_splitter_is_regex: user_message_list_text_splitter = re.compile(user_message_list_text_splitter) # 切分用户消息列表文本 if user_message_list_text_splitter_is_regex: user_messages = user_message_list_text_splitter.split(user_message_list_text) else: user_messages = user_message_list_text.split(user_message_list_text_splitter) # 生成套用模板的用户信息列表 user_message_result_list = [] for message in user_messages: # 替换模板内容 if user_message_template_mask_is_regex: transformed_message = user_message_template_mask.sub(message, user_message_template) else: transformed_message = user_message_template.replace(user_message_template_mask, message) user_message_result_list.append(transformed_message) return user_message_result_list def sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): # print("\n\n") assistant_message = "" tips = "" try: openai.api_key = api_key_text completion = openai.ChatCompletion.create(**payload) if payload.get('stream'): print('assistant:') # print('->>>') is_first=True for chunk in completion: if is_first: is_first = False continue if chunk.choices[0].finish_reason is None: # sys.stdout.write("\r") print(chunk.choices[0].delta.content or '', end="") assistant_message += chunk.choices[0].delta.content or '' # print(f"\033[2K{assistant_message}", end="") history_md_stream = make_md_line('assistant', assistant_message) tips = 'streaming' yield assistant_message, history_md_stream, tips, history else: pass pass # print('=>>>') print('') pass else: assistant_message = completion.choices[0].message.content history_md_stream = make_md_line('assistant', assistant_message) tips = 'got' print('assistant:') print(assistant_message) yield assistant_message, history_md_stream, tips, history pass except Exception as error: tips = str(error) history.append({"role": "app", "content": tips}) print(f"\n{tips}\n") yield assistant_message, history_md_stream, tips, history pass # print("\n\n") def sequential_chat_fn( history, system_prompt_enabled, system_prompt, user_message_template, user_message_template_mask, user_message_template_mask_is_regex, user_message_list_text, user_message_list_text_splitter, user_message_list_text_splitter_is_regex, history_prompt_num, api_key_text, token_text, sleep_base, sleep_rand, prop_stream, prop_model, prop_temperature, prop_top_p, prop_choices_num, prop_max_tokens, prop_presence_penalty, prop_frequency_penalty, prop_logit_bias, ): # outputs=[ # history, # history_md_stable, # history_md_stream, # tips, # file_row, # ], history_md_stable = "" history_md_stream = "" tips = "" try: user_message_list = make_user_message_list_fn( user_message_template, user_message_template_mask, user_message_template_mask_is_regex, user_message_list_text, user_message_list_text_splitter, user_message_list_text_splitter_is_regex, ) payload = { 'model': prop_model, 'temperature': prop_temperature, 'top_p': prop_top_p, 'n': prop_choices_num, 'stream': prop_stream, 'presence_penalty': prop_presence_penalty, 'frequency_penalty': prop_frequency_penalty, 'user': token_text, } if prop_max_tokens>0: payload['max_tokens'] = prop_max_tokens # if prop_logit_bias is not None: # payload['logit_bias'] = prop_logit_bias # headers = { # "Content-Type": "application/json", # "Authorization": f"Bearer {api_key_text}" # } for user_message in user_message_list: print('') print(f'user({token_text}):') print(user_message) print('') # make the_messages to sent the_messages = [] if system_prompt_enabled: the_messages.append({"role": "system", "content": system_prompt}) for msg in filtered_history_messages(history, num=history_prompt_num): the_messages.append(msg) the_messages.append({"role": "user", "content": user_message}) payload['messages'] = the_messages history.append({"role": "user", "content": user_message, "type": "request", "payload": payload}) history_md_stable = make_md_by_history(history) history_md_stream = "" tips = "" yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) try: for (assistant_message, history_md_stream, tips, history) in sequential_chat_once_fn(payload, api_key_text, history, history_md_stable, history_md_stream, tips): yield history, history_md_stable, history_md_stream, tips, gradio.update() history.append({"role": "assistant", "content": assistant_message, "type": "request"}) history_md_stable += history_md_stream history_md_stream = "" tips = "fine" yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) except Exception as error: tips = f'error: {str(error)}' history.append({"role": "app", "content": tips}) print(f"\n{tips}\n") yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) time.sleep(get_random_sleep(sleep_base, sleep_rand)) pass except Exception as error: tips = str(error) history.append({"role": "app", "content": tips}) print(f"\n{tips}\n") yield history, history_md_stable, history_md_stream, tips, gradio.update(visible=False) pass def on_click_send_btn( global_state_json, api_key_text, chat_input_role, chat_input, prompt_table, chat_use_prompt, chat_use_history, chat_log, chat_model, temperature, top_p, choices_num, stream, max_tokens, presence_penalty, frequency_penalty, logit_bias, ): old_state = json.loads(global_state_json or "{}") print('\n\n\n\n\n') print(prompt_table) prompt_table = prompt_table or [] chat_log = chat_log or [] chat_log_md = '' if chat_use_prompt: chat_log_md += '