import json import gradio as gr import openai import os import sys import traceback # import markdown my_api_key = "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" # 在这里输入你的 API 密钥 initial_prompt = "You are a helpful assistant." if my_api_key == "empty": print("Please give a api key!") sys.exit(1) if my_api_key == "": initial_keytxt = None elif len(str(my_api_key)) == 51: # initial_keytxt = "默认api-key(未验证):" + str(my_api_key[:4] + "..." + my_api_key[-4:]) initial_keytxt = "Zhou Yang Assistant !" else: initial_keytxt = "默认api-key无效,请重新输入" def parse_text(text): lines = text.split("\n") count = 0 for i,line in enumerate(lines): if "```" in line: count += 1 items = line.split('`') if count % 2 == 1: lines[i] = f'
'
            else:
                lines[i] = f'
' else: if i > 0: if count % 2 == 1: line = line.replace("&", "&") line = line.replace("\"", """) line = line.replace("\'", "'") line = line.replace("<", "<") line = line.replace(">", ">") line = line.replace(" ", " ") lines[i] = '
'+line return "".join(lines) def get_response(system, context, myKey, raw = False): openai.api_key = myKey response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[system, *context], ) openai.api_key = "" if raw: return response else: statistics = f'本次对话Tokens用量【{response["usage"]["total_tokens"]} / 4096】 ( 提问+上文 {response["usage"]["prompt_tokens"]},回答 {response["usage"]["completion_tokens"]} )' message = response["choices"][0]["message"]["content"] message_with_stats = f'{message}\n\n================\n\n{statistics}' # message_with_stats = markdown.markdown(message_with_stats) return message, parse_text(message_with_stats) def predict(chatbot, input_sentence, system, context, myKey): if len(input_sentence) == 0: return [] context.append({"role": "user", "content": f"{input_sentence}"}) try: message, message_with_stats = get_response(system, context, myKey) except openai.error.AuthenticationError: chatbot.append((input_sentence, "请求失败,请检查API-key是否正确。")) return chatbot, context except openai.error.Timeout: chatbot.append((input_sentence, "请求超时,请检查网络连接。")) return chatbot, context except openai.error.APIConnectionError: chatbot.append((input_sentence, "连接失败,请检查网络连接。")) return chatbot, context except openai.error.RateLimitError: chatbot.append((input_sentence, "请求过于频繁,请5s后再试。")) return chatbot, context except: chatbot.append((input_sentence, "发生了未知错误Orz")) return chatbot, context context.append({"role": "assistant", "content": message}) chatbot.append((input_sentence, message_with_stats)) return chatbot, context def retry(chatbot, system, context, myKey): if len(context) == 0: return [], [] try: message, message_with_stats = get_response(system, context[:-1], myKey) except openai.error.AuthenticationError: chatbot.append(("重试请求", "请求失败,请检查API-key是否正确。")) return chatbot, context except openai.error.Timeout: chatbot.append(("重试请求", "请求超时,请检查网络连接。")) return chatbot, context except openai.error.APIConnectionError: chatbot.append(("重试请求", "连接失败,请检查网络连接。")) return chatbot, context except openai.error.RateLimitError: chatbot.append(("重试请求", "请求过于频繁,请5s后再试。")) return chatbot, context except: chatbot.append(("重试请求", "发生了未知错误Orz")) return chatbot, context context[-1] = {"role": "assistant", "content": message} chatbot[-1] = (context[-2]["content"], message_with_stats) return chatbot, context def delete_last_conversation(chatbot, context): if len(context) == 0: return [], [] chatbot = chatbot[:-1] context = context[:-2] return chatbot, context def reduce_token(chatbot, system, context, myKey): context.append({"role": "user", "content": "请帮我总结一下上述对话的内容,实现减少tokens的同时,保证对话的质量。在总结中不要加入这一句话。"}) response = get_response(system, context, myKey, raw=True) statistics = f'本次对话Tokens用量【{response["usage"]["completion_tokens"]+12+12+8} / 4096】' optmz_str = parse_text( f'好的,我们之前聊了:{response["choices"][0]["message"]["content"]}\n\n================\n\n{statistics}' ) chatbot.append(("请帮我总结一下上述对话的内容,实现减少tokens的同时,保证对话的质量。", optmz_str)) context = [] context.append({"role": "user", "content": "我们之前聊了什么?"}) context.append({"role": "assistant", "content": f'我们之前聊了:{response["choices"][0]["message"]["content"]}'}) return chatbot, context def save_chat_history(filepath, system, context): if filepath == "": return history = {"system": system, "context": context} with open(f"{filepath}.json", "w") as f: json.dump(history, f) def load_chat_history(fileobj): with open(fileobj.name, "r") as f: history = json.load(f) context = history["context"] chathistory = [] for i in range(0, len(context), 2): chathistory.append((parse_text(context[i]["content"]), parse_text(context[i+1]["content"]))) return chathistory , history["system"], context, history["system"]["content"] def get_history_names(): with open("history.json", "r") as f: history = json.load(f) return list(history.keys()) def reset_state(): return [], [] def update_system(new_system_prompt): return {"role": "system", "content": new_system_prompt} def set_apikey(new_api_key, myKey): old_api_key = myKey try: get_response(update_system(initial_prompt), [{"role": "user", "content": "test"}], new_api_key) except openai.error.AuthenticationError: return "无效的api-key", myKey except openai.error.Timeout: return "请求超时,请检查网络设置", myKey except openai.error.APIConnectionError: return "网络错误", myKey except: return "发生了未知错误Orz", myKey encryption_str = "验证成功,api-key已做遮挡处理:" + new_api_key[:4] + "..." + new_api_key[-4:] return encryption_str, new_api_key with gr.Blocks() as demo: keyTxt = gr.Button(initial_keytxt) chatbot = gr.Chatbot().style(color_map=("#1D51EE", "#585A5B")) context = gr.State([]) systemPrompt = gr.State(update_system(initial_prompt)) myKey = gr.State(my_api_key) topic = gr.State("未命名对话历史记录") with gr.Row(): with gr.Column(scale=12): txt = gr.Textbox(show_label=False, placeholder="在这里输入").style(container=False) with gr.Column(min_width=50, scale=1): submitBtn = gr.Button("🚀", variant="primary") with gr.Row(): emptyBtn = gr.Button("🧹 新的对话") retryBtn = gr.Button("🔄 重新生成") delLastBtn = gr.Button("🗑️ 删除上条对话") reduceTokenBtn = gr.Button("♻️ 优化Tokens") newSystemPrompt = gr.Textbox(show_label=True, placeholder=f"在这里输入新的System Prompt...", label="更改 System prompt").style(container=True) systemPromptDisplay = gr.Textbox(show_label=True, value=initial_prompt, interactive=False, label="目前的 System prompt").style(container=True) with gr.Accordion(label="保存/加载对话历史记录(在文本框中输入文件名,点击“保存对话”按钮,历史记录文件会被存储到本地)", open=False): with gr.Column(): with gr.Row(): with gr.Column(scale=6): saveFileName = gr.Textbox(show_label=True, placeholder=f"在这里输入保存的文件名...", label="保存对话", value="对话历史记录").style(container=True) with gr.Column(scale=1): saveBtn = gr.Button("💾 保存对话") uploadBtn = gr.UploadButton("📂 读取对话", file_count="single", file_types=["json"]) txt.submit(predict, [chatbot, txt, systemPrompt, context, myKey], [chatbot, context], show_progress=True) txt.submit(lambda :"", None, txt) submitBtn.click(predict, [chatbot, txt, systemPrompt, context, myKey], [chatbot, context], show_progress=True) submitBtn.click(lambda :"", None, txt) emptyBtn.click(reset_state, outputs=[chatbot, context]) newSystemPrompt.submit(update_system, newSystemPrompt, systemPrompt) newSystemPrompt.submit(lambda x: x, newSystemPrompt, systemPromptDisplay) newSystemPrompt.submit(lambda :"", None, newSystemPrompt) retryBtn.click(retry, [chatbot, systemPrompt, context, myKey], [chatbot, context], show_progress=True) delLastBtn.click(delete_last_conversation, [chatbot, context], [chatbot, context], show_progress=True) reduceTokenBtn.click(reduce_token, [chatbot, systemPrompt, context, myKey], [chatbot, context], show_progress=True) # keyTxt.submit(set_apikey, [keyTxt, myKey], [keyTxt, myKey], show_progress=False) uploadBtn.upload(load_chat_history, uploadBtn, [chatbot, systemPrompt, context, systemPromptDisplay], show_progress=True) saveBtn.click(save_chat_history, [saveFileName, systemPrompt, context], None, show_progress=True) # demo.launch(server_name="0.0.0.0", server_port=9000) demo.launch()