lawyer_assitance / main.py
qgyd2021's picture
[update]add 三国演义.txt
98d9a76
raw
history blame
20.5 kB
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
https://huggingface.co/spaces/fffiloni/langchain-chat-with-pdf-openai
"""
import argparse
import json
import logging
import time
from typing import List, Tuple
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
import _queue
import queue
from queue import Queue
from threading import Thread

import gradio as gr
import openai
from openai import OpenAI

import project_settings as settings
from project_settings import project_path
logger = logging.getLogger(__name__)
def get_args():
    """Parse command-line arguments for this app.

    ``--openai_api_key`` falls back to the value stored in the project
    environment settings when not given on the command line.
    """
    arg_parser = argparse.ArgumentParser()
    default_key = settings.environment.get("openai_api_key", default=None, dtype=str)
    arg_parser.add_argument("--openai_api_key", type=str, default=default_key)
    return arg_parser.parse_args()
def greet(question: str, history: List[Tuple[str, str]]):
    """Demo chat handler: answer *question* with a greeting.

    :param question: the user's input text.
    :param history: previous (question, answer) pairs.
    :return: a new history list with the greeting appended.
    """
    reply = "Hello " + question + "!"
    return history + [(question, reply)]
def click_create_assistant(openai_api_key: str,
                           name: str,
                           instructions: str,
                           description: str,
                           tools: str,
                           files: List[str],
                           file_ids: str,
                           model: str,
                           ):
    """Create an OpenAI assistant plus an empty conversation thread.

    :param openai_api_key: API key used to build the OpenAI client.
    :param name: assistant name.
    :param instructions: system instructions for the assistant.
    :param description: human-readable description.
    :param tools: newline-separated JSON objects, one tool definition per line.
    :param files: local file paths to upload (purpose "assistants").
    :param file_ids: newline-separated ids of files already uploaded.
    :param model: model name, e.g. "gpt-4-1106-preview".
    :return: tuple of (assistant_id, thread_id).
    """
    logger.info("click create assistant, name: {}".format(name))
    client = OpenAI(
        api_key=openai_api_key,
    )

    # tools: one JSON document per non-blank line.
    # (str() never yields None, so only the emptiness check is needed.)
    tools = str(tools).strip()
    if len(tools) != 0:
        tools = [json.loads(tool) for tool in tools.split("\n") if len(tool.strip()) != 0]
    else:
        tools = list()

    # files: upload each local file, closing handles promptly instead of
    # leaking them (the original left every handle open).
    uploaded_files = list()
    if files is not None:
        for file in files:
            with open(file, "rb") as f:
                uploaded_files.append(
                    client.files.create(file=f, purpose="assistants")
                )

    # file_ids: ids of files uploaded previously, one per non-blank line.
    file_ids = str(file_ids).strip()
    if len(file_ids) != 0:
        file_ids = [fid.strip() for fid in file_ids.split("\n") if len(fid.strip()) != 0]
    else:
        file_ids = list()

    # assistant
    assistant = client.beta.assistants.create(
        name=name,
        instructions=instructions,
        description=description,
        tools=tools,
        file_ids=file_ids + [file.id for file in uploaded_files],
        model=model,
    )
    assistant_id = assistant.id

    # thread: a fresh, empty conversation thread to pair with the assistant.
    thread = client.beta.threads.create()
    thread_id = thread.id
    return assistant_id, thread_id
def click_list_assistant(openai_api_key: str) -> str:
    """Return a human-readable listing of all assistants on the account."""
    client = OpenAI(
        api_key=openai_api_key,
    )
    listing = client.beta.assistants.list().model_dump(mode="json")
    blocks = [
        "id: \n{}\nname: \n{}\ndescription: \n{}\n\n".format(
            item["id"], item["name"], item["description"]
        )
        for item in listing["data"]
    ]
    return "".join(blocks)
def click_delete_assistant(openai_api_key: str,
                           assistant_id: str) -> str:
    """Delete the assistant with *assistant_id*.

    :return: "success" or "failed", or the API error message when the
        assistant does not exist.
    """
    logger.info("click delete assistant, assistant_id: {}".format(assistant_id))
    client = OpenAI(
        api_key=openai_api_key,
    )
    try:
        deletion = client.beta.assistants.delete(assistant_id=assistant_id)
    except openai.NotFoundError as e:
        return e.message
    return "success" if deletion.deleted else "failed"
def click_list_file(openai_api_key: str):
    """Return a human-readable listing of all files uploaded to the account."""
    client = OpenAI(
        api_key=openai_api_key,
    )
    listing = client.files.list().model_dump(mode="json")
    blocks = [
        "id: \n{}\nfilename: \n{}\nbytes: \n{}\nstatus: \n{}\n\n".format(
            item["id"], item["filename"], item["bytes"], item["status"]
        )
        for item in listing["data"]
    ]
    return "".join(blocks)
def click_delete_file(openai_api_key: str,
                      file_id: str) -> str:
    """Delete the uploaded file with *file_id*.

    :return: "success" or "failed", or the API error message when the
        file does not exist.
    """
    logger.info("click delete file, file_id: {}".format(file_id))
    client = OpenAI(
        api_key=openai_api_key,
    )
    try:
        deletion = client.files.delete(file_id=file_id)
    except openai.NotFoundError as e:
        return e.message
    return "success" if deletion.deleted else "failed"
def click_upload_files(openai_api_key: str,
                       files: List[str],
                       ):
    """Upload local files to OpenAI with purpose "assistants".

    :param openai_api_key: API key used to build the OpenAI client.
    :param files: local file paths to upload; may be None or empty.
    :return: list of ids of the newly uploaded files.
    """
    logger.info("click upload files, files: {}".format(files))
    client = OpenAI(
        api_key=openai_api_key,
    )
    result = list()
    if files is not None:
        for file in files:
            # Close each handle promptly instead of leaking it
            # (the original opened every file and never closed it).
            with open(file, "rb") as f:
                uploaded = client.files.create(file=f, purpose="assistants")
            result.append(uploaded.id)
    return result
def get_message_list(client: OpenAI, thread_id: str):
    """Fetch all messages in *thread_id* and convert them to plain dicts.

    Each returned dict mirrors the SDK ThreadMessage fields (id,
    assistant_id, content, created_at, file_ids, metadata, object, role,
    run_id, thread_id).  Text annotations of type "file_citation"
    additionally carry the cited file_id and quote.

    :return: list of message dicts sorted by created_at, oldest first.
    """
    messages = client.beta.threads.messages.list(
        thread_id=thread_id
    )

    result = list()
    for message in messages.data:
        content = list()
        for part in message.content:
            annotations = list()
            for annotation in part.text.annotations:
                entry = {
                    "start_index": annotation.start_index,
                    "end_index": annotation.end_index,
                    "text": annotation.text,
                    "type": annotation.type,
                }
                # Only file citations carry extra payload worth keeping.
                if annotation.type == "file_citation":
                    entry["file_citation"] = {
                        "file_id": annotation.file_citation.file_id,
                        "quote": annotation.file_citation.quote,
                    }
                annotations.append(entry)
            content.append({
                "text": {
                    "annotations": annotations,
                    "value": part.text.value,
                },
                "type": part.type,
            })
        result.append({
            "id": message.id,
            "assistant_id": message.assistant_id,
            "content": content,
            "created_at": message.created_at,
            "file_ids": message.file_ids,
            "metadata": message.metadata,
            "object": message.object,
            "role": message.role,
            "run_id": message.run_id,
            "thread_id": message.thread_id,
        })
    # The API returns newest first; the UI wants chronological order.
    result.sort(key=lambda m: m["created_at"])
    return result
def convert_message_list_to_response(message_list: List[dict]) -> str:
    """Render messages as a plain-text transcript.

    Each text part becomes "role: \\n<value>\\n"; every message (even one
    with no text parts) is followed by a dashed separator line.
    """
    pieces = list()
    for message in message_list:
        for part in message["content"]:
            if part["type"] != "text":
                continue
            pieces.append("{}: \n{}\n".format(message["role"], part["text"]["value"]))
        pieces.append("-" * 80 + "\n")
    return "".join(pieces)
def convert_message_list_to_conversation(message_list: List[dict]) -> List[Tuple[str, str]]:
    """Convert message dicts (from get_message_list) into a Gradio chatbot
    conversation: a list of [user_text, assistant_text] pairs where the
    unused side of each pair is None.

    File-citation annotations are appended to the message text as
    "quote:" / "file_id:" footnotes.

    :raises NotImplementedError: when a text part carries an annotation
        type other than "file_citation".
    """
    conversation = list()
    for message in message_list:
        role = message["role"]
        for c in message["content"]:
            # Only text parts are rendered; other content types are skipped.
            # (The original re-checked c["type"] == "text" after this guard,
            # which was redundant.)
            if c["type"] != "text":
                continue
            text: dict = c["text"]
            msg = text["value"]
            for text_annotation in text["annotations"]:
                if text_annotation["type"] == "file_citation":
                    msg += "\n\n"
                    msg += "\nquote: \n{}\nfile_id: \n{}".format(
                        text_annotation["file_citation"]["quote"],
                        text_annotation["file_citation"]["file_id"],
                    )
                else:
                    raise NotImplementedError

            # Gradio chatbot pairs: [user, assistant]; lists, not tuples,
            # to match what the Gradio Chatbot component produces.
            if role == "assistant":
                conversation.append([None, msg])
            else:
                conversation.append([msg, None])
    return conversation
def streaming_refresh(openai_api_key: str,
                      thread_id: str,
                      queue: Queue,
                      ):
    """Poll the thread and push each conversation snapshot onto *queue*.

    Polls every 0.3 s; stops after 5 consecutive polls that show no
    change in the conversation.

    :return: the last conversation snapshot seen.
    """
    poll_interval = 0.3
    idle_limit = 5
    previous = None
    idle_count = 0
    while True:
        time.sleep(poll_interval)
        current = refresh(openai_api_key, thread_id)
        if current == previous:
            idle_count += 1
            if idle_count >= idle_limit:
                break
        previous = current
        # Block up to 2 s if the consumer is slow to drain the queue.
        queue.put(current, block=True, timeout=2)
    return previous
def refresh(openai_api_key: str,
            thread_id: str,
            ):
    """Fetch the current conversation for *thread_id*.

    :return: Gradio chatbot conversation (list of [user, assistant] pairs).
    """
    client = OpenAI(
        api_key=openai_api_key,
    )
    message_list = get_message_list(client, thread_id=thread_id)
    # Leftover debug print removed; the logger call already records this.
    logger.info("message_list: {}".format(message_list))
    conversation = convert_message_list_to_conversation(message_list)
    return conversation
def add_and_run(openai_api_key: str,
                assistant_id: str,
                thread_id: str,
                name: str,
                instructions: str,
                description: str,
                tools: str,
                files: List[str],
                file_ids: str,
                model: str,
                query: str,
                ):
    """Post *query* to the assistant's thread, start a run, and stream the
    growing conversation back to the UI.

    Creates the assistant (and a thread) on first use.  A background
    thread polls the conversation and feeds snapshots through a queue;
    each snapshot is yielded as [assistant_id, thread_id, conversation].

    :yield: list matching the Gradio outputs (assistant_id, thread_id,
        chat_bot).
    """
    client = OpenAI(
        api_key=openai_api_key,
    )

    if assistant_id is None or len(assistant_id.strip()) == 0:
        # Bug fix: click_create_assistant returns an (assistant_id,
        # thread_id) tuple; the original assigned the whole tuple to
        # assistant_id, which then broke runs.create.
        assistant_id, new_thread_id = click_create_assistant(
            openai_api_key,
            name, instructions, description, tools, files, file_ids, model
        )
        # Reuse the freshly created thread unless the caller supplied one.
        if thread_id is None or len(thread_id.strip()) == 0:
            thread_id = new_thread_id
    if thread_id is None or len(thread_id.strip()) == 0:
        thread = client.beta.threads.create()
        thread_id = thread.id

    # Append the user's message to the thread, then launch a run.
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=query
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )
    run = client.beta.threads.runs.retrieve(
        thread_id=thread_id,
        run_id=run.id
    )

    # Background poller pushes conversation snapshots into this queue.
    response_queue = Queue(maxsize=10)
    refresh_kwargs = dict(
        openai_api_key=openai_api_key,
        thread_id=thread_id,
        queue=response_queue,
    )
    refresh_thread = Thread(target=streaming_refresh, kwargs=refresh_kwargs)
    refresh_thread.start()

    delta_time = 0.1
    last_response = None
    no_updates_count = 0
    max_no_updates_count = 10
    while True:
        time.sleep(delta_time)
        try:
            this_response = response_queue.get(block=True, timeout=2)
        except queue.Empty:
            # Public alias of the private _queue.Empty the original caught.
            # Poller stopped producing; we are done.
            break
        if this_response == last_response:
            no_updates_count += 1
            if no_updates_count >= max_no_updates_count:
                break
        last_response = this_response

        # Yield exactly three values to match the three declared Gradio
        # outputs (assistant_id, thread_id, chat_bot); the original
        # yielded a spurious fourth element.
        yield [assistant_id, thread_id, last_response]
def main():
    """Build and launch the Gradio UI for managing OpenAI assistants.

    Layout: a settings column (create / list / delete assistants and
    files), a chat column (conversation view, query box, run and refresh
    buttons) and a state column holding the current assistant_id and
    thread_id.  Blocks runs with queuing enabled so the streaming
    ``add_and_run`` generator can push incremental updates.
    """
    args = get_args()

    gr_description = """
    OpenAI Assistant
    """

    # ui
    with gr.Blocks() as blocks:
        gr.Markdown(value=gr_description)

        with gr.Row():
            # settings
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("create assistant"):
                        # API key defaults to the --openai_api_key CLI value.
                        openai_api_key = gr.Text(
                            value=args.openai_api_key,
                            label="openai_api_key",
                            placeholder="Fill with your `openai_api_key`"
                        )
                        name = gr.Textbox(label="name")
                        instructions = gr.Textbox(label="instructions")
                        description = gr.Textbox(label="description")
                        model = gr.Dropdown(["gpt-4-1106-preview"], value="gpt-4-1106-preview", label="model")

                        # functions: newline-separated JSON tool definitions.
                        tools = gr.TextArea(label="functions")

                        # upload files: new local files, or ids of files
                        # already uploaded (one id per line).
                        retrieval_files = gr.Files(label="retrieval_files")
                        retrieval_file_ids = gr.TextArea(label="retrieval_file_ids")

                        # create assistant
                        create_assistant_button = gr.Button("create assistant")
                    with gr.TabItem("list assistant"):
                        list_assistant_button = gr.Button("list assistant")
                        assistant_list = gr.TextArea(label="assistant_list")
                        delete_assistant_id = gr.Textbox(label="delete_assistant_id")
                        delete_assistant_button = gr.Button("delete assistant")
                    with gr.TabItem("list file"):
                        list_file_button = gr.Button("list file")
                        file_list = gr.TextArea(label="file_list")
                        delete_file_id = gr.Textbox(label="delete_file_id")
                        delete_file_button = gr.Button("delete file")
                        upload_files = gr.Files(label="upload_files")
                        upload_files_button = gr.Button("upload file")

            # chat
            with gr.Column(scale=5):
                chat_bot = gr.Chatbot(label="conversation", height=600)
                query = gr.Textbox(lines=1, label="query")
                with gr.Row():
                    with gr.Column(scale=1):
                        add_and_run_button = gr.Button("Add and run")
                    with gr.Column(scale=1):
                        refresh_button = gr.Button("Refresh")

            # states: ids of the active assistant and thread, filled in by
            # the create / add-and-run callbacks and reused on later clicks.
            with gr.Column(scale=2):
                assistant_id = gr.Textbox(value=None, label="assistant_id")
                thread_id = gr.Textbox(value=None, label="thread_id")

        # examples
        with gr.Row():
            gr.Examples(
                examples=[
                    [
                        "Math Tutor",
                        "You are a personal math tutor. Write and run code to answer math questions.",
                        "Official math test cases",
                        None,
                        None,
                        "gpt-4-1106-preview",
                        "123 * 524 等于多少?"
                    ],
                    [
                        "小说专家",
                        "根据小说内容回答问题。",
                        "三国演义文档问答测试",
                        "{\"type\": \"retrieval\"}",
                        [
                            (project_path / "data/三国演义.txt").as_posix()
                        ],
                        "gpt-4-1106-preview",
                        "刘备和张飞是什么关系。"
                    ],
                ],
                inputs=[
                    name, instructions, description, tools, retrieval_files, model,
                    query,
                ],
                examples_per_page=5
            )

        # create assistant
        create_assistant_button.click(
            click_create_assistant,
            inputs=[
                openai_api_key,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
            ],
            outputs=[
                assistant_id, thread_id
            ]
        )
        # list assistant
        list_assistant_button.click(
            click_list_assistant,
            inputs=[
                openai_api_key
            ],
            outputs=[
                assistant_list
            ]
        )
        # delete assistant button; the textbox is overwritten with the
        # "success" / "failed" / error-message result.
        delete_assistant_button.click(
            click_delete_assistant,
            inputs=[
                openai_api_key,
                delete_assistant_id
            ],
            outputs=[
                delete_assistant_id
            ]
        )
        # list file
        list_file_button.click(
            click_list_file,
            inputs=[
                openai_api_key
            ],
            outputs=[
                file_list
            ],
        )
        # delete file; result text replaces the id textbox, as above.
        delete_file_button.click(
            click_delete_file,
            inputs=[
                openai_api_key,
                delete_file_id
            ],
            outputs=[
                delete_file_id
            ]
        )
        # upload files; no outputs declared, so the returned ids are not
        # shown in the UI.
        upload_files_button.click(
            click_upload_files,
            inputs=[
                openai_api_key,
                upload_files
            ],
            outputs=[
            ]
        )
        # add and run: streaming generator that updates ids and the chat.
        add_and_run_button.click(
            add_and_run,
            inputs=[
                openai_api_key,
                assistant_id, thread_id,
                name, instructions, description, tools, retrieval_files, retrieval_file_ids, model,
                query,
            ],
            outputs=[
                assistant_id, thread_id,
                chat_bot
            ],
        )
        # refresh: re-fetch the conversation for the current thread.
        refresh_button.click(
            refresh,
            inputs=[
                openai_api_key,
                thread_id,
            ],
            outputs=[
                chat_bot
            ]
        )

    # queue() is required for generator callbacks like add_and_run.
    blocks.queue().launch()
    return
# Script entry point.
if __name__ == "__main__":
    main()