qingxu99's picture
提升稳定性
e6cf553
raw
history blame
18.4 kB
import traceback
from toolbox import update_ui
def input_clipping(inputs, history, max_token_limit):
import tiktoken
import numpy as np
from toolbox import get_conf
enc = tiktoken.encoding_for_model(*get_conf('LLM_MODEL'))
def get_token_num(txt): return len(enc.encode(txt))
mode = 'input-and-history'
# 当 输入部分的token占比 小于 全文的一半时,只裁剪历史
input_token_num = get_token_num(inputs)
if input_token_num < max_token_limit//2:
mode = 'only-history'
max_token_limit = max_token_limit - input_token_num
everything = [inputs] if mode == 'input-and-history' else ['']
everything.extend(history)
n_token = get_token_num('\n'.join(everything))
everything_token = [get_token_num(e) for e in everything]
delta = max(everything_token) // 16 # 截断时的颗粒度
while n_token > max_token_limit:
where = np.argmax(everything_token)
encoded = enc.encode(everything[where])
clipped_encoded = encoded[:len(encoded)-delta]
everything[where] = enc.decode(clipped_encoded)[:-1] # -1 to remove the may-be illegal char
everything_token[where] = get_token_num(everything[where])
n_token = get_token_num('\n'.join(everything))
if mode == 'input-and-history':
inputs = everything[0]
else:
pass
history = everything[1:]
return inputs, history
def request_gpt_model_in_new_thread_with_ui_alive(
inputs, inputs_show_user, top_p, temperature,
chatbot, history, sys_prompt, refresh_interval=0.2,
handle_token_exceed=True,
retry_times_at_unknown_error=2,
):
"""
Request GPT model,请求GPT模型同时维持用户界面活跃。
输入参数 Args (以_array结尾的输入变量都是列表,列表长度为子任务的数量,执行时,会把列表拆解,放到每个子线程中分别执行):
inputs (string): List of inputs (输入)
inputs_show_user (string): List of inputs to show user(展现在报告中的输入,借助此参数,在汇总报告中隐藏啰嗦的真实输入,增强报告的可读性)
top_p (float): Top p value for sampling from model distribution (GPT参数,浮点数)
temperature (float): Temperature value for sampling from model distribution(GPT参数,浮点数)
chatbot: chatbot inputs and outputs (用户界面对话窗口句柄,用于数据流可视化)
history (list): List of chat history (历史,对话历史列表)
sys_prompt (string): List of system prompts (系统输入,列表,用于输入给GPT的前提提示,比如你是翻译官怎样怎样)
refresh_interval (float, optional): Refresh interval for UI (default: 0.2) (刷新时间间隔频率,建议低于1,不可高于3,仅仅服务于视觉效果)
handle_token_exceed:是否自动处理token溢出的情况,如果选择自动处理,则会在溢出时暴力截断,默认开启
retry_times_at_unknown_error:失败时的重试次数
输出 Returns:
future: 输出,GPT返回的结果
"""
import time
from concurrent.futures import ThreadPoolExecutor
from request_llm.bridge_chatgpt import predict_no_ui_long_connection
# 用户反馈
chatbot.append([inputs_show_user, ""])
msg = '正常'
yield from update_ui(chatbot=chatbot, history=[])
executor = ThreadPoolExecutor(max_workers=16)
mutable = ["", time.time()]
def _req_gpt(inputs, history, sys_prompt):
retry_op = retry_times_at_unknown_error
exceeded_cnt = 0
while True:
try:
# 【第一种情况】:顺利完成
result = predict_no_ui_long_connection(
inputs=inputs, top_p=top_p, temperature=temperature,
history=history, sys_prompt=sys_prompt, observe_window=mutable)
return result
except ConnectionAbortedError as token_exceeded_error:
# 【第二种情况】:Token溢出,
if handle_token_exceed:
exceeded_cnt += 1
# 【选择处理】 尝试计算比例,尽可能多地保留文本
from toolbox import get_reduce_token_percent
p_ratio, n_exceed = get_reduce_token_percent(str(token_exceeded_error))
MAX_TOKEN = 4096
EXCEED_ALLO = 512 + 512 * exceeded_cnt
inputs, history = input_clipping(inputs, history, max_token_limit=MAX_TOKEN-EXCEED_ALLO)
mutable[0] += f'[Local Message] 警告,文本过长将进行截断,Token溢出数:{n_exceed}。\n\n'
continue # 返回重试
else:
# 【选择放弃】
tb_str = '```\n' + traceback.format_exc() + '```'
mutable[0] += f"[Local Message] 警告,在执行过程中遭遇问题, Traceback:\n\n{tb_str}\n\n"
return mutable[0] # 放弃
except:
# 【第三种情况】:其他错误
tb_str = '```\n' + traceback.format_exc() + '```'
mutable[0] += f"[Local Message] 警告,在执行过程中遭遇问题, Traceback:\n\n{tb_str}\n\n"
if retry_op > 0:
retry_op -= 1
mutable[0] += f"[Local Message] 重试中 {retry_times_at_unknown_error-retry_op}/{retry_times_at_unknown_error}:\n\n"
time.sleep(5)
continue # 返回重试
else:
time.sleep(5)
return mutable[0] # 放弃
future = executor.submit(_req_gpt, inputs, history, sys_prompt)
while True:
# yield一次以刷新前端页面
time.sleep(refresh_interval)
# “喂狗”(看门狗)
mutable[1] = time.time()
if future.done():
break
chatbot[-1] = [chatbot[-1][0], mutable[0]]
msg = "正常"
yield chatbot, [], msg
return future.result()
def request_gpt_model_multi_threads_with_very_awesome_ui_and_high_efficiency(
inputs_array, inputs_show_user_array, top_p, temperature,
chatbot, history_array, sys_prompt_array,
refresh_interval=0.2, max_workers=10, scroller_max_len=30,
handle_token_exceed=True, show_user_at_complete=False,
retry_times_at_unknown_error=2,
):
"""
Request GPT model using multiple threads with UI and high efficiency
请求GPT模型的[多线程]版。
具备以下功能:
实时在UI上反馈远程数据流
使用线程池,可调节线程池的大小避免openai的流量限制错误
处理中途中止的情况
网络等出问题时,会把traceback和已经接收的数据转入输出
输入参数 Args (以_array结尾的输入变量都是列表,列表长度为子任务的数量,执行时,会把列表拆解,放到每个子线程中分别执行):
inputs_array (list): List of inputs (每个子任务的输入)
inputs_show_user_array (list): List of inputs to show user(每个子任务展现在报告中的输入,借助此参数,在汇总报告中隐藏啰嗦的真实输入,增强报告的可读性)
top_p (float): Top p value for sampling from model distribution (GPT参数,浮点数)
temperature (float): Temperature value for sampling from model distribution(GPT参数,浮点数)
chatbot: chatbot (用户界面对话窗口句柄,用于数据流可视化)
history_array (list): List of chat history (历史对话输入,双层列表,第一层列表是子任务分解,第二层列表是对话历史)
sys_prompt_array (list): List of system prompts (系统输入,列表,用于输入给GPT的前提提示,比如你是翻译官怎样怎样)
refresh_interval (float, optional): Refresh interval for UI (default: 0.2) (刷新时间间隔频率,建议低于1,不可高于3,仅仅服务于视觉效果)
max_workers (int, optional): Maximum number of threads (default: 10) (最大线程数,如果子任务非常多,需要用此选项防止高频地请求openai导致错误)
scroller_max_len (int, optional): Maximum length for scroller (default: 30)(数据流的显示最后收到的多少个字符,仅仅服务于视觉效果)
handle_token_exceed (bool, optional): (是否在输入过长时,自动缩减文本)
handle_token_exceed:是否自动处理token溢出的情况,如果选择自动处理,则会在溢出时暴力截断,默认开启
show_user_at_complete (bool, optional): (在结束时,把完整输入-输出结果显示在聊天框)
retry_times_at_unknown_error:子任务失败时的重试次数
输出 Returns:
list: List of GPT model responses (每个子任务的输出汇总,如果某个子任务出错,response中会携带traceback报错信息,方便调试和定位问题。)
"""
import time, random
from concurrent.futures import ThreadPoolExecutor
from request_llm.bridge_chatgpt import predict_no_ui_long_connection
assert len(inputs_array) == len(history_array)
assert len(inputs_array) == len(sys_prompt_array)
executor = ThreadPoolExecutor(max_workers=max_workers)
n_frag = len(inputs_array)
# 用户反馈
chatbot.append(["请开始多线程操作。", ""])
msg = '正常'
yield chatbot, [], msg
# 异步原子
mutable = [["", time.time(), "等待中"] for _ in range(n_frag)]
def _req_gpt(index, inputs, history, sys_prompt):
gpt_say = ""
retry_op = retry_times_at_unknown_error
exceeded_cnt = 0
mutable[index][2] = "执行中"
while True:
try:
# 【第一种情况】:顺利完成
# time.sleep(10); raise RuntimeError("测试")
gpt_say = predict_no_ui_long_connection(
inputs=inputs, top_p=top_p, temperature=temperature, history=history,
sys_prompt=sys_prompt, observe_window=mutable[index], console_slience=True
)
mutable[index][2] = "已成功"
return gpt_say
except ConnectionAbortedError as token_exceeded_error:
# 【第二种情况】:Token溢出,
if handle_token_exceed:
exceeded_cnt += 1
# 【选择处理】 尝试计算比例,尽可能多地保留文本
from toolbox import get_reduce_token_percent
p_ratio, n_exceed = get_reduce_token_percent(str(token_exceeded_error))
MAX_TOKEN = 4096
EXCEED_ALLO = 512 + 512 * exceeded_cnt
inputs, history = input_clipping(inputs, history, max_token_limit=MAX_TOKEN-EXCEED_ALLO)
gpt_say += f'[Local Message] 警告,文本过长将进行截断,Token溢出数:{n_exceed}。\n\n'
mutable[index][2] = f"截断重试"
continue # 返回重试
else:
# 【选择放弃】
tb_str = '```\n' + traceback.format_exc() + '```'
gpt_say += f"[Local Message] 警告,线程{index}在执行过程中遭遇问题, Traceback:\n\n{tb_str}\n\n"
if len(mutable[index][0]) > 0: gpt_say += "此线程失败前收到的回答:\n\n" + mutable[index][0]
mutable[index][2] = "输入过长已放弃"
return gpt_say # 放弃
except:
# 【第三种情况】:其他错误
tb_str = '```\n' + traceback.format_exc() + '```'
gpt_say += f"[Local Message] 警告,线程{index}在执行过程中遭遇问题, Traceback:\n\n{tb_str}\n\n"
if len(mutable[index][0]) > 0: gpt_say += "此线程失败前收到的回答:\n\n" + mutable[index][0]
if retry_op > 0:
retry_op -= 1
wait = random.randint(5, 20)
for i in range(wait):# 也许等待十几秒后,情况会好转
mutable[index][2] = f"等待重试 {wait-i}"; time.sleep(1)
mutable[index][2] = f"重试中 {retry_times_at_unknown_error-retry_op}/{retry_times_at_unknown_error}"
continue # 返回重试
else:
mutable[index][2] = "已失败"
wait = 5
time.sleep(5)
return gpt_say # 放弃
# 异步任务开始
futures = [executor.submit(_req_gpt, index, inputs, history, sys_prompt) for index, inputs, history, sys_prompt in zip(
range(len(inputs_array)), inputs_array, history_array, sys_prompt_array)]
cnt = 0
while True:
# yield一次以刷新前端页面
time.sleep(refresh_interval)
cnt += 1
worker_done = [h.done() for h in futures]
if all(worker_done):
executor.shutdown()
break
# 更好的UI视觉效果
observe_win = []
# print([mutable[thread_index][2] for thread_index, _ in enumerate(worker_done)])
# 每个线程都要“喂狗”(看门狗)
for thread_index, _ in enumerate(worker_done):
mutable[thread_index][1] = time.time()
# 在前端打印些好玩的东西
for thread_index, _ in enumerate(worker_done):
print_something_really_funny = "[ ...`"+mutable[thread_index][0][-scroller_max_len:].\
replace('\n', '').replace('```', '...').replace(
' ', '.').replace('<br/>', '.....').replace('$', '.')+"`... ]"
observe_win.append(print_something_really_funny)
stat_str = ''.join([f'`{mutable[thread_index][2]}`: {obs}\n\n'
if not done else f'`{mutable[thread_index][2]}`\n\n'
for thread_index, done, obs in zip(range(len(worker_done)), worker_done, observe_win)])
chatbot[-1] = [chatbot[-1][0], f'多线程操作已经开始,完成情况: \n\n{stat_str}' + ''.join(['.']*(cnt % 10+1))]
msg = "正常"
yield chatbot, [], msg
# 异步任务结束
gpt_response_collection = []
for inputs_show_user, f in zip(inputs_show_user_array, futures):
gpt_res = f.result()
gpt_response_collection.extend([inputs_show_user, gpt_res])
if show_user_at_complete:
for inputs_show_user, f in zip(inputs_show_user_array, futures):
gpt_res = f.result()
chatbot.append([inputs_show_user, gpt_res])
yield chatbot, [], msg
time.sleep(1)
return gpt_response_collection
def WithRetry(f):
"""
装饰器函数,用于自动重试。
"""
def decorated(retry, res_when_fail, *args, **kwargs):
assert retry >= 0
while True:
try:
res = yield from f(*args, **kwargs)
return res
except:
retry -= 1
if retry<0:
print("达到最大重试次数")
break
else:
print("重试中……")
continue
return res_when_fail
return decorated
def breakdown_txt_to_satisfy_token_limit(txt, get_token_fn, limit):
def cut(txt_tocut, must_break_at_empty_line): # 递归
if get_token_fn(txt_tocut) <= limit:
return [txt_tocut]
else:
lines = txt_tocut.split('\n')
estimated_line_cut = limit / get_token_fn(txt_tocut) * len(lines)
estimated_line_cut = int(estimated_line_cut)
for cnt in reversed(range(estimated_line_cut)):
if must_break_at_empty_line:
if lines[cnt] != "":
continue
print(cnt)
prev = "\n".join(lines[:cnt])
post = "\n".join(lines[cnt:])
if get_token_fn(prev) < limit:
break
if cnt == 0:
print('what the fuck ?')
raise RuntimeError("存在一行极长的文本!")
# print(len(post))
# 列表递归接龙
result = [prev]
result.extend(cut(post, must_break_at_empty_line))
return result
try:
return cut(txt, must_break_at_empty_line=True)
except RuntimeError:
return cut(txt, must_break_at_empty_line=False)
def breakdown_txt_to_satisfy_token_limit_for_pdf(txt, get_token_fn, limit):
def cut(txt_tocut, must_break_at_empty_line): # 递归
if get_token_fn(txt_tocut) <= limit:
return [txt_tocut]
else:
lines = txt_tocut.split('\n')
estimated_line_cut = limit / get_token_fn(txt_tocut) * len(lines)
estimated_line_cut = int(estimated_line_cut)
cnt = 0
for cnt in reversed(range(estimated_line_cut)):
if must_break_at_empty_line:
if lines[cnt] != "":
continue
print(cnt)
prev = "\n".join(lines[:cnt])
post = "\n".join(lines[cnt:])
if get_token_fn(prev) < limit:
break
if cnt == 0:
# print('what the fuck ? 存在一行极长的文本!')
raise RuntimeError("存在一行极长的文本!")
# print(len(post))
# 列表递归接龙
result = [prev]
result.extend(cut(post, must_break_at_empty_line))
return result
try:
return cut(txt, must_break_at_empty_line=True)
except RuntimeError:
try:
return cut(txt, must_break_at_empty_line=False)
except RuntimeError:
# 这个中文的句号是故意的,作为一个标识而存在
res = cut(txt.replace('.', '。\n'), must_break_at_empty_line=False)
return [r.replace('。\n', '.') for r in res]