lawyer_assitance / main.py
qgyd2021's picture
[update]add function calling test
44e1a5b
raw
history blame
No virus
19.3 kB
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf-openai
"""
import argparse
import httpx
import importlib
import json
import logging
import os
import platform
import shutil
import time
from typing import List, Tuple
logging.basicConfig(
level=logging.INFO if platform.system() == "Windows" else logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
import gradio as gr
import openai
from openai import OpenAI
from threading import Thread
import _queue
from queue import Queue
import project_settings as settings
from project_settings import project_path
logger = logging.getLogger(__name__)
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--examples_json_file",
default="examples.json",
type=str
)
parser.add_argument(
"--openai_api_key",
default=settings.environment.get("openai_api_key", default=None, dtype=str),
type=str
)
args = parser.parse_args()
return args
def dynamic_import_function(package_name: str, function_name: str):
try:
lib = importlib.import_module("functions.{}".format(package_name))
print(lib)
except ModuleNotFoundError as e:
raise e
function = getattr(lib, function_name)
return function
def click_create_assistant(openai_api_key: str,
name: str,
instructions: str,
description: str,
tools: str,
files: List[str],
file_ids: str,
model: str,
):
logger.info("click create assistant, name: {}".format(name))
client = OpenAI(
api_key=openai_api_key,
)
# tools
tools = str(tools).strip()
if tools is not None and len(tools) != 0:
tools = tools.split("\n")
tools = [json.loads(tool) for tool in tools if len(tool.strip()) != 0]
else:
tools = list()
# files
if files is not None and len(files) != 0:
files = [
client.files.create(
file=open(file, "rb"),
purpose='assistants'
) for file in files
]
else:
files = list()
# file_ids
file_ids = str(file_ids).strip()
if file_ids is not None and len(file_ids) != 0:
file_ids = file_ids.split("\n")
file_ids = [file_id.strip() for file_id in file_ids if len(file_id.strip()) != 0]
else:
file_ids = list()
# assistant
assistant = client.beta.assistants.create(
name=name,
instructions=instructions,
description=description,
tools=tools,
file_ids=file_ids + [file.id for file in files],
model=model,
)
assistant_id = assistant.id
return assistant_id, None
def click_list_assistant(openai_api_key: str) -> str:
client = OpenAI(
api_key=openai_api_key,
)
assistant_list = client.beta.assistants.list()
assistant_list = assistant_list.model_dump(mode="json")
result = ""
for a in assistant_list["data"]:
assis = "id: \n{}\nname: \n{}\ndescription: \n{}\n\n".format(a["id"], a["name"], a["description"])
result += assis
return result
def click_delete_assistant(openai_api_key: str,
assistant_id: str) -> str:
assistant_id = assistant_id.strip()
logger.info("click delete assistant, assistant_id: {}".format(assistant_id))
client = OpenAI(
api_key=openai_api_key,
)
try:
assistant_deleted = client.beta.assistants.delete(assistant_id=assistant_id)
result = "success" if assistant_deleted.deleted else "failed"
except openai.NotFoundError as e:
result = e.message
return result
def click_delete_all_assistant(openai_api_key: str):
client = OpenAI(
api_key=openai_api_key,
)
assistant_list = client.beta.assistants.list()
for a in assistant_list.data:
client.beta.assistants.delete(a.id)
return None
def click_list_file(openai_api_key: str):
client = OpenAI(
api_key=openai_api_key,
)
file_list = client.files.list()
file_list = file_list.model_dump(mode="json")
result = ""
for f in file_list["data"]:
file = "id: \n{}\nfilename: \n{}\nbytes: \n{}\nstatus: \n{}\n\n".format(
f["id"], f["filename"], f["bytes"], f["status"]
)
result += file
return result
def click_delete_file(openai_api_key: str,
file_id: str) -> str:
file_id = file_id.strip()
logger.info("click delete file, file_id: {}".format(file_id))
client = OpenAI(
api_key=openai_api_key,
)
try:
assistant_deleted = client.files.delete(file_id=file_id)
result = "success" if assistant_deleted.deleted else "failed"
except openai.NotFoundError as e:
result = e.message
except httpx.InvalidURL as e:
result = str(e)
return result
def click_upload_files(openai_api_key: str,
files: List[str],
):
logger.info("click upload files, files: {}".format(files))
client = OpenAI(
api_key=openai_api_key,
)
result = list()
if files is not None and len(files) != 0:
files = [
client.files.create(
file=open(file, "rb"),
purpose='assistants'
) for file in files
]
file_ids = [file.id for file in files]
result.extend(file_ids)
return result
def click_list_function_python_script():
function_script_dir = project_path / "functions"
result = ""
for script in function_script_dir.glob("*.py"):
if script.name == "__init__.py":
continue
result += script.name
result += "\n"
return result
def click_upload_function_python_script(files: List[str]):
tgt = project_path / "functions"
if files is None:
return None
for file in files:
shutil.copy(file, tgt.as_posix())
return None
def click_delete_function_python_script(filename: str):
function_script_dir = project_path / "functions"
filename = function_script_dir / filename.strip()
filename = filename.as_posix()
try:
os.remove(filename)
result = "success"
except FileNotFoundError as e:
result = str(e)
except Exception as e:
result = str(e)
return result
def convert_message_list_to_conversation(message_list: List[dict]) -> List[Tuple[str, str]]:
conversation = list()
for message in message_list:
role = message["role"]
content = message["content"]
for c in content:
c_type = c["type"]
if c_type != "text":
continue
text: dict = c["text"]
if c_type == "text":
text_value = text["value"]
text_annotations = text["annotations"]
msg = text_value
for text_annotation in text_annotations:
a_type = text_annotation["type"]
if a_type == "file_citation":
msg += "\n\n"
msg += "\nquote: \n{}\nfile_id: \n{}".format(
text_annotation["file_citation"]["quote"],
text_annotation["file_citation"]["file_id"],
)
else:
raise NotImplementedError
if role == "assistant":
msg = [None, msg]
else:
msg = [msg, None]
conversation.append(msg)
return conversation
def refresh(openai_api_key: str,
thread_id: str,
):
client = OpenAI(
api_key=openai_api_key,
)
message_list = client.beta.threads.messages.list(
thread_id=thread_id
)
message_list = message_list.model_dump(mode="json")
message_list = message_list["data"]
message_list = list(sorted(message_list, key=lambda x: x["created_at"]))
logger.debug("message_list: {}".format(message_list))
conversation = convert_message_list_to_conversation(message_list)
return conversation
def add_and_run(openai_api_key: str,
assistant_id: str,
thread_id: str,
name: str,
instructions: str,
description: str,
tools: str,
files: List[str],
file_ids: str,
model: str,
query: str,
):
client = OpenAI(
api_key=openai_api_key,
)
if assistant_id is None or len(assistant_id.strip()) == 0:
assistant_id, _ = click_create_assistant(
openai_api_key,
name, instructions, description, tools, files, file_ids, model
)
if thread_id is None or len(thread_id.strip()) == 0:
thread = client.beta.threads.create()
thread_id = thread.id
logger.info(f"assistant_id: {assistant_id}, thread_id: {thread_id}")
message = client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=query
)
run = client.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=assistant_id,
)
delta_time = 0.1
last_conversation = None
no_updates_count = 0
max_no_updates_count = 5
while True:
time.sleep(delta_time)
run = client.beta.threads.runs.retrieve(
thread_id=thread_id,
run_id=run.id,
)
# required action
if run.required_action is not None:
if run.required_action.type == "submit_tool_outputs":
tool_outputs = list()
for tool_call in run.required_action.submit_tool_outputs.tool_calls:
function_name = tool_call.function.name
function_to_call = dynamic_import_function(function_name, function_name)
function_args = json.loads(tool_call.function.arguments)
function_response = function_to_call(
location=function_args.get("location"),
unit=function_args.get("unit"),
)
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": function_response,
})
run = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread_id,
run_id=run.id,
tool_outputs=tool_outputs
)
# get message
conversation = refresh(openai_api_key, thread_id)
if conversation == last_conversation:
if any([run.completed_at is not None,
run.cancelled_at is not None,
run.failed_at is not None,
run.expires_at is not None]):
no_updates_count += 1
if no_updates_count >= max_no_updates_count:
break
last_conversation = conversation
result = [
assistant_id, thread_id,
conversation,
]
yield result
def main():
args = get_args()
gr_description = """
OpenAI Assistant
"""
# example json
with open(args.examples_json_file, "r", encoding="utf-8") as f:
examples = json.load(f)
for example in examples:
files: List[str] = example[4]
if files is None:
continue
files = [(project_path / file).as_posix() for file in files]
example[4] = files
# ui
with gr.Blocks() as blocks:
gr.Markdown(value=gr_description)
with gr.Row():
# settings
with gr.Column(scale=3):
with gr.Tabs():
with gr.TabItem("create assistant"):
openai_api_key = gr.Text(
value=args.openai_api_key,
label="openai_api_key",
placeholder="Fill with your `openai_api_key`"
)
name = gr.Textbox(label="name")
instructions = gr.Textbox(label="instructions")
description = gr.Textbox(label="description")
model = gr.Dropdown(["gpt-4-1106-preview"], value="gpt-4-1106-preview", label="model")
# functions
tools = gr.TextArea(label="functions")
# upload files
retrieval_files = gr.Files(label="retrieval_files")
retrieval_file_ids = gr.TextArea(label="retrieval_file_ids")
# create assistant
create_assistant_button = gr.Button("create assistant")
with gr.TabItem("assistants"):
list_assistant_button = gr.Button("list assistant")
assistant_list = gr.TextArea(label="assistant_list")
delete_assistant_id = gr.Textbox(max_lines=1, label="delete_assistant_id")
delete_assistant_button = gr.Button("delete assistant")
delete_all_assistant_button = gr.Button("delete all assistant")
with gr.TabItem("files"):
list_file_button = gr.Button("list file")
file_list = gr.TextArea(label="file_list")
upload_files = gr.Files(label="upload_files")
upload_files_button = gr.Button("upload file")
delete_file_id = gr.Textbox(max_lines=1, label="delete_file_id")
delete_file_button = gr.Button("delete file")
with gr.TabItem("function script"):
list_function_python_script_button = gr.Button("list python script")
list_function_python_script_list = gr.TextArea(label="python_script_list")
upload_function_python_script_files = gr.Files(label="upload_python_script_files")
upload_function_python_script_button = gr.Button("upload python script")
delete_function_python_script_file = gr.Textbox(max_lines=1, label="delete_python_script_file")
delete_function_python_script_button = gr.Button("delete python script")
# chat
with gr.Column(scale=5):
chat_bot = gr.Chatbot(label="conversation", height=600)
query = gr.Textbox(lines=1, label="query")
with gr.Row():
with gr.Column(scale=1):
add_and_run_button = gr.Button("Add and run")
with gr.Column(scale=1):
refresh_button = gr.Button("Refresh")
# states
with gr.Column(scale=2):
assistant_id = gr.Textbox(value=None, label="assistant_id")
thread_id = gr.Textbox(value=None, label="thread_id")
tips = gr.TextArea(value=None, label="tips")
# examples
with gr.Row():
gr.Examples(
examples=examples,
inputs=[
name, instructions, description, tools, retrieval_files, model,
query, tips
],
examples_per_page=5
)
# create assistant
create_assistant_button.click(
click_create_assistant,
inputs=[
openai_api_key,
name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
],
outputs=[
assistant_id, thread_id
]
)
# list assistant
list_assistant_button.click(
click_list_assistant,
inputs=[
openai_api_key
],
outputs=[
assistant_list
]
)
# delete assistant button
delete_assistant_button.click(
click_delete_assistant,
inputs=[
openai_api_key,
delete_assistant_id
],
outputs=[
delete_assistant_id
]
)
# delete all assistant
delete_all_assistant_button.click(
click_delete_all_assistant,
inputs=[
openai_api_key
],
outputs=[
file_list
]
)
# list file
list_file_button.click(
click_list_file,
inputs=[
openai_api_key
],
outputs=[
file_list
],
)
# delete file
delete_file_button.click(
click_delete_file,
inputs=[
openai_api_key,
delete_file_id
],
outputs=[
delete_file_id
]
)
# upload files
upload_files_button.click(
click_upload_files,
inputs=[
openai_api_key,
upload_files
],
outputs=[
]
)
# list python script
list_function_python_script_button.click(
click_list_function_python_script,
inputs=[],
outputs=[
list_function_python_script_list
]
)
# upload function python script
upload_function_python_script_button.click(
click_upload_function_python_script,
inputs=[
upload_function_python_script_files
],
outputs=[
upload_function_python_script_files
]
)
# delete function python script
delete_function_python_script_button.click(
click_delete_function_python_script,
inputs=[
delete_function_python_script_file
],
outputs=[
delete_function_python_script_file
]
)
# add and run
add_and_run_button.click(
add_and_run,
inputs=[
openai_api_key,
assistant_id, thread_id,
name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
query,
],
outputs=[
assistant_id, thread_id,
chat_bot
],
)
# refresh
refresh_button.click(
refresh,
inputs=[
openai_api_key,
thread_id,
],
outputs=[
chat_bot
]
)
blocks.queue().launch()
return
if __name__ == '__main__':
main()