|
import gradio as gr |
|
import time |
|
import os |
|
import datetime |
|
|
|
from openai import OpenAI |
|
|
|
from openai.types.beta.threads.runs import ( |
|
ToolCallsStepDetails, |
|
) |
|
|
|
|
|
# Default assistant instructions; used as the initial "system_prompt" state
# value and as the initial text of the settings-tab instructions box.
SYS_PROMPT_DEFAULT = "あなたは優秀なアシスタントです。質問をされた場合は、質問に答えるコードを作成して実行します。回答は日本語でお願いします。"

# Placeholder bot reply paired with a "file:<id>" row in the chat history.
DUMMY = "********************"

# File extensions accepted by the upload check in init().
file_format = {
    ".txt",
    ".csv",
    ".pdf",
}

# Folders that downloaded images / annotation files are written to.
IMG_FOLDER = "sample_data"
ANT_FOLDER = "sample_data"

# Chat messages shown when an image / output file has been attached.
IMG_MSG = "(画像ファイルを追加しました。送信ボタンの下に表示されています。)"
ANT_MSG = "(下部の[出力ファイル]にファイルを追加しました。)"

# Polling budget for a run: number of polls and seconds slept between polls.
MAX_TRIAL = 50
INTER_SEC = 3

# Sample files offered in the UI.
examples = [
    "sample_data/東京都年別人口.csv",
    "sample_data/練馬区年齢別人口.csv",
    "sample_data/桃太郎あらすじ.txt",
]

# Sample file basename -> OpenAI file id, looked up by init().
example_toid = {
    "東京都年別人口.csv": "file-GOEk4X4WpU5gBJAuHCMtiJrn",
    "練馬区年齢別人口.csv": "file-YAFPMMqG3Zl5DRx5hTLjCfFa",
    "桃太郎あらすじ.txt": "file-oDjAzL3G4ktwAUEkcGDCQMuQ",
}

# Settings dropdown label -> whether executed code is echoed into the chat.
code_mode = {
    "ON": True,
    "OFF": False,
}
|
|
|
def set_state(openai_key, sys_prompt, code_output, state):
    """Store the settings-tab values in the session state.

    Args:
        openai_key: Contents of the API-key textbox.
        sys_prompt: Contents of the custom-instructions textbox.
        code_output: Dropdown label; must be a key of the module-level
            ``code_mode`` table.
        state: Session dict; updated in place.

    Returns:
        The same ``state`` dict.
    """
    state["openai_key"], state["system_prompt"] = openai_key, sys_prompt
    # Translate the dropdown label into the boolean flag read by bot().
    state["code_mode"] = code_mode[code_output]
    return state
|
|
|
|
|
def init(state, text, file):
    """Validate the user input and lazily create the OpenAI client/thread.

    Validation problems are reported through the returned message rather
    than by raising: an exception here would leave the input widgets
    unusable, so the next handler in the chain raises instead.

    Args:
        state: Session dict. Reads ``"client"``; on first use writes
            ``"client"``, ``"assistant_id"`` and ``"thread_id"``.
        text: Message typed by the user.
        file: Path of the uploaded file, or a falsy value when none is set.

    Returns:
        ``(state, text, file, file_id, err_msg)``. ``file_id`` is ``None``
        when no file was given, the known OpenAI id for a bundled sample
        file, and ``""`` for any other file (such files are currently not
        uploaded to OpenAI). ``err_msg`` is ``""`` on success.

    Raises:
        KeyError: If ``TEST_OPENAI_KEY`` or ``ASSIST_ID`` is missing from
            the environment when the client has to be created.
    """
    err_msg = ""
    file_id = None

    if not text:
        err_msg = "テキストを入力して下さい。"
        return state, text, file, file_id, err_msg

    if file:
        # Compare case-insensitively so "DATA.CSV" is treated like "data.csv".
        ext = os.path.splitext(file)[1].lower()
        if ext not in file_format:
            err_msg = "指定した形式のファイルをアップしてください。(注意事項タブに記載)"
            # Clear the rejected upload but keep the uploader usable: the
            # handler that normally re-enables it runs only after bot().
            cleared_upload = gr.File(value=None, interactive=True)
            return state, text, cleared_upload, file_id, err_msg

    if state["client"] is None:
        # Hand the key to the client directly instead of writing it into the
        # process-wide environment, which is shared by every session.
        client = OpenAI(api_key=os.environ["TEST_OPENAI_KEY"])
        thread = client.beta.threads.create()

        state["client"] = client
        state["assistant_id"] = os.environ["ASSIST_ID"]
        state["thread_id"] = thread.id

    if file:
        # Only the bundled sample files have a pre-registered OpenAI file id.
        file_id = example_toid.get(os.path.basename(file)) or ""

    return state, text, file, file_id, err_msg
|
|
|
def raise_exception(err_msg):
    """Raise when the preceding input check produced an error message.

    Args:
        err_msg: Message from the input check; ``""`` means no error.

    Raises:
        Exception: If ``err_msg`` is anything other than ``""``.
    """
    if err_msg == "":
        return None
    raise Exception("これは入力チェックでの例外です。")
|
|
|
|
|
def add_history(history, text, file_id):
    """Append the user's turn to the chat history.

    Args:
        history: Current chat rows (``[user, bot]`` pairs); ``None`` is
            treated as an empty history. The input is not mutated.
        text: Message typed by the user.
        file_id: OpenAI file id attached to this turn, or ``None``/``""``.

    Returns:
        ``(history, new_text, err_msg)``: the extended history, a cleared
        interactive textbox update, and an always-empty error message.
    """
    err_msg = ""

    # Rows are lists (not tuples) because bot() fills in the reply by
    # assigning to ``history[-1][1]`` and appends list rows itself.
    rows = list(history) if history else []

    if file_id:
        # The attachment gets its own row with a placeholder reply so the
        # text row that follows is the one awaiting the bot's answer.
        rows.append(["file:" + file_id, DUMMY])
    rows.append([text, None])

    new_text = gr.Textbox(value="", interactive=True)

    return rows, new_text, err_msg
|
|
|
|
|
def bot(state, history, file_id):
    """Send the latest user message to the assistant and stream its reply.

    Generator: it posts ``history[-1][0]`` to the thread, starts a run, then
    polls the run up to ``MAX_TRIAL`` times (sleeping ``INTER_SEC`` seconds
    between polls), folding new assistant messages into ``history``.

    Args:
        state: Session dict. Reads ``"system_prompt"``, ``"client"``,
            ``"assistant_id"``, ``"thread_id"``, ``"last_msg_id"`` and
            ``"code_mode"``; ``"last_msg_id"`` is updated in place.
        history: Chat rows; the last row holds the user's message. The last
            row's reply slot may be filled in place.
        file_id: OpenAI file id to attach to the message, or ``None``/``""``.

    Yields:
        ``(chatbot_update, image_file, ant_file, err_msg)`` tuples. The
        chatbot label carries the run status. The last tuple yielded is the
        final result: the completed reply, or the current history with an
        error message when the run ended unsuccessfully or timed out.
    """
    err_msg = ""
    image_file = None
    ant_file = None

    system_prompt = state["system_prompt"]
    client = state["client"]
    assistant_id = state["assistant_id"]
    thread_id = state["thread_id"]
    last_msg_id = state["last_msg_id"]
    # Named differently from the module-level ``code_mode`` lookup table.
    show_code = state["code_mode"]

    print("system_prompt")

    msg_kwargs = {
        "thread_id": thread_id,
        "role": "user",
        "content": history[-1][0],
    }
    if file_id:
        msg_kwargs["file_ids"] = [file_id]
    message = client.beta.threads.messages.create(**msg_kwargs)

    print(message)

    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
        instructions=system_prompt,
    )

    for i in range(MAX_TRIAL):

        if i > 0:
            time.sleep(INTER_SEC)

        run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id,
        )

        # Only messages newer than the last one already shown.
        messages = client.beta.threads.messages.list(
            thread_id=thread_id,
            after=last_msg_id,
            order="asc",
        )

        print(last_msg_id)

        for msg in messages:

            if msg.role != "assistant":
                continue

            for content in msg.content:

                cont_dict = content.model_dump()
                ct_image_file = cont_dict.get("image_file")

                if ct_image_file:
                    downloaded = file_download(
                        client, ct_image_file.get("file_id"), IMG_FOLDER, ".png"
                    )
                    if downloaded is None:
                        err_msg = "ファイルのダウンロードに失敗しました。"
                    else:
                        print("画像ファイル追加")
                        image_file = downloaded
                        history = history + [[None, IMG_MSG]]
                        last_msg_id = msg.id
                    continue

                ct_text = cont_dict.get("text") or {}
                res_text = ct_text.get("value") or ""
                annotations = ct_text.get("annotations") or []

                # Only the first annotation is inspected for a generated file.
                file_path = annotations[0].get("file_path") if annotations else None
                if file_path is not None:
                    ant_text = annotations[0].get("text")
                    # Reuse the extension of the annotated path; no extension
                    # when the annotation text is missing or has none.
                    ext = os.path.splitext(ant_text)[1] if ant_text else ""
                    downloaded = file_download(
                        client, file_path.get("file_id"), ANT_FOLDER, ext
                    )
                    if downloaded is None:
                        err_msg = "参照ファイルのダウンロードに失敗しました。"
                    else:
                        ant_file = downloaded
                        res_text = res_text + "\n\n" + ANT_MSG

                print(res_text)

                if res_text != "":
                    if history[-1][1] is not None:
                        # The last row already has a reply: start a new row.
                        history = history + [[None, res_text]]
                    else:
                        history[-1][1] = res_text
                    # Advance the cursor only once the text has been shown,
                    # so a still-empty message is picked up by a later poll.
                    last_msg_id = msg.id

        print(run.status)
        state["last_msg_id"] = last_msg_id

        if run.status == "completed":
            if show_code:
                run_steps = client.beta.threads.runs.steps.list(
                    thread_id=thread_id, run_id=run.id
                )
                history = history + [
                    [None, "[input_code]\n\n" + code] for code in get_code(run_steps)
                ]
            yield gr.Chatbot(label=run.status, value=history), image_file, ant_file, err_msg
            return

        if run.status in ("failed", "cancelled", "expired"):
            # The run will not make further progress. The result must be
            # yielded: a generator's ``return`` value is not delivered as output.
            err_msg = "※メッセージ取得に失敗しました。"
            yield gr.Chatbot(label=run.status, value=history), image_file, ant_file, err_msg
            return

        if i > 3:
            # After the first few polls, surface progress in the chat label.
            yield gr.Chatbot(label=run.status + " (Request:" + str(i) + ")", value=history), image_file, ant_file, err_msg

    # Every terminal status returns above, so reaching this point means the
    # polling budget was exhausted.
    err_msg = "※メッセージ取得の際にタイムアウトしました。"
    yield gr.Chatbot(label=run.status, value=history), image_file, ant_file, err_msg
|
|
|
|
|
def get_code(run_steps):
    """Collect the Code Interpreter inputs recorded in a run's steps.

    Args:
        run_steps: Object whose ``data`` attribute iterates over run steps,
            each exposing ``step_details``.

    Returns:
        List of the ``code_interpreter.input`` values of the tool calls, in
        step order. Steps that are not tool-call steps, and tool calls that
        carry no ``code_interpreter`` payload, are skipped.
    """
    input_code = []

    for step in run_steps.data:
        details = step.step_details
        if not isinstance(details, ToolCallsStepDetails):
            continue

        for tool_call in details.tool_calls:
            # Tool calls of other kinds have no ``code_interpreter`` attribute.
            interpreter = getattr(tool_call, "code_interpreter", None)
            if interpreter is not None:
                input_code.append(interpreter.input)

    return input_code
|
|
|
|
|
def file_download(client, file_id, folder, ext):
    """Download a file from OpenAI and return the local path it was saved to.

    Args:
        client: Object exposing ``files.with_raw_response.retrieve_content``.
        file_id: Id of the file to fetch; also used as the local file name.
        folder: Destination directory; created if it does not exist.
        ext: Extension appended to the file name (including the dot).

    Returns:
        Path of the written file, or ``None`` when the response status is
        not 200.
    """
    api_response = client.files.with_raw_response.retrieve_content(file_id)

    if api_response.status_code != 200:
        return None

    if folder:
        # A fresh deployment may not have the download folder yet.
        os.makedirs(folder, exist_ok=True)
    file_path = os.path.join(folder, file_id + ext)

    with open(file_path, "wb") as f:
        f.write(api_response.content)

    return file_path
|
|
|
|
|
def finally_proc():
    """Build the updates that reset the upload widget and file-id box.

    Returns:
        ``(new_up_file, new_file_id)``: an emptied, interactive file
        component and a textbox with an empty value.
    """
    return gr.File(value=None, interactive = True), gr.Textbox(value="")
|
|
|
|
|
# NOTE(review): the nesting of rows/tabs below was reconstructed from a source
# whose indentation had been lost — confirm the grouping against the running UI.
with gr.Blocks() as demo:

    gr.Markdown("<h2>GPT Code Interpreter対応チャット</h2>")

    # Session data shared by the handlers: settings plus the OpenAI
    # client/thread handles that init() fills in on first use.
    state = gr.State({
        "system_prompt": SYS_PROMPT_DEFAULT,
        "openai_key": None,
        "code_mode": False,
        "client": None,
        "assistant_id": None,
        "thread_id": None,
        "last_msg_id": "",
    })

    with gr.Tab("Chat画面") as chat:

        chatbot = gr.Chatbot(label="チャット画面")
        text_msg = gr.Textbox(label="テキスト")
        with gr.Row():
            up_file = gr.File(label="ファイルアップロード", type="filepath", interactive=True)
            result_image = gr.Image(label="出力画像", type="filepath", interactive=False)
        gr.Examples(label="サンプルデータ", examples=examples, inputs=[up_file])
        with gr.Row():
            btn = gr.Button(value="送信")
            btn_clear = gr.ClearButton(value="リセット", components=[chatbot, text_msg, up_file, state])
        sys_msg = gr.Textbox(label="システムメッセージ", interactive=False)
        result_file = gr.File(label="出力ファイル", type="filepath", interactive=False)

        # Hidden field carrying the OpenAI file id between chained handlers.
        file_id = gr.Textbox(visible=False)

        # Send: validate -> stop on validation error -> append the user turn
        # -> query the assistant -> reset the upload widgets.
        bc = btn.click(
            init, [state, text_msg, up_file], [state, text_msg, up_file, file_id, sys_msg], queue=False
        ).success(
            raise_exception, sys_msg, None
        ).success(
            add_history, [chatbot, text_msg, file_id], [chatbot, text_msg, sys_msg], queue=False
        ).success(
            bot, [state, chatbot, file_id], [chatbot, result_image, result_file, sys_msg]
        ).then(
            finally_proc, None, [up_file, file_id], queue=False
        )

    # Bound to ``settings_tab`` so the builtin ``set`` is not shadowed.
    with gr.Tab("設定") as settings_tab:
        openai_key = gr.Textbox(label="OpenAI API Key")
        system_prompt = gr.Textbox(value=SYS_PROMPT_DEFAULT, lines=5, label="Custom instructions", interactive=True)
        code_output = gr.Dropdown(label="コード出力", choices=["OFF", "ON"], value="OFF", interactive=True)

    # Settings are copied into the session state when the chat tab is selected.
    chat.select(set_state, [openai_key, system_prompt, code_output, state], state)

demo.queue()

demo.launch(debug=True)
|
|
|
|