File size: 10,468 Bytes
9f9848c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
"""
该文件中主要包含三个函数
不具备多线程能力的函数:
1. predict: 正常对话时使用,具备完备的交互功能,不可多线程
具备多线程调用能力的函数
2. predict_no_ui:高级实验性功能模块调用,不会实时显示在界面上,参数简单,可以多线程并行,方便实现复杂的功能逻辑
3. predict_no_ui_long_connection:在实验过程中发现调用predict_no_ui处理长文档时,和openai的连接容易断掉,这个函数用stream的方式解决这个问题,同样支持多线程
"""
import logging
import traceback
import importlib
import openai
import time
# 读取config.py文件中关于AZURE OPENAI API的信息
from toolbox import get_conf, update_ui, clip_history, trimmed_format_exc
TIMEOUT_SECONDS, MAX_RETRY, AZURE_ENGINE, AZURE_ENDPOINT, AZURE_API_VERSION, AZURE_API_KEY = \
get_conf('TIMEOUT_SECONDS', 'MAX_RETRY',"AZURE_ENGINE","AZURE_ENDPOINT", "AZURE_API_VERSION", "AZURE_API_KEY")
def get_full_error(chunk, stream_response):
"""
获取完整的从Openai返回的报错
"""
while True:
try:
chunk += next(stream_response)
except:
break
return chunk
def predict(inputs, llm_kwargs, plugin_kwargs, chatbot, history=[], system_prompt='', stream = True, additional_fn=None):
"""
发送至azure openai api,流式获取输出。
用于基础的对话功能。
inputs 是本次问询的输入
top_p, temperature是chatGPT的内部调优参数
history 是之前的对话列表(注意无论是inputs还是history,内容太长了都会触发token数量溢出的错误)
chatbot 为WebUI中显示的对话列表,修改它,然后yeild出去,可以直接修改对话界面内容
additional_fn代表点击的哪个按钮,按钮见functional.py
"""
print(llm_kwargs["llm_model"])
if additional_fn is not None:
import core_functional
importlib.reload(core_functional) # 热更新prompt
core_functional = core_functional.get_core_functions()
if "PreProcess" in core_functional[additional_fn]: inputs = core_functional[additional_fn]["PreProcess"](inputs) # 获取预处理函数(如果有的话)
inputs = core_functional[additional_fn]["Prefix"] + inputs + core_functional[additional_fn]["Suffix"]
raw_input = inputs
logging.info(f'[raw_input] {raw_input}')
chatbot.append((inputs, ""))
yield from update_ui(chatbot=chatbot, history=history, msg="等待响应") # 刷新界面
payload = generate_azure_payload(inputs, llm_kwargs, history, system_prompt, stream)
history.append(inputs); history.append("")
retry = 0
while True:
try:
openai.api_type = "azure"
openai.api_version = AZURE_API_VERSION
openai.api_base = AZURE_ENDPOINT
openai.api_key = AZURE_API_KEY
response = openai.ChatCompletion.create(timeout=TIMEOUT_SECONDS, **payload);break
except:
retry += 1
chatbot[-1] = ((chatbot[-1][0], "获取response失败,重试中。。。"))
retry_msg = f",正在重试 ({retry}/{MAX_RETRY}) ……" if MAX_RETRY > 0 else ""
yield from update_ui(chatbot=chatbot, history=history, msg="请求超时"+retry_msg) # 刷新界面
if retry > MAX_RETRY: raise TimeoutError
gpt_replying_buffer = ""
is_head_of_the_stream = True
if stream:
stream_response = response
while True:
try:
chunk = next(stream_response)
except StopIteration:
from toolbox import regular_txt_to_markdown; tb_str = '```\n' + trimmed_format_exc() + '```'
chatbot[-1] = (chatbot[-1][0], f"[Local Message] 远程返回错误: \n\n{tb_str} \n\n{regular_txt_to_markdown(chunk)}")
yield from update_ui(chatbot=chatbot, history=history, msg="远程返回错误:" + chunk) # 刷新界面
return
if is_head_of_the_stream and (r'"object":"error"' not in chunk):
# 数据流的第一帧不携带content
is_head_of_the_stream = False; continue
if chunk:
#print(chunk)
try:
if "delta" in chunk["choices"][0]:
if chunk["choices"][0]["finish_reason"] == "stop":
logging.info(f'[response] {gpt_replying_buffer}')
break
status_text = f"finish_reason: {chunk['choices'][0]['finish_reason']}"
gpt_replying_buffer = gpt_replying_buffer + chunk["choices"][0]["delta"]["content"]
history[-1] = gpt_replying_buffer
chatbot[-1] = (history[-2], history[-1])
yield from update_ui(chatbot=chatbot, history=history, msg=status_text) # 刷新界面
except Exception as e:
traceback.print_exc()
yield from update_ui(chatbot=chatbot, history=history, msg="Json解析不合常规") # 刷新界面
chunk = get_full_error(chunk, stream_response)
error_msg = chunk
yield from update_ui(chatbot=chatbot, history=history, msg="Json异常" + error_msg) # 刷新界面
return
def predict_no_ui_long_connection(inputs, llm_kwargs, history=[], sys_prompt="", observe_window=None, console_slience=False):
"""
发送至AZURE OPENAI API,等待回复,一次性完成,不显示中间过程。但内部用stream的方法避免中途网线被掐。
inputs:
是本次问询的输入
sys_prompt:
系统静默prompt
llm_kwargs:
chatGPT的内部调优参数
history:
是之前的对话列表
observe_window = None:
用于负责跨越线程传递已经输出的部分,大部分时候仅仅为了fancy的视觉效果,留空即可。observe_window[0]:观测窗。observe_window[1]:看门狗
"""
watch_dog_patience = 5 # 看门狗的耐心, 设置5秒即可
payload = generate_azure_payload(inputs, llm_kwargs, history, system_prompt=sys_prompt, stream=True)
retry = 0
while True:
try:
openai.api_type = "azure"
openai.api_version = AZURE_API_VERSION
openai.api_base = AZURE_ENDPOINT
openai.api_key = AZURE_API_KEY
response = openai.ChatCompletion.create(timeout=TIMEOUT_SECONDS, **payload);break
except:
retry += 1
traceback.print_exc()
if retry > MAX_RETRY: raise TimeoutError
if MAX_RETRY!=0: print(f'请求超时,正在重试 ({retry}/{MAX_RETRY}) ……')
stream_response = response
result = ''
while True:
try: chunk = next(stream_response)
except StopIteration:
break
except:
chunk = next(stream_response) # 失败了,重试一次?再失败就没办法了。
if len(chunk)==0: continue
if not chunk.startswith('data:'):
error_msg = get_full_error(chunk, stream_response)
if "reduce the length" in error_msg:
raise ConnectionAbortedError("AZURE OPENAI API拒绝了请求:" + error_msg)
else:
raise RuntimeError("AZURE OPENAI API拒绝了请求:" + error_msg)
if ('data: [DONE]' in chunk): break
delta = chunk["delta"]
if len(delta) == 0: break
if "role" in delta: continue
if "content" in delta:
result += delta["content"]
if not console_slience: print(delta["content"], end='')
if observe_window is not None:
# 观测窗,把已经获取的数据显示出去
if len(observe_window) >= 1: observe_window[0] += delta["content"]
# 看门狗,如果超过期限没有喂狗,则终止
if len(observe_window) >= 2:
if (time.time()-observe_window[1]) > watch_dog_patience:
raise RuntimeError("用户取消了程序。")
else: raise RuntimeError("意外Json结构:"+delta)
if chunk['finish_reason'] == 'length':
raise ConnectionAbortedError("正常结束,但显示Token不足,导致输出不完整,请削减单次输入的文本量。")
return result
def generate_azure_payload(inputs, llm_kwargs, history, system_prompt, stream):
"""
整合所有信息,选择LLM模型,生成 azure openai api请求,为发送请求做准备
"""
conversation_cnt = len(history) // 2
messages = [{"role": "system", "content": system_prompt}]
if conversation_cnt:
for index in range(0, 2*conversation_cnt, 2):
what_i_have_asked = {}
what_i_have_asked["role"] = "user"
what_i_have_asked["content"] = history[index]
what_gpt_answer = {}
what_gpt_answer["role"] = "assistant"
what_gpt_answer["content"] = history[index+1]
if what_i_have_asked["content"] != "":
if what_gpt_answer["content"] == "": continue
messages.append(what_i_have_asked)
messages.append(what_gpt_answer)
else:
messages[-1]['content'] = what_gpt_answer['content']
what_i_ask_now = {}
what_i_ask_now["role"] = "user"
what_i_ask_now["content"] = inputs
messages.append(what_i_ask_now)
payload = {
"model": llm_kwargs['llm_model'],
"messages": messages,
"temperature": llm_kwargs['temperature'], # 1.0,
"top_p": llm_kwargs['top_p'], # 1.0,
"n": 1,
"stream": stream,
"presence_penalty": 0,
"frequency_penalty": 0,
"engine": AZURE_ENGINE
}
try:
print(f" {llm_kwargs['llm_model']} : {conversation_cnt} : {inputs[:100]} ..........")
except:
print('输入中可能存在乱码。')
return payload
|