nekoniii3's picture
update
1a0c6e0
raw
history blame
16.4 kB
import gradio as gr
import time
import os
import datetime
# from zoneinfo import ZoneInfo
from openai import OpenAI
from openai.types.beta.threads.runs import (
ToolCallsStepDetails,
)
# GPT用設定
SYS_PROMPT_DEFAULT = "あなたは優秀なアシスタントです。質問をされた場合は、質問に答えるコードを作成して実行します。回答は日本語でお願いします。"
DUMMY = "********************"
file_format = {".txt", ".csv", ".pdf"}
# 各種出力フォルダ
IMG_FOLDER = "sample_data"
ANT_FOLDER = "sample_data"
# 各種メッセージ
IMG_MSG = "(画像ファイルを追加しました。送信ボタンの下に表示されています。)"
ANT_MSG = "(下部の[出力ファイル]にファイルを追加しました。)"
# 各種設定値
MAX_TRIAL = 50 # メッセージ取得最大試行数
INTER_SEC = 3 # 試行間隔(秒)
# サンプル用情報
examples = ["sample_data/東京都年別人口.csv", "sample_data/練馬区年齢別人口.csv", "sample_data/桃太郎あらすじ.txt"]
example_toid = {"東京都年別人口.csv" : "file-GOEk4X4WpU5gBJAuHCMtiJrn"
, "練馬区年齢別人口.csv" : "file-YAFPMMqG3Zl5DRx5hTLjCfFa"
, "桃太郎あらすじ.txt" : "file-oDjAzL3G4ktwAUEkcGDCQMuQ"}
# file_id = "file-0Ly64DA2jzE9mOFYayOKJJK0"
# file_id = "file-aVnVcpEVpsy77xQ8SlTp1WoX" # ライ麦
# file_id = "file-HFCaJbf3k7j0fhBqh1Rwf2VV" # 練馬区
# コード出力用
code_mode = {'ON': True, 'OFF': False}
def set_state(openai_key, sys_prompt, code_output, state):
""" 設定タブの情報をセッションに保存する関数 """
state["openai_key"] = openai_key
state["system_prompt"] = sys_prompt
state["code_mode"] = code_mode[code_output]
return state
def init(state, text, file):
""" 入力チェックを行う関数
※ここで例外を起こすと入力できなくなるので次の関数でエラーにする """
err_msg = ""
file_id = None
# if state["openai_key"] == "" or state["openai_key"] is None:
# # OpenAI API Key未入力
# err_msg = "OpenAI API Keyを入力してください。(設定タブ)"
if not text:
# テキスト未入力
err_msg = "テキストを入力して下さい。"
return state, text, file, file_id, err_msg
elif file:
# 入力画像のファイル形式チェック
root, ext = os.path.splitext(file)
if ext not in file_format:
# ファイル形式チェック
err_msg = "指定した形式のファイルをアップしてください。(注意事項タブに記載)"
return state, text, gr.Image(value=None,type="filepath", interactive=False), file_id, err_msg
if state["client"] is None:
# 初回起動時は初期処理をする
os.environ["OPENAI_API_KEY"] = os.environ["TEST_OPENAI_KEY"] # テスト時
# os.environ["OPENAI_API_KEY"] = state["openai_key"]
client = OpenAI()
# client作成後は消す
os.environ["OPENAI_API_KEY"] = ""
# アシスタント作成
# assistant = client.beta.assistants.create(
# name="codeinter_test",
# instructions=state["system_prompt"],
# # model="gpt-4-1106-preview",
# model="gpt-3.5-turbo-1106",
# tools=[{"type": "code_interpreter"}]
# )
# print(assistant.id)
# スレッド作成
thread = client.beta.threads.create()
# セッションにセット
state["client"] = client
# state["assistant_id"] = assistant.id
state["assistant_id"] = os.environ["ASSIST_ID"] # テスト中アシスタントは固定
state["thread_id"] = thread.id
if file:
# ファイル名取得
basename = os.path.basename(file)
if example_toid.get(basename):
# サンプルの場合は用意したIDをセット
file_id = example_toid.get(basename)
else:
# ファイルのアップ
# file_response = client.files.create(
# purpose="assistants",
# file=open(file,"rb"),
# )
# if file_response.status != 'processed':
# # 失敗時
# err_msg = "ファイルのアップロードに失敗しました"
# else
# # ファイルのIDをセット
# file_id = file_response.id
# file_id = "file-0Ly64DA2jzE9mOFYayOKJJK0"
# file_id = "file-aVnVcpEVpsy77xQ8SlTp1WoX" # ライ麦
# file_id = "file-HFCaJbf3k7j0fhBqh1Rwf2VV" # 練馬区
file_id = ""
# print(file_id)
return state, text, file, file_id, err_msg
def raise_exception(err_msg):
""" エラーの場合例外を起こす関数 """
if err_msg != "":
raise Exception("これは入力チェックでの例外です。")
return
def add_history(history, text, file_id):
""" Chat履歴"history"に追加を行う関数 """
# print("前:")
# print(history)
err_msg = ""
new_row_flg = False
# 新しい行を追加するか判定
# if len(history) == 0:
# new_row_flg = True
# elif history[-1][0] is not None:
# # 前回がアシスタントでない場合も追加
# new_row_flg = True
new_row_flg = True
if file_id is None or file_id == "":
if new_row_flg:
# テキストだけの場合そのまま追加
history = history + [(text, None)]
else:
history[-1][0] = text
elif file_id is not None:
if new_row_flg:
# ファイルがあればファイルIDとテキストを追加
history = history + [("file:" + file_id, DUMMY)]
history = history + [(text, None)]
else:
history[-1][0] = "file:" + file_id
history = history + [(text, None)]
print(history)
# テキストだけ初期化
new_text = gr.Textbox(value="", interactive=True)
return history, new_text, err_msg
def bot(state, history, file_id):
err_msg = ""
image_file = None
ant_file = None
# new_row_flg = False
# セッション情報取得
system_prompt = state["system_prompt"]
client = state["client"]
assistant_id = state["assistant_id"]
thread_id = state["thread_id"]
last_msg_id = state["last_msg_id"]
code_mode = state["code_mode"]
print("system_prompt")
if file_id is None or file_id == "":
# ファイルがない場合
message = client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=history[-1][0],
)
else:
# ファイルがあるときはIDをセット
message = client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=history[-1][0],
file_ids=[file_id]
)
print(message)
# RUNスタート
run = client.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=assistant_id,
instructions=system_prompt
)
# "completed"となるまで繰り返す(指定秒おき)
for i in range(0, MAX_TRIAL, 1):
if i > 0:
time.sleep(INTER_SEC)
# メッセージ受け取り
run = client.beta.threads.runs.retrieve(
thread_id=thread_id,
run_id=run.id
)
# 前回のメッセージより後を昇順で取り出す
messages = client.beta.threads.messages.list(
thread_id=thread_id,
after=last_msg_id,
order="asc"
)
print(last_msg_id)
# print(messages.data)
msg_log = client.beta.threads.messages.list(
thread_id=thread_id,
# after=last_msg_id,
order="asc"
)
print(msg_log)
# messageを取り出す
for msg in messages:
# msg_id = msg.id
if msg.role == "assistant":
for content in msg.content:
res_text = ""
file_id = ""
ant_file = None
cont_dict = content.model_dump() # 辞書型に変換
ct_image_file = cont_dict.get("image_file")
if ct_image_file:
# imageファイルがあるならIDセット
res_file_id = ct_image_file.get("file_id")
# ファイルをダウンロード
image_file = file_download(client, res_file_id, IMG_FOLDER , ".png")
if image_file is None:
err_msg = "ファイルのダウンロードに失敗しました。"
else:
print("画像ファイル追加")
res_text = IMG_MSG
history = history + [[None, res_text]]
# 最終メッセージID更新
last_msg_id = msg.id
else:
# 返答テキスト取得
res_text = cont_dict["text"].get("value")
# 注釈(参照ファイル)ががある場合取得
if len(cont_dict.get("text").get("annotations")) > 0:
ct_ant = cont_dict.get("text").get("annotations")
if ct_ant[0].get("file_path") is not None:
# 参照ファイルのID取得
ant_file_id = ct_ant[0].get("file_path").get("file_id")
if ct_ant[0].get("text") is not None:
# ファイル形式(拡張子)取得
ext = "." + ct_ant[0].get("text")[ct_ant[0].get("text").rfind('.') + 1:]
# ファイルダウンロード
ant_file = file_download(client, ant_file_id, ANT_FOLDER, ext)
if ant_file is None:
err_msg = "参照ファイルのダウンロードに失敗しました。"
else:
# 参照ファイルがある旨のメッセージを追加
res_text = res_text + "\n\n" + ANT_MSG
print(res_text)
if res_text != "":
# Chat画面更新
if history[-1][1] is not None:
# 新しい行を追加
history = history + [[None, res_text]]
else:
history[-1][1] = res_text
# 最終メッセージID更新
last_msg_id = msg.id
# yield gr.Chatbot(label=run.status ,value=history), image_file, ant_file, err_msg
print(run.status)
state["last_msg_id"] = last_msg_id
# 完了なら終了
if run.status == "completed":
if not code_mode:
yield gr.Chatbot(label=run.status ,value=history), image_file, ant_file, err_msg
break
else:
# コードモードがONの場合
run_steps = client.beta.threads.runs.steps.list(
thread_id=thread_id, run_id=run.id
)
# コードを取得
input_code = get_code(run_steps)
if len(input_code) > 0:
for code in input_code:
code = "[input_code]\n\n" + code
# コードを追加
history = history + [[None, code]]
yield gr.Chatbot(label=run.status ,value=history), image_file, ant_file, err_msg
break
elif run.status == "failed":
# エラーとして終了
err_msg = "※メッセージ取得に失敗しました。"
return gr.Chatbot(label=run.status ,value=history), image_file, ant_file, err_msg
elif i == MAX_TRIAL:
# エラーとして終了
err_msg = "※メッセージ取得の際にタイムアウトしました。"
return gr.Chatbot(label=run.status ,value=history), image_file, ant_file, err_msg
else:
if i > 3:
# 作業中とわかるようにする
# history = history + [[None, "………"]]
yield gr.Chatbot(label=run.status + " (Request:" + str(i) + ")" ,value=history), image_file, ant_file, err_msg
def get_code(run_steps):
input_code = []
for data in run_steps.data:
if isinstance(data.step_details, ToolCallsStepDetails):
for tool_call in data.step_details.tool_calls:
input_code.append(tool_call.code_interpreter.input)
return input_code
def file_download(client, file_id, folder, ext):
""" OpenAIからファイルをダウンロードしてパスを返す """
api_response = client.files.with_raw_response.retrieve_content(file_id)
if api_response.status_code == 200:
content = api_response.content
file_path = folder + "/" + file_id + ext
with open(file_path, 'wb') as f:
f.write(content)
return file_path
else:
return None
def finally_proc():
""" 最終処理用関数 """
new_up_file = gr.File(value=None, interactive = True)
new_file_id = gr.Textbox(value="")
return new_up_file, new_file_id
with gr.Blocks() as demo:
gr.Markdown("<h2>GPT Code Interpreter対応チャット</h2>")
# セッションの宣言
state = gr.State({
"system_prompt": SYS_PROMPT_DEFAULT,
"openai_key" : None,
"code_mode" : False,
"client" : None,
"assistant_id" : None,
"thread_id" : None,
"last_msg_id" : ""
})
with gr.Tab("Chat画面") as chat:
# 各コンポーネント定義
chatbot = gr.Chatbot(label="チャット画面")
text_msg = gr.Textbox(label="テキスト")
with gr.Row():
up_file = gr.File(label="ファイルアップロード", type="filepath",interactive = True)
result_image = gr.Image(label="出力画像", type="filepath", interactive = False)
gr.Examples(label="サンプルデータ", examples=examples, inputs=[up_file])
with gr.Row():
btn = gr.Button(value="送信")
# btn_download = gr.Button(value="画像のダウンロード") # 保留中
btn_clear = gr.ClearButton(value="リセット", components=[chatbot, text_msg, up_file, state])
sys_msg = gr.Textbox(label="システムメッセージ", interactive = False)
result_file = gr.File(label="出力ファイル", type="filepath",interactive = False)
# ファイルID保存用
file_id = gr.Textbox(visible=False)
# 送信ボタンクリック時の処理
bc = btn.click(init, [state, text_msg, up_file], [state, text_msg, up_file, file_id, sys_msg], queue=False).success(
raise_exception, sys_msg, None).success(
add_history, [chatbot, text_msg, file_id], [chatbot, text_msg, sys_msg], queue=False).success(
bot, [state, chatbot, file_id],[chatbot, result_image, result_file, sys_msg]).then(
finally_proc, None, [up_file, file_id], queue=False
)
# クリア時でもOpenAIKeyなどは再セット
# btn_clear.click(set_state, [openai_key, system_prompt, code_output, state], state)
# テキスト入力Enter時の処理
# txt_msg = text_msg.submit(respond, inputs=[text_msg, image, chatbot], outputs=[text_msg, image, chatbot])
with gr.Tab("設定") as set:
openai_key = gr.Textbox(label="OpenAI API Key")
# language = gr.Dropdown(choices=["Japanese", "English"], value = "Japanese", label="Language", interactive = True)
system_prompt = gr.Textbox(value = SYS_PROMPT_DEFAULT,lines = 5, label="Custom instructions", interactive = True)
# Enter不使用
code_output = gr.Dropdown(label="コード出力", choices=["OFF", "ON"], value = "OFF", interactive = True)
# 設定タブからChatタブに戻った時の処理
chat.select(set_state, [openai_key, system_prompt, code_output, state], state)
demo.queue()
demo.launch(debug=True)