mkw18
fix
f7c5b52
import gradio as gr
import mdtex2html
import random as rd
import os
import json
import time
import openai
import requests
from nltk.translate.bleu_score import sentence_bleu
import time
openai.api_key = os.environ.get('APIKEY')
rd.seed(time.time())
def postprocess(self, y):
if y is None:
return []
for i, (message, response) in enumerate(y):
y[i] = (
None if message is None else mdtex2html.convert((message)),
None if response is None else mdtex2html.convert(response),
)
return y
gr.Chatbot.postprocess = postprocess
def parse_text(text):
"""copy from https://github.com/GaiZhenbiao/ChuanhuChatGPT/"""
lines = text.split("\n")
lines = [line for line in lines if line != ""]
count = 0
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split('`')
if count % 2 == 1:
lines[i] = f'<pre><code class="language-{items[-1]}">'
else:
lines[i] = f'<br></code></pre>'
else:
if i > 0:
if count % 2 == 1:
line = line.replace("`", "\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
lines[i] = "<br>"+line
text = "".join(lines)
return text
def showInput(input, chatbot):
chatbot.append((parse_text(input), ""))
return chatbot
def predict(input, chatbot, messages, idx, answer, story_key, answer_key, known, bingo, reasoning, history, zh):
start = time.time()
chatbot.append((parse_text(input), ""))
messages1 = messages[:10].copy()
if len(known) > 0:
messages1 += [{"role": 'user', "content": f"{' '.join(known)}\n请回答是或否或无关。"}, {"role": "assistant", "content": '是。'}, {"role": 'user', "content": f"{input}\n请回答是或否或无关。"}] if zh else [{"role": 'user', "content": f"{' '.join(known)}\nPlease answer with \"yes\", \"no\", or \"irrelevant\"."}, {"role": "assistant", "content": 'Yes.'}, {"role": 'user', "content": f"{input}\nPlease answer with \"yes\", \"no\", or \"irrelevant\"."}]
else:
messages1 += [{"role": 'user', "content": f"{input}\n请回答是或否或无关。"}] if zh else [{"role": 'user', "content": f"{input}\nPlease answer with \"yes\", \"no\", or \"irrelevant\"."}]
print(f"Init: {time.time() - start}")
messages.append({"role": 'user', "content": input})
llm = True
finished = False
response = ''
print(f"Start judge: {time.time() - start}")
for key in story_key:
key = key.strip()
if key == '':
continue
if key[1] == '.' or key[2] == '.' or key[0] == '-':
key = ' '.join(key.split(' ')[1:])
bleu = sentence_bleu([key], input.replace('?', '。').replace('?', '.'), weights=(1, 0, 0, 0))
if bleu >= 0.85:
response = '这是汤面中已有的信息,请提一个新问题。' if zh else 'This Information is already in the story, please ask a new question.'
llm = False
break
print(f"Filter story: {time.time() - start}")
if llm:
for key in history:
key = key.strip()
if key == '':
continue
bleu = sentence_bleu([key], input.replace('?', '。').replace('?', '.'), weights=(1, 0, 0, 0))
if bleu >= 0.95:
response = '这是已经提问过的内容,请提一个新问题。' if zh else 'This is a question that has already been asked. Please ask a new question.'
llm = False
break
print(f"Filter history: {time.time() - start}")
if llm:
history.append(input.replace('?', '。').replace('?', '.'))
data = {'predict': messages1, 'idx': idx, 'isfinished': False, 'answer': answer}
print(f"Start Request 1: {time.time() - start}")
completion=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
print(f"Request 1: {time.time() - start}")
if completion.status_code == 200:
response = str(completion.content, encoding="utf-8")
else:
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages1,
temperature=0
)
response=completion.choices[0].message.content.strip()
print(f"Request openai 1: {time.time() - start}")
relevant = False
if response.startswith("是") or response.startswith("Yes") or response.startswith("yes"):
decl_msg = [{"role": "user", "content": f"请将以下内容转述为陈述句,并简化为一句话:\n{input}" if zh else f"Please restate the following content as a declarative sentence and simplify it into one sentence:\n{input}"}]
data = {'predict': decl_msg, 'idx': idx, 'isfinished': False, 'answer': answer}
print(f"Start Request 2: {time.time() - start}")
completion=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
print(f"Request 2: {time.time() - start}")
if completion.status_code == 200:
summary = str(completion.content, encoding="utf-8")
else:
summary = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=decl_msg,
temperature=0
)
summary = summary.choices[0].message.content.strip()
print(f"Request openai 2: {time.time() - start}")
relevant = True
elif response.startswith("不是") or response.startswith("否") or response.startswith("No") or response.startswith("no"):
decl_msg = [{"role": "user", "content": f"请将以下内容取反义然后转述为陈述句,并简化为一句话:\n{input}" if zh else f"Please restate the following content as a declarative sentence by using the opposite meaning and then simplify it into one sentence:\n{input}"}]
data = {'predict': decl_msg, 'idx': idx, 'isfinished': False, 'answer': answer}
print(f"Start Request 2: {time.time() - start}")
completion=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
print(f"Request 2: {time.time() - start}")
if completion.status_code == 200:
summary = str(completion.content, encoding="utf-8")
else:
summary = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=decl_msg,
temperature=0
)
summary = summary.choices[0].message.content.strip()
print(f"Request openai 2: {time.time() - start}")
relevant = True
if relevant:
history.append(summary)
known.append(summary)
reasoning.append(summary)
if len(reasoning) >= 2:
simp_msg = [{"role": "user", "content": f"请将以下内容简化为一句话:\n{' '.join(reasoning)}" if zh else f"Please simplify the following content into one sentence:\n{' '.join(reasoning)}"}]
data = {'predict': simp_msg, 'idx': idx, 'isfinished': False, 'answer': answer}
print(f"Start Request 3: {time.time() - start}")
completion=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
print(f"Request 3: {time.time() - start}")
if completion.status_code == 200:
merge = str(completion.content, encoding="utf-8")
else:
merge = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=simp_msg,
temperature=0
)
merge = merge.choices[0].message.content.strip()
print(f"Request openai 3: {time.time() - start}")
else:
merge = summary
for key in answer_key:
key = key.strip()
if key == '':
continue
if key[1] == '.' or key[2] == '.' or key[0] == '-':
key1 = ' '.join(key.split(' ')[1:])
else:
key1 = key
if len(merge) < len(key1):
continue
comp_msg = [{"role": "user", "content": f"请对比第一句话和第二句话之间的信息,判断第二句话是否完整地概括了第一句话的全部信息,包括关键细节和描述。请用是或否回答。\n第一句话:{key1}\n第二句话:{merge}" if zh else f"Please compare the information between Sentence 1 and Sentence 2 to determine if Sentence 2 contains all the information in Sentence 1, including key details and descriptions. Please answer with \"yes\" or \"no\".\nSentence 1: {key1}\nSentence 2: {merge}"}]
data = {'predict': comp_msg, 'idx': idx, 'isfinished': False, 'answer': answer}
print(f"Start Request 4: {time.time() - start}")
completion=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
print(f"Request 4: {time.time() - start}")
if completion.status_code == 200:
compare = str(completion.content, encoding="utf-8")
else:
compare = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=comp_msg,
temperature=0
)
compare = compare.choices[0].message.content.strip()
print(f"Request openai 4: {time.time() - start}")
if compare.startswith('是') or compare.startswith('Yes') or compare.startswith('yes'):
bingo += 1
answer_key.remove(key)
print(key)
reasoning = []
break
print(f"Finish compare: {time.time() - start}")
if bingo >= len(answer_key):
finished = True
response += f'恭喜你猜到了汤底,汤底是:{answer}\n点击"再来一局"按钮开始下一局游戏。' if zh else f'Congratulations! You have guessed the truth, the truth is: {answer}\nClick the "New Game" button for another game.'
print(f"Finish bingo: {time.time() - start}")
messages.append({"role": "assistant", "content": response})
data = {'predict': messages, 'idx': idx, 'isfinished': finished, 'answer': answer}
print(f"Finish predict: {time.time() - start}")
requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8'))
chatbot[-1] = (parse_text(input), parse_text(response))
print(f"Finish save: {time.time() - start}")
return chatbot, messages, known, bingo, reasoning, history
def reset_user_input():
return gr.update(value='')
def reset_state(zh, request: gr.Request):
global host_cnt
host = request.client.host
if not host in host_cnt:
host_cnt[host] = time.time()
else:
elapse = time.time()-host_cnt[host]
if elapse < 10:
time.sleep(10-elapse)
host_cnt[host] = time.time()
data = {'refresh': zh}
data=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8')).content
data = json.loads(str(data, encoding="utf-8"))
chatbot = data['chatbot']
messages = data['messages']
answer = data['answer']
story_key = data['story_key']
answer_key = data['answer_key']
idx = data['idx']
return chatbot, messages, gr.update(value=""), gr.update(value="显示答案") if zh else gr.update(value="Show Answer"), answer, idx, gr.update(value=data['story'].strip()), False, story_key, answer_key, [], 0, [], [], zh
def zh_en_trans(zh, request: gr.Request):
global host_cnt
host = request.client.host
if not host in host_cnt:
host_cnt[host] = time.time()
else:
elapse = time.time()-host_cnt[host]
if elapse < 10:
time.sleep(10-elapse)
host_cnt[host] = time.time()
zh = not zh
data = {'refresh': zh}
data=requests.post(os.environ.get("URL"), data=json.dumps(data, ensure_ascii=False).encode('utf-8')).content
data = json.loads(str(data, encoding="utf-8"))
chatbot = data['chatbot']
messages = data['messages']
answer = data['answer']
story_key = data['story_key']
answer_key = data['answer_key']
idx = data['idx']
return chatbot, messages, gr.update(value=""), gr.update(value="显示答案") if zh else gr.update(value="Show Answer"), answer, idx, gr.update(value=data['story'].strip()), False, story_key, answer_key, [], 0, [], [], zh, gr.update(value="English") if zh else gr.update(value="中文"), gr.update(value="发送") if zh else gr.update(value="Submit"), gr.update(value="再来一局") if zh else gr.update(value="New Game"), gr.update(value="海龟汤是一个推理类游戏,游戏开始时会给出一段隐去关键信息的叙述,即汤面,玩家根据汤面推理,提出能够通过“是”或“否”来回答的问题,通过提问不同可能性,缩小真相的范围,直到最终猜到真相(即汤底)的关键信息。玩家可以点击“再来一局”按钮随机一场新的游戏,点击“显示答案”可查看汤底。") if zh else gr.update(value="Lateral Thinking Puzzle is a deductive reasoning game. At the beginning of the game, players receive a narrative with key information concealed, referred to as the \"story\". Based on the story, players deduce and ask questions that can be answered with \"yes\" or \"no\" to narrow down different possibilities and ultimately guess the key information, known as the \"truth\". Players can click the \"New Game\" button to start a new random game or click the \"Show Answer\" button to reveal the truth.")
def show_hide_answer(answer, show_ans, zh):
if show_ans:
show_ans = False
return gr.update(value=""), gr.update(value="显示答案") if zh else gr.update(value="Show Answer"), show_ans
else:
show_ans = True
return gr.update(value=answer), gr.update(value="隐藏答案") if zh else gr.update(value="Hide Answer"), show_ans
host_cnt = {}
with gr.Blocks() as demo:
with gr.Row():
with gr.Column(scale=4):
gr.HTML("""<h1 align="center">Lateral Thinking Puzzle</h1>""")
with gr.Column(scale=1):
zh_enBtn = gr.Button("中文", variant="primary")
with gr.Row():
rule = gr.Textbox(label='Rules', value='Lateral Thinking Puzzle is a deductive reasoning game. At the beginning of the game, players receive a narrative with key information concealed, referred to as the \"story\". Based on the story, players deduce and ask questions that can be answered with \"yes\" or \"no\" to narrow down different possibilities and ultimately guess the key information, known as the \"truth\". Players can click the \"New Game\" button to start a new random game or click the \"Show Answer\" button to reveal the truth.', lines=1, max_lines=3).style(container=False)
chatbot = gr.Chatbot([(None, 'Click the \"New Game\" button to get started.')])
messages = gr.State([])
answer = gr.State('Click the \"New Game\" button to get started.')
idx = gr.State(0)
show_ans = gr.State(False)
zh = gr.State(False)
known = gr.State([])
story_key = gr.State([])
answer_key = gr.State([])
bingo = gr.State(0)
reasoning = gr.State([])
history = gr.State([])
with gr.Row():
with gr.Column(scale=4):
question = gr.Textbox(label='Story', value='Click the \"New Game\" button to get started.',
lines=1, max_lines=3).style(container=False)
with gr.Row():
user_input = gr.Textbox(show_label=False, placeholder="Input your question...", lines=1, max_lines=3).style(
container=False)
with gr.Row():
with gr.Column(scale=2):
submitBtn = gr.Button("Submit", variant="primary")
with gr.Column(scale=2):
emptyBtn = gr.Button("New Game")
with gr.Column(scale=1):
answer_output = gr.Textbox(show_label=False, lines=6, max_lines=6).style(
container=False)
answerBtn = gr.Button("Show Answer")
zh_enBtn.click(zh_en_trans, [zh], [chatbot, messages, answer_output, answerBtn, answer, idx, question, show_ans, story_key, answer_key, known, bingo, reasoning, history, zh, zh_enBtn, submitBtn, emptyBtn, rule], show_progress=True)
user_input.submit(predict, [user_input, chatbot, messages, idx, answer, story_key, answer_key, known, bingo, reasoning, history, zh], [chatbot, messages, known, bingo, reasoning, history], show_progress=True)
user_input.submit(reset_user_input, [], [user_input])
submitBtn.click(predict, [user_input, chatbot, messages, idx, answer, story_key, answer_key, known, bingo, reasoning, history, zh], [chatbot, messages, known, bingo, reasoning, history], show_progress=True)
submitBtn.click(reset_user_input, [], [user_input])
emptyBtn.click(reset_state, [zh], [chatbot, messages, answer_output, answerBtn, answer, idx, question, show_ans, story_key, answer_key, known, bingo, reasoning, history, zh], show_progress=True)
answerBtn.click(show_hide_answer, [answer, show_ans, zh], [answer_output, answerBtn, show_ans], show_progress=True)
demo.queue().launch()